# The architecture and the components it is built from
# huggingface
# transformers
# https://www.bilibili.com/video/BV1At4y1W75x?spm_id_from=333.999.0.0
import copy
import math
from collections import namedtuple
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
Hypothesis = namedtuple('Hypothesis', ['value', 'score'])
def clones(module, n):
return nn.ModuleList([copy.deepcopy(module) for _ in range(n)])
"""
实现x 的标准化处理(标准化的作用:使x符合正太分布)
"""
class LayerNorm(nn.Module):
def __init__(self, feature, eps=1e-6):
"""
:param feature: self-attention 的 x 的大小
:param eps:
"""
super(LayerNorm, self).__init__()
self.a_2 = nn.Parameter(torch.ones(feature))
self.b_2 = nn.Parameter(torch.zeros(feature))
self.eps = eps
def forward(self, x):
mean = x.mean(-1, keepdim=True)
std = x.std(-1, keepdim=True)
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
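# A minimal usage sketch (hyperparameters assumed: feature = d_model = 512):
#   >>> ln = LayerNorm(512)
#   >>> x = torch.randn(2, 10, 512)
#   >>> ln(x).shape
#   torch.Size([2, 10, 512])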
"""
残差化的示例
"""
class SublayerConnection(nn.Module):
"""
这不仅仅做了残差,这是把残差和 layernorm 一起给做了
"""
def __init__(self, size, dropout=0.1):
super(SublayerConnection, self).__init__()
        # Step 1: LayerNorm (instantiated here as a submodule)
        self.layer_norm = LayerNorm(size)
        # Step 2: dropout
        self.dropout = nn.Dropout(p=dropout)
def forward(self, x, sublayer):
"""
:param x: 就是self-attention的输入
:param sublayer: self-attention层
:return:
"""
return self.dropout(self.layer_norm(x + sublayer(x)))
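# A minimal usage sketch (identity sublayer, size assumed to be 512). Note that this is
# a post-norm variant, dropout(LayerNorm(x + sublayer(x))), rather than the pre-norm
# form x + dropout(sublayer(LayerNorm(x))) used in some Transformer implementations.
#   >>> sc = SublayerConnection(512)
#   >>> x = torch.randn(2, 10, 512)
#   >>> sc(x, lambda t: t).shape
#   torch.Size([2, 10, 512])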
class FeatEmbedding(nn.Module):
def __init__(self, d_feat, d_model, dropout):
super(FeatEmbedding, self).__init__()
self.video_embeddings = nn.Sequential(
LayerNorm(d_feat),
nn.Dropout(dropout),
nn.Linear(d_feat, d_model))
def forward(self, x):
return self.video_embeddings(x)
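# A minimal usage sketch (d_feat = 2048 and 28 frames are assumed values): projects
# per-frame video features of width d_feat down to the model width d_model.
#   >>> emb = FeatEmbedding(d_feat=2048, d_model=512, dropout=0.1)
#   >>> emb(torch.randn(2, 28, 2048)).shape
#   torch.Size([2, 28, 512])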
class TextEmbedding(nn.Module):
def __init__(self, vocab_size, d_model):
super(TextEmbedding, self).__init__()
self.d_model = d_model
self.embed = nn.Embedding(vocab_size, d_model)
def forward(self, x):
return self.embed(x) * math.sqrt(self.d_model)
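# A minimal usage sketch (vocab_size assumed): token ids -> d_model vectors, scaled by
# sqrt(d_model) as in "Attention Is All You Need".
#   >>> emb = TextEmbedding(vocab_size=1000, d_model=512)
#   >>> emb(torch.randint(0, 1000, (2, 12))).shape
#   torch.Size([2, 12, 512])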
class PositionalEncoding(nn.Module):
    def __init__(self, dim, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        if dim % 2 != 0:
            raise ValueError("Cannot use sin/cos positional encoding with "
                             "odd dim (got dim={:d})".format(dim))
        pe = torch.zeros(max_len, dim)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp((torch.arange(0, dim, 2, dtype=torch.float) *
                              -(math.log(10000.0) / dim)))
        pe[:, 0::2] = torch.sin(position.float() * div_term)
        pe[:, 1::2] = torch.cos(position.float() * div_term)
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)
        self.drop_out = nn.Dropout(p=dropout)
        self.dim = dim
def forward(self, emb, step=None):
emb = emb * math.sqrt(self.dim)
if step is None:
emb = emb + self.pe[:emb.size(0)]
else:
emb = emb + self.pe[step]
emb = self.drop_out(emb)
return emb
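# A minimal usage sketch (shapes assumed). The buffer pe has shape [max_len, 1, dim],
# so emb is expected in time-first layout [seq_len, batch, dim]; step selects a single
# position during step-by-step decoding. Note that forward() multiplies emb by
# sqrt(dim) again, on top of the sqrt(d_model) already applied in TextEmbedding.
#   >>> pe = PositionalEncoding(512, dropout=0.1)
#   >>> pe(torch.randn(10, 2, 512)).shape
#   torch.Size([10, 2, 512])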
"""
自注意力机制的实现示例
"""
def self_attention(query, key, value, dropout=None, mask=None):
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    # The mask is applied after QK^T and before the softmax
    if mask is not None:
        mask = mask.to(scores.device)
        scores = scores.masked_fill(mask == 0, -1e9)
self_attn = F.softmax(scores, dim=-1)
if dropout is not None:
self_attn = dropout(self_attn)
return torch.matmul(self_attn, value), self_attn
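# A minimal usage sketch (shapes assumed): query/key/value are [batch, head, seq_len, d_k]
# (or [batch, seq_len, d_k] for a single head); the mask broadcasts against the
# [seq_q, seq_k] score matrix.
#   >>> q = k = v = torch.randn(2, 8, 10, 64)
#   >>> out, attn = self_attention(q, k, v)
#   >>> out.shape, attn.shape
#   (torch.Size([2, 8, 10, 64]), torch.Size([2, 8, 10, 10]))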
"""
多头--注意力机制的实现示例
"""
class MultiHeadAttention(nn.Module):
def __init__(self, head, d_model, dropout=0.1):
super(MultiHeadAttention, self).__init__()
assert (d_model % head == 0)
self.d_k = d_model // head
self.head = head
self.d_model = d_model
self.linear_query = nn.Linear(d_model, d_model)
self.linear_key = nn.Linear(d_model, d_model)
self.linear_value = nn.Linear(d_model, d_model)
self.linear_out = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(p=dropout)
self.attn = None
def forward(self, query, key, value, mask=None):
if mask is not None:
            # The per-head linear projections work on 4-D tensors: query goes from
            # [batch, frame_num, d_model] to [batch, -1, head, d_k] and then dims 1 and 2
            # are swapped to give [batch, head, -1, d_k], so the mask needs an extra head
            # dimension to match the dimensions used in self_attention below.
            mask = mask.unsqueeze(1)
n_batch = query.size(0)
query = self.linear_query(query).view(n_batch, -1, self.head, self.d_k).transpose(1, 2) # [b, 8, 32, 64]
key = self.linear_key(key).view(n_batch, -1, self.head, self.d_k).transpose(1, 2) # [b, 8, 28, 64]
value = self.linear_value(value).view(n_batch, -1, self.head, self.d_k).transpose(1, 2) # [b, 8, 28, 64]
x, self.attn = self_attention(query, key, value, dropout=self.dropout, mask=mask)
        # Back to 3-D, i.e. concatenate the heads
x = x.transpose(1, 2).contiguous().view(n_batch, -1, self.head * self.d_k)
return self.linear_out(x)
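# A minimal usage sketch (query_len = 32 and key_len = 28 are assumed values): with
# head = 8 and d_model = 512, each head attends over d_k = 64 features.
#   >>> mha = MultiHeadAttention(head=8, d_model=512)
#   >>> q = torch.randn(2, 32, 512)    # [batch, query_len, d_model]
#   >>> kv = torch.randn(2, 28, 512)   # [batch, key_len, d_model]
#   >>> mha(q, kv, kv).shape
#   torch.Size([2, 32, 512])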
class PositionWiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
super(PositionWiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
self.dropout_1 = nn.Dropout(dropout)
self.relu = nn.ReLU()
self.dropout_2 = nn.Dropout(dropout)
def forward(self, x):
inter = self.dropout_1(self.relu(self.w_1(self.layer_norm(x))))
output = self.dropout_2(self.w_2(inter))
return output
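# A minimal usage sketch (d_ff = 2048 assumed): a pre-norm position-wise FFN,
# d_model -> d_ff -> d_model, applied independently at every position; the residual
# around it comes from SublayerConnection.
#   >>> ffn = PositionWiseFeedForward(d_model=512, d_ff=2048)
#   >>> ffn(torch.randn(2, 10, 512)).shape
#   torch.Size([2, 10, 512])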
class EncoderLayer(nn.Module):
def __init__(self, size, attn, feed_forward, dropout=0.1):
super(EncoderLayer, self).__init__()
self.attn = attn
self.feed_forward = feed_forward
self.sublayer_connection = clones(SublayerConnection(size, dropout), 2)
def forward(self, x, mask):
x = self.sublayer_connection[0](x, lambda x: self.attn(x, x, x, mask))
return self.sublayer_connection[1](x, self.feed_forward)
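# A minimal usage sketch (hyperparameters assumed): the two SublayerConnections wrap
# self-attention and the feed-forward network.
#   >>> layer = EncoderLayer(512, MultiHeadAttention(8, 512),
#   ...                      PositionWiseFeedForward(512, 2048))
#   >>> layer(torch.randn(2, 10, 512), mask=None).shape
#   torch.Size([2, 10, 512])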
class EncoderLayerNoAttention(nn.Module):
def __init__(self, size, attn, feed_forward, dropout=0.1):
super(EncoderLayerNoAttention, self).__init__()
self.attn = attn
self.feed_forward = feed_forward
self.sublayer_connection = clones(SublayerConnection(size, dropout), 2)
def forward(self, x, mask):
return self.sublayer_connection[1](x, self.feed_forward)
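# EncoderLayerNoAttention keeps the same interface as EncoderLayer but skips the
# self-attention sublayer entirely, applying only the feed-forward sublayer (with its
# residual and LayerNorm); presumably a no-attention ablation of the encoder.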
class DecoderLayer(nn.Module):
def __init__(self, size, attn, feed_forward, sublayer_num, dropout=0.1):
super(DecoderLayer, self).__init__()
self.attn = attn
self.feed_forward = feed_forward
self.sublayer_connection = clones(SublayerConnection(size, dropout), sublayer_num)
def forward(self, x, memory, src_mask, trg_mask, r2l_memory=None, r2l_trg_mask=None):
x = self.sublayer_connection[0](x, lambda x: self.attn(x, x, x, trg_mask))
x = self.sublayer_connection[1](x, lambda x: self.attn(x, memory, memory, src_mask))
if r2l_memory is not None:
x = self.sublayer_connection[-2](x, lambda x: self.attn(x, r2l_memory, r2l_memory, r2l_trg_mask))
return self.sublayer_connection[-1](x, self.feed_forward)
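# A sketch of how DecoderLayer is wired (sublayer_num assumed to be 3 for a plain
# decoder and 4 when it also cross-attends to r2l_memory; note the same attn module
# is reused for every attention sublayer):
#   >>> layer = DecoderLayer(512, MultiHeadAttention(8, 512),
#   ...                      PositionWiseFeedForward(512, 2048), sublayer_num=3)
#   >>> x = torch.randn(2, 12, 512)       # target-side embeddings
#   >>> memory = torch.randn(2, 28, 512)  # encoder output
#   >>> layer(x, memory, src_mask=None, trg_mask=None).shape
#   torch.Size([2, 12, 512])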
class Encoder(nn.Module):
def __init__(self, n, encoder_layer):
super(Encoder, self).__init__()
self.encoder_layer = clones(encoder_layer, n)
def forward(self, x, src_mask):
for layer in self.encoder_layer:
x = layer(x, src_mask)
return x
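# A minimal usage sketch (n = 2 assumed): the encoder stacks n deep copies of one layer.
#   >>> enc = Encoder(2, EncoderLayer(512, MultiHeadAttention(8, 512),
#   ...                               PositionWiseFeedForward(512, 2048)))
#   >>> enc(torch.randn(2, 10, 512), src_mask=None).shape
#   torch.Size([2, 10, 512])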
class R2L_Decoder(nn.Module):
def __init__(self, n, decoder_layer):
super(R2L_Decoder, self).__init__()
self.decoder_layer = clones(decoder_layer, n)
def forward(self, x, memory, src_mask, r2l_trg_mask):
for layer in self.decoder_layer:
x = layer(x, memory, src_mask, r2l_trg_mask)
return x
class L2R_Decoder(nn.Module):
def __init__(self, n, decoder_layer):
super(L2R_Decoder, self).__init__()
self.decoder_layer = clones(decoder_layer, n)
def forward(self, x, memory, src_mask, trg_mask, r2l_memory, r2l_trg_mask):
for layer in self.decoder_layer:
x = layer(x, memory, src_mask, trg_mask, r2l_memory, r2l_trg_mask)
return x
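# How the two decoders fit together (a sketch of the assumed bidirectional setup):
# the R2L_Decoder runs first over the reversed target, and its output is passed to the
# L2R_Decoder as r2l_memory, whose DecoderLayer therefore needs the extra
# cross-attention sublayer (i.e. sublayer_num=4).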
def pad_mask(src, r2l_trg, trg, pad_idx):
if isinstance(src, tuple):
if len(src) == 4:
src_image_mask = (src[0][:, :, 0] != pad_idx).unsqueeze(1)
src_motion_mask = (src[1][:, :, 0] != pad_idx).unsqueeze(1)
src_object_mask = (src[2][:, :, 0] != pad_idx).unsqueeze(1)
src_rel_mask = (src[3][:, :, 0] != pad_idx).unsqueeze(1)
enc_src_mask = (src_image_mask, src_motion_mask, src_object_mask, src_rel_mask)
dec_src_mask_1 = src_image_mask & src_motion_mask
dec_src_mask_2 = src_image_mask & src_motion_mask & src_object_mask & src_rel_mask
dec_src_mask = (dec_src_mask_1, dec_src_mask_2)
src_mask = (enc_src_mask, dec_src_mask)
if len(src) == 3:
src_image_mask = (src[0][:, :, 0] != pad_idx).unsqueeze(1)
src_motion_mask = (src[1][:, :, 0] != pad_idx).unsqueeze(1)
src_object_mask = (src[2][:, :, 0] != pad_idx).unsqueeze(1)
enc_src_mask = (src_image_mask, src_motion_mask, src_object_mask)
dec_src_mask = src_image_mask & src_motion_mask
src_mask = (enc_src_mask, dec_src_mask)
if len(src) == 2:
src_image_mask = (src[0][:, :, 0] != pad_idx).unsqueeze(1)
src_motion_mask = (src[1][:, :, 0] != pad_idx).unsqueeze(1)
enc_src_mask = (src_image_mask, src_motion_mask)
dec_src_mask = src_image_mask & src_motion_mask
src_mask = (enc_src_mask, dec_src_mask)
    else:
        src_mask = (src[:, :, 0] != pad_idx).unsqueeze(1)