TensorFlow | Transformer-Based Natural Language Inference (SNLI)

After going through the paper, various source code, and the BERT source code, I organized my thoughts, implemented a Transformer, and built a small Transformer model for the SNLI task.

1. Transformer

I won't repeat the theory here; other posts explain it very well,

for example: https://jalammar.github.io/illustrated-transformer/

and its Chinese translation: https://blog.csdn.net/qq_41664845/article/details/84969266

Let's go straight to the code.

1.1 Activation Function

The original Transformer paper uses ReLU throughout, but BERT and most later work use GELU (Gaussian Error Linear Unit) instead, which reportedly works better (I am only going by the comparisons reported in the paper and have not verified this myself).

In the spirit of picking on merit rather than habit, even though I normally use ReLU, the default activation here is set to GELU.

The original GELU paper: https://arxiv.org/abs/1606.08415

GELU (the imports at the top are shared by all snippets in this post):

import math

import numpy as np
import tensorflow as tf


def gelu(inputs):
    """
    gelu: https://arxiv.org/abs/1606.08415
    :param inputs: [Tensor]
    :return: [Tensor] outputs after activation
    """
    # tanh approximation of GELU, as used in BERT
    cdf = 0.5 * (1.0 + tf.tanh(tf.sqrt(2 / np.pi) * (inputs + 0.044715 * tf.pow(inputs, 3))))
    return inputs * cdf
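
For reference only: TensorFlow also exposes the error function, so the exact (non-approximated) GELU can be written as below. This is just a side-by-side sketch for comparison; the model in this post uses the tanh approximation above, and gelu_exact is a name I made up here.

def gelu_exact(inputs):
    """
    exact GELU via the Gaussian CDF (tf.erf), shown only for comparison
    :param inputs: [Tensor]
    :return: [Tensor] outputs after activation
    """
    return 0.5 * inputs * (1.0 + tf.erf(inputs / tf.sqrt(2.0)))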

Helper for retrieving an activation function (with GELU as the default):

def get_activation(activation_name):
    """
    get activate function
    :param activation_name: [String] name of the activation function ("relu", "gelu" or "tanh"); None defaults to gelu
    :return: [Function] activation function
    """
    if activation_name is None:
        return gelu
    else:
        act = activation_name.lower()
        if act == "relu":
            return tf.nn.relu
        elif act == "gelu":
            return gelu
        elif act == "tanh":
            return tf.tanh
        else:
            raise ValueError("Unsupported activation: %s" % act)

1.2 Embedding

Besides word embeddings, the Transformer also adds positional encodings so that each token carries position information; without them, the model would essentially be just a more elaborate bag-of-words model that learns a weight for each token.

Also, to handle tasks like SNLI that need a fixed-shape final output, I borrowed BERT's idea of prepending a [CLS] token to every input and using that token's final hidden state for prediction. Doing this means a segment embedding is also needed so the model can better tell the two sentences apart (see BERT); a sketch of the resulting input layout is given below.
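
To make the layout concrete, here is roughly what one packed SNLI example is meant to look like. The preprocessing code is not shown in this post, and the markers below are illustrative; the only convention the code itself relies on is that id 0 is reserved for [PAD].

# Hypothetical layout of one packed SNLI example:
#   tokens  : [CLS]  a   man  plays guitar [SEP]  a   man  makes music [SEP] [PAD] ...
#   segment :   1    1    1     1     1      1    2    2     2     2     2     0   ...
#   position:   1    2    3     4     5      6    7    8     9    10    11     0   ...
# The premise span is marked with segment order 1, the hypothesis span with order 2,
# and the sequence is padded with id 0 up to max_length so that [PAD] positions keep
# all-zero embeddings (which is what the attention mask relies on later).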

1.2.1 Word Embedding

The embedding matrix can either be randomly initialized or loaded from embeddings produced elsewhere (e.g. GloVe or fastText); you only need to declare this when restoring (one way to do it is sketched after the function below). The paper also mentions scaling the embeddings, which is done here as well.


def get_embedding(inputs, vocab_size, channels, scale=True, scope="embedding", reuse=None):
    """
    embedding
    :param inputs: [Tensor] Tensor with first dimension of "batch_size"
    :param vocab_size: [Int] Vocabulary size
    :param channels: [Int] Embedding size
    :param scale: [Boolean] If True, the output will be multiplied by sqrt(channels)
    :param scope: [String] name of "variable_scope"
    :param reuse: [Boolean] tf parameter reuse
    :return: [Tensor] outputs of embedding of sentence with shape of "batch_size * length * channels"
    """
    with tf.variable_scope(scope, reuse=reuse):
        lookup_table = tf.get_variable('lookup_table',
                                       dtype=tf.float32,
                                       shape=[vocab_size, channels],
                                       initializer=tf.contrib.layers.xavier_initializer())
        lookup_table = tf.concat((tf.zeros(shape=[1, channels], dtype=tf.float32),
                                  lookup_table[1:, :]), 0)

        outputs = tf.nn.embedding_lookup(lookup_table, inputs)

        if scale:
            outputs = outputs * math.sqrt(channels)

    return outputs
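
As for loading pretrained vectors: one simple way is to build the GloVe (or fastText) matrix offline as a numpy array and copy it into the lookup_table variable after initialization. This is only a sketch under my own assumptions (glove_matrix, vocab_size and channels are illustrative names; the post itself only mentions declaring the variable at restore time):

# Hypothetical seeding of the embedding variable with pretrained vectors.
# glove_matrix: np.ndarray of shape [vocab_size, channels], row 0 kept as zeros for [PAD].
with tf.variable_scope("embedding", reuse=True):
    pretrained_table = tf.get_variable("lookup_table", shape=[vocab_size, channels], dtype=tf.float32)
load_embeddings = tf.assign(pretrained_table, glove_matrix)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(load_embeddings)  # overwrite the random initialization with GloVe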

1.2.2 Position Embedding

This produces a positional embedding with the same shape as the inputs after word embedding. It takes the raw token ids rather than the word-embedded tensor as input because that makes the later masking easier: [PAD] positions (id 0) keep an all-zero positional vector.

def get_positional_encoding(inputs, channels, scale=False, scope="positional_embedding", reuse=None):
    """
    positional encoding
    :param inputs: [Tensor] with dimension of "batch_size * max_length"
    :param channels: [Int] Embedding size
    :param scale: [Boolean] If True, the output will be multiplied by sqrt(channels)
    :param scope: [String] name of "variable_scope"
    :param reuse: [Boolean] tf parameter reuse
    :return: [Tensor] outputs after positional encoding
    """
    batch_size = tf.shape(inputs)[0]
    max_length = tf.shape(inputs)[1]
    with tf.variable_scope(scope, reuse=reuse):
        position_ind = tf.tile(tf.expand_dims(tf.range(tf.to_int32(1), tf.add(max_length, 1)), 0), [batch_size, 1])

        # Convert to a tensor
        lookup_table = tf.convert_to_tensor(get_timing_signal_1d(max_length, channels))

        lookup_table = tf.concat((tf.zeros(shape=[1, channels]),
                                  lookup_table[:, :]), 0)
        position_inputs = tf.where(tf.equal(inputs, 0), tf.zeros_like(inputs), position_ind)

        outputs = tf.nn.embedding_lookup(lookup_table, position_inputs)

        if scale:
            outputs = outputs * math.sqrt(channels)

    return tf.cast(outputs, tf.float32)

The get_timing_signal_1d() helper produces the [sentence length * embedding size] encoding matrix.
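
The values it fills in follow the sinusoidal encoding from the Transformer paper, where position pos and channel pair index i give

    PE(pos, 2i)   = sin( pos / 10000^(2i / d_model) )
    PE(pos, 2i+1) = cos( pos / 10000^(2i / d_model) )

with 10000 corresponding to max_timescale and d_model to channels. (Like the tensor2tensor implementation, the code below concatenates all sine channels followed by all cosine channels instead of interleaving them, which does not change the properties of the encoding.)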

def get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4, start_index=0):
    """
    sinusoidal positional encoding helper
    :param length: [Int] max_length size
    :param channels: [Int] Embedding size
    :param min_timescale: [Float]
    :param max_timescale: [Float]
    :param start_index: [Int] index of first position
    :return: [Tensor] positional encoding of shape "length * channels"
    """
    position = tf.to_float(tf.range(start_index, length))
    num_timescales = channels // 2
    log_timescale_increment = (math.log(float(max_timescale) / float(min_timescale)) /
                               (tf.to_float(num_timescales) - 1))
    inv_timescales = min_timescale * tf.exp(tf.to_float(tf.range(num_timescales)) * -log_timescale_increment)

    scaled_time = tf.expand_dims(position, 1) * tf.expand_dims(inv_timescales, 0)
    signal = tf.concat([tf.sin(scaled_time), tf.cos(scaled_time)], axis=1)
    signal = tf.pad(signal, [[0, 0], [0, tf.mod(channels, 2)]])
    return signal

1.2.3 Segment Embedding

This embedding exists purely to help the model tell the two input sentences apart. Strictly speaking the [SEP] token already separates them, but that alone is clearly not enough for the model: without the segment embedding its performance was noticeably worse.

For the [PAD] token, every embedding (word, segment and positional) is set to the all-zero vector, which makes it easy to add the mask during attention later; see the assembly sketch after the function below.

def get_seg_embedding(inputs, channels, order=1, scale=True, scope="seg_embedding", reuse=None):
    """
    segment embedding
    :param inputs: [Tensor] with first dimension of "batch_size"
    :param channels: [Int] Embedding size
    :param order: [Int] The position of the sentence in all sentences
    :param scale: [Boolean] If True, the output will be multiplied by sqrt(channels)
    :param scope: [String] name of "variable_scope"
    :param reuse: [Boolean] tf parameter reuse
    :return: [Tensor] outputs of embedding of sentence with shape of "batch_size * length * channels"
    """
    with tf.variable_scope(scope, reuse=reuse):
        lookup_table = tf.get_variable('lookup_table',
                                       dtype=tf.float32,
                                       shape=[3, channels],
                                       initializer=tf.contrib.layers.xavier_initializer())
        lookup_table = tf.concat((tf.zeros(shape=[1, channels], dtype=tf.float32),
                                  lookup_table[1:, :]), 0)
        seg_inputs = tf.where(tf.equal(inputs, 0), tf.zeros_like(inputs), tf.ones_like(inputs)*order)
        outputs = tf.nn.embedding_lookup(lookup_table, seg_inputs)
        if scale:
            outputs = outputs * math.sqrt(channels)

    return outputs
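
Putting the three embeddings together, the model input can be assembled roughly as follows. This is my own sketch: token_ids, premise_only_ids and hypothesis_only_ids are illustrative names for [batch_size, max_length] id tensors, where the two *_only_ids tensors are zero everywhere outside the corresponding sentence's positions.

# Hypothetical assembly of the encoder input.
channels = 256      # illustrative embedding size
vocab_size = 30000  # illustrative vocabulary size

model_input = get_embedding(token_ids, vocab_size=vocab_size, channels=channels)
model_input += get_positional_encoding(token_ids, channels=channels)
model_input += get_seg_embedding(premise_only_ids, channels=channels, order=1)
model_input += get_seg_embedding(hypothesis_only_ids, channels=channels, order=2, reuse=True)
# Every term is all-zero at [PAD] positions, so a zero row in model_input still
# identifies padding for the attention masks below.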

1.3 Self-Attention and Encoder-Decoder Attention

With the input handled, we arrive at the main event: the attention mechanism.

The two input tensors never seem to fit into one line of English, so here it is in prose: from_tensor is the same for both kinds of attention, namely the layer input. to_tensor is also the layer input for self-attention, but for encoder-decoder attention it is the output of the last encoder layer, which is what captures the attention relation between decoder and encoder. (A usage sketch follows the function.)

Because of the preprocessing above, the embedding of every [PAD] token is all zeros, so after taking the absolute value and reduce-summing over the last dimension, a result of zero marks a [PAD] position. This avoids having to feed an extra mask-ids tensor as input.

Following the description in the paper:

def multi_head_attention(from_tensor: tf.Tensor,  to_tensor: tf.Tensor, channels=None, num_units=None, num_heads=8,
                         dropout_rate=0, is_training=True, attention_mask_flag=False, scope="multihead_attention",
                         activation=None, reuse=None):
    """
    multihead attention
    :param from_tensor: [Tensor] query tensor with shape of "batch_size * max_length * channels"
    :param to_tensor: [Tensor] key/value tensor; equal to from_tensor for self-attention, the last encoder output for encoder-decoder attention
    :param channels: [Int] channel of last dimension of output
    :param num_units: [Int] channel size of matrix Q, K, V
    :param num_heads: [Int] head number of attention
    :param dropout_rate: [Float] dropout rate; 0 means no dropout
    :param is_training: [Boolean] whether it is training; if True, use dropout
    :param attention_mask_flag: [Boolean] if True, positions that attend to the future are masked (decoder self-attention)
    :param scope: [String] name of "variable_scope"
    :param activation: [String] name of activate function
    :param reuse: [Boolean] tf parameter reuse
    :return: [Tensor] outputs after multi-head attention with shape of "batch_size * max_length * channels"
    """
    with tf.variable_scope(scope, reuse=reuse):
        if channels is None:
            channels = from_tensor.get_shape().as_list()[-1]
        if num_units is None:
            num_units = channels//num_heads
        activation_fn = get_activation(activation)
        # shape [batch_size, max_length, num_units*num_heads]
        query_layer = tf.layers.dense(from_tensor, num_units * num_heads, activation=activation_fn)
        key_layer = tf.layers.dense(to_tensor, num_units * num_heads, activation=activation_fn)
        value_layer = tf.layers.dense(to_tensor, num_units * num_heads, activation=activation_fn)

        # shape [batch_size*num_heads, max_length, num_units]
        query_layer_ = tf.concat(tf.split(query_layer, num_heads, axis=2), axis=0)
        key_layer_ = tf.concat(tf.split(key_layer, num_heads, axis=2), axis=0)
        value_layer_ = tf.concat(tf.split(value_layer, num_heads, axis=2), axis=0)

        # shape = [batch_size*num_heads, max_length, max_length]
        attention_scores = tf.matmul(query_layer_, tf.transpose(key_layer_, [0, 2, 1]))
        # Scale by sqrt(d_k), the per-head dimension, as in the paper
        attention_scores = tf.multiply(attention_scores, 1.0 / tf.sqrt(float(num_units)))
        # attention masks: an all-zero row in to_tensor marks a [PAD] position
        attention_masks = tf.sign(tf.reduce_sum(tf.abs(to_tensor), axis=-1))
        attention_masks = tf.tile(attention_masks, [num_heads, 1])
        attention_masks = tf.tile(tf.expand_dims(attention_masks, axis=1), [1, tf.shape(from_tensor)[1], 1])
        neg_inf_matrix = tf.multiply(tf.ones_like(attention_scores), (-math.pow(2, 32) + 1))
        attention_scores = tf.where(tf.equal(attention_masks, 0), neg_inf_matrix, attention_scores)

        if attention_mask_flag:
            diag_vals = tf.ones_like(attention_scores[0, :, :])
            tril = tf.linalg.LinearOperatorLowerTriangular(diag_vals).to_dense()

            masks = tf.tile(tf.expand_dims(tril, 0), [tf.shape(attention_scores)[0], 1, 1])
            neg_inf_matrix = tf.multiply(tf.ones_like(masks), (-math.pow(2, 32) + 1))
            attention_scores = tf.where(tf.equal(masks, 0), neg_inf_matrix, attention_scores)

        # attention probability
        attention_probs = tf.nn.softmax(attention_scores)

        # query mask: zero out the attention rows of [PAD] query positions
        query_masks = tf.sign(tf.reduce_sum(tf.abs(from_tensor), axis=-1))
        query_masks = tf.tile(query_masks, [num_heads, 1])
        query_masks = tf.tile(tf.expand_dims(query_masks, -1), [1, 1, tf.shape(to_tensor)[1]])

        attention_probs *= query_masks

        # dropout
        attention_probs = tf.layers.dropout(attention_probs, rate=dropout_rate,
                                            training=tf.convert_to_tensor(is_training))
        outputs = tf.matmul(attention_probs, value_layer_)
        # shape [batch_size, max_length, num_units*num_heads]
        outputs = tf.concat(tf.split(outputs, num_heads, axis=0), axis=2)

        # linear projection back to "channels" output dimensions
        outputs = tf.layers.dense(outputs, channels, activation=activation_fn)
        # Residual connection
        outputs += from_tensor
        # layer normalization (see group_norm below)
        outputs = group_norm(outputs)
    return outputs
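
The same function therefore covers both kinds of attention described above. A usage sketch (enc_input, dec_input and enc_output are placeholder names for whatever tensors play those roles in your graph):

# Encoder self-attention: queries, keys and values all come from the same tensor.
enc_self = multi_head_attention(from_tensor=enc_input, to_tensor=enc_input,
                                scope="encoder_self_attention")

# Decoder self-attention: mask out future positions with attention_mask_flag=True.
dec_self = multi_head_attention(from_tensor=dec_input, to_tensor=dec_input,
                                attention_mask_flag=True, scope="decoder_self_attention")

# Encoder-decoder attention: queries from the decoder, keys/values from the
# last encoder layer's output.
dec_enc = multi_head_attention(from_tensor=dec_self, to_tensor=enc_output,
                               scope="decoder_encoder_attention")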

1.4 Feed Forward

This is the position-wise feed-forward network from the paper. In the paper the second layer uses a linear (identity) activation, so setting the second layer's activation function to None would match the original exactly; for experimental reasons I did not do that here.
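
In the paper's notation the block is

    FFN(x) = max(0, x W1 + b1) W2 + b2

i.e. two dense layers applied to every position independently, with a ReLU (here: configurable) activation in between. The paper uses an inner dimension of 4 * d_model (2048 for d_model = 512); the code below defaults to a smaller 2 * channels.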

def feed_forward(inputs, channels, hidden_dims=None, scope="feed_forward", activation=None, reuse=None):
    """
    position-wise feed forward network
    :param inputs: [Tensor] with first dimension of "batch_size"
    :param channels: [Int] Embedding size
    :param hidden_dims: [Int] dimension of the inner (hidden) layer
    :param scope: [String] name of "variable_scope"
    :param activation: [String] name of activate function
    :param reuse: [Boolean] tf parameter reuse
    :return: [Tensor] outputs after feed forward with shape of "batch_size * max_length * channels"
    """
    if hidden_dims is None:
        hidden_dims = 2*channels
    with tf.variable_scope(scope, reuse=reuse):
        activation_fn = get_activation(activation)

        params = {"inputs": inputs, "num_outputs": hidden_dims, "activation_fn": activation_fn}
        outputs = tf.contrib.layers.fully_connected(**params)

        params = {"inputs": outputs, "num_outputs": channels, "activation_fn": activation_fn}  # activation_fn可以改为None
        outputs = tf.contrib.layers.fully_connected(**params)
        outputs += inputs
        outputs = group_norm(outputs)
    return outputs

1.5 Layer Normalization

Oh, and there is also layer normalization.

def group_norm(inputs: tf.Tensor, epsilon=1e-8, scope="layer_normalization", reuse=None):
    """
    layer normalization
    :param inputs: [Tensor] with first dimension of "batch_size"
    :param epsilon: [Float] a number for preventing ZeroDivision
    :param scope: [String] name of "variable_scope"
    :param reuse: [Boolean] tf parameter reuse
    :return: [Tensor] outputs after normalized
    """
    with tf.variable_scope(scope, reuse=reuse):
        inputs_shape = inputs.get_shape()
        params_shape = inputs_shape[-1:]
        mean, variance = tf.nn.moments(inputs, [-1], keep_dims=True)
        beta = tf.Variable(tf.zeros(params_shape))
        gamma = tf.Variable(tf.ones(params_shape))
        normalized = (inputs - mean) * tf.rsqrt(variance + epsilon)
        outputs = gamma * normalized + beta
    return outputs
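
With these building blocks, a single encoder block is essentially self-attention followed by the feed-forward network (the residual connections and normalization already live inside the two functions). A sketch of how they might be wired together, ahead of the full model in the next part:

def encoder_block(inputs, channels, num_heads=8, dropout_rate=0.1, is_training=True, scope="encoder_block"):
    """
    one Transformer encoder block assembled from the functions above (illustrative sketch)
    :param inputs: [Tensor] with shape of "batch_size * max_length * channels"
    :return: [Tensor] outputs with the same shape as inputs
    """
    with tf.variable_scope(scope):
        outputs = multi_head_attention(from_tensor=inputs, to_tensor=inputs, channels=channels,
                                       num_heads=num_heads, dropout_rate=dropout_rate,
                                       is_training=is_training)
        outputs = feed_forward(outputs, channels=channels)
    return outputs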

2. To be continued...
