Pytorch实现Transformer
实现add&Normalize
首先实现这个部分是因为不论解码器还是编码器都需要这一部分。
首先我们假设归一化(LayerNorm)已经做好了。
class SublayerConnection(nn.Module):def __init__(self,size,dropout=0.1):super(SublayerConnection,self).__init__()self.layer_norm = LayerNorm(size)# 假设LayerNorm已经做好,做一个dropout防止过拟合self.dropout = nn.Dropout(p=dropout)def forward(self,x,sublayer):# x+sublayer(x)就是add,sublayer就是上一步的操作return self.dropout(self.layer_norm(x+sublayer(x)))
因为我们不确定是Encoder还是Decoder,所以用sublayer传入来作为上一层。
接下来实现LayerNorm
class LayerNorm(nn.Module):def __init__(self,x_size,eps=1e-6):super(LayerNorm,self).__init__()self.a_2 = nn.Parameter(torch.ones(x_size))self.b_2 = nn.Parameter(torch.zeros(x_size))self.eps = epsdef forward(self,x):mean = x.mean(-1,keepdim = True)std = x.std(-1,keepdim = True)return self.a_2*(x-mean)/(math.sqrt(std)+self.eps)+self.b_2
这一串代码很简单,就是公式的实现而已,注意要把a_2和b_2作为可以学习的变量。
实现多头注意力机制和自注意力机制
自注意力机制
def self_attention(query,key,value,dropout = None,mask = None):d_k = query.size(-1)sores = np.matmul(query,key.transpose(-2,-1))/math.sqrt(d_k)attn_softmax = F.softmax(sores,dim=-1)if dropout is not None:attn_softmax = dropout(attn_softmax)return np.matmul(attn_softmax,value),attn_softmax
实现QKV的相乘,除以 d k \sqrt{d_k} dk为了便于softmax得出的概率差距不会太极端。
多头自注意力机制
class MultiHeadattention(nn.Module):def __init__(self,head,d_model,dropout = 0.1):super(MultiHeadattention,self).__init__()assert(d_model % head == 0)self.d_k = d_model//headself.head = headself.d_model = d_modelself.linear_quary = nn.Linear(d_model,d_model)self.linear_key = nn.Linear(d_model,d_model)self.linear_value = nn.Linear(d_model,d_model)self.linear_out = nn.Linear(d_model,d_model)self.dropout = nn.Dropout(p=dropout)self.attn = Nonedef forward(self,quary,key,value,mask = None):n_batch = quary.size(0)quary = self.linear_quary(quary).view(n_batch,-1,self.head,self.d_k).transpose(1,2)key = self.linear_key(key).view(n_batch,-1,self.head,self.d_k).transpose(1,2)value = self.linear_value(value).view(n_batch,-1,self.head,self.d_k).transpose(1,2)x,self.attn = self_attention(quary,key,value,dropout=self.dropout,mask=mask)x = x.transpose(1,2).contiguous().view(n_batch,-1,self.head*self.d_k)return self.linear_out(x)
还有掩码自注意力需要添加,此时没有添加,等讲解到掩码的时候添加。
QKV实现分割并行计算之后,对x进行拼接重塑维度,做一个线性变换输出x。
位置编码
class PositionalEncoding(nn.Module):def __init__(self,dim,dropout,max_len=5000):super(PositionalEncoding,self).__init__()if dim%2 != 0:raise ValueError("Can't use sin/cos positional encoding with""odd dim (got dim={:d})".format(dim))'''维度必须是偶数,这个if就是如果维度是奇数的情况下跳出异常,只有维度是偶数的情况下才能都有对称性,确保每一个位置都有一个sin一个cos能够提供更完整的位置属性 '''pe = torch.zeros(max_len,dim)position = torch.arange(0,max_len).unsqueeze(1)div_term = torch.exp((torch.arange(0,dim,2,dtype=torch.float)*-(math.log(10000.0)/dim)))pe[:,1::2] = torch.sin(position.float() * div_term)pe[:,0::2] = torch.cos(position.float() * div_term)pe = pe.unsqueeze(1)self.register_buffer('pe',pe)# 创建pe的时候用的zeros使得pe是一个可学习的参数,但是位置编码是一直不变的,因此用这个函数使得pe不会改变,但会存储self.drop_out = nn.Dropout(p = dropout)self.dim = dimdef forward(self,emb,step=None):if step == None:emb = emb + self.pe[:emb.size(0)]else:emb = emb + self.pe[:step]emb = self.drop_out(emb)return emb
FeedForward实现
class PositionWiseFeedForward(nn.Module):def __init__(self,d_model,d_ff,dropout=0.1) -> None:super(PositionWiseFeedForward,self).__init__()self.w_1 = nn.Linear(d_model,d_ff)self.w_2 = nn.Linear(d_ff,d_model)self.relu = nn.ReLU()self.layer_norm = nn.LayerNorm(d_model,eps=1e-6)self.dropout1 = nn.Dropout(p=dropout)self.dropout2 = nn.Dropout(p=dropout)def forward(self,x):inter = self.dropout1(self.relu(self.w_1(self.layer_norm(x))))output = self.dropout2(self.relu(self.w_2(inter)))return output
FFN非常简单,就是两次wx+b,加一个激活函数即可,代码里面初始 w 1 , w 2 , r e l u w_1,w_2,relu w1,w2,relu等,为什么没有b呢,pytorch linear在初始化的时候已经帮助我们初始化好了不需要我们自己初始化。
EncoderDecoder层实现
def clones(module,n):return nn.ModuleList([deepcopy(module) for _ in range(n)])class EncoderLayer(nn.Module):def __init__(self,size,attn,feed_forward,dropout=0.1):super(EncoderLayer,self).__init__()self.attn = attnself.feed_forward = feed_forwardself.sublayer_connection_list = clones(SublayerConnection(size,dropout),2)def forward(self,x,mask):first_x = self.sublayer_connection_list[0](x,lambda x_attn:self.attn(x,x,x,mask))return self.sublayer_connection_list[1](x,self.feed_forward)class Encoder(nn.Module):def __init__(self,n,encoder_layer):super(Encoder,self).__init__()self.encoder_layer_list = clones(encoder_layer,n)def forward(self,x,src_mask):for encoder_layer in self.encoder_layer_list:x = encoder_layer(x,src_mask)return xclass DecoderLayer(nn.model):def __init__(self,d_model,attn,feed_forward,sublayer_num,dropout=0.1):super(DecoderLayer,self).__init__()self.attn = attnself.feed_forward = feed_forwardself.sublayer_connection_list = clones(SublayerConnection(d_model,dropout),sublayer_num)def forward(self,x,l2r_memory,src_mask,trg_mask,r2l_memory=None,r2l_trg_mask = None):first_x = self.sublayer_connection_list[0](x,lambda x_attn: self.attn(x,l2r_memory,l2r_memory,trg_mask))second_x = self.sublayer_connection_list[1](first_x,lambda sencond_x_attn:self.attn(first_x,l2r_memory,l2r_memory,src_mask))if r2l_memory is not None:third_x = self.sublayer_connection_list[-2](second_x,lambda third_x_attn:self.attn(second_x,r2l_memory,r2l_memory,r2l_trg_mask))return self.sublayer_connection_list[-1](third_x,self.feed_forward)else:return self.sublayer_connection_list[-1](second_x,self.feed_forward)class Decoder(nn.Module):def __init__(self,n_layers,decoder_layer):super(Decoder,self).__init__()self.decoder_layer_list = clones(decoder_layer,n_layers)def forward(self,x,memory,src_mask,trg_mask):for decoder_layer in self.decoder_layer_list:x = decoder_layer(x,memory,src_mask,trg_mask)return x
首先实现一个用来复制的函数,因为不管是decoder还是encoder都有两个add和Normalize。
encoder和decoder都先实现一层,再用一个类来多次创建实现多层叠加。
encoderlayer就是一层encoder,就是实现多头+add和Normalize+ffn+add和Normalize,其中传入的参数attn是创建好的多头自注意力模型,ffn同理
decoderlayer就是一层decoder,同理是实现多头+add和Normalize+ffn+add和Normalize,只不过接受的参数不同,x是输入,memory是encoder层的输出,后面两个是掩码,最后两个参数不用管,是反向解码。
拼凑Transformer
class WordProbGenerator(nn.Module):def __init__(self,d_model,vocab_size):super(WordProbGenerator,self).__init__()self.linear = nn.Linear(d_model,vocab_size)def forward(self,x):return F.log_softmax(self.linear(x),dim=-1)class WordEmbedding(nn.Module):def __init__(self,vocab_size,d_model) -> None:super(WordEmbedding,self).__init__()self.embedding = nn.Embedding(vocab_size,d_model)self.embed = self.embeddingself.d_model = d_modeldef forward(self,x):return self.embed(x)*math.sqrt(self.d_model)class ABDTransformer(nn.Module):def __init__(self,vocab,d_model,d_ff,n_head,n_layer,dropout,device='cuda'):super(ABDTransformer,self).__init__()self.vocab = vocabself.device = deviceattn = MultiHeadattention(n_head,d_model,dropout)feed_forward = PositionWiseFeedForward(d_model,d_ff,dropout)self.src_embed = WordEmbedding(vocab,d_model)self.pos_embed = PositionalEncoding(d_model,dropout)self.encoder = Encoder(n_layer,EncoderLayer(d_model,deepcopy(attn),deepcopy(feed_forward),dropout=dropout))self.decoder = Decoder(n_layer,DecoderLayer(d_model,deepcopy(attn),deepcopy(feed_forward),sublayer_num=3,dropout=dropout))self.word_prob_generator = WordProbGenerator(d_model,vocab)def encode(self,src,src_mask):x = self.src_embed(src[0])x = self.pos_embed(x)x = self.encoder(x,src_mask)return xdef decode(self,trg,memory,src_mask,trg_mask):x = self.src_embed(trg)x = self.pos_embed(x)return self.decoder(x,memory,src_mask,trg_mask)def forward(self,src,trg,mask):src_mask,trg_mask = maskencoder_output = self.encode(src,src_mask)decoder_output = self.decode(trg,encoder_output,src_mask,trg_mask)pred = self.word_prob_generator(decoder_output)return pred
第一个函数是得到各个词的概率
第二个函数是将输入词向量的维度,就是字典的维度改成d_model的维度。
组建transformer
第一步的init中将模型都初始化,包括encoder,decoder,ffn,多头自注意力机制,位置编码等。
第二步写decode和encode的输入输出,简单来说我们已经实现了内部的操作,现在写两个函数,将他需要的输入进去即可。
第三步调用函数,最后得到概率输出。