前方干貨預(yù)警:這可能是你能夠找到的最容易懂的最具實(shí)操性的最系統(tǒng)的學(xué)習(xí)transformer模型的入門教程。我們從零開始用pytorch搭建Transformer模型(中文可以翻譯成變形金剛)。
訓(xùn)練它來實(shí)現(xiàn)一個(gè)有趣的實(shí)例:兩數(shù)之和。
輸入輸出類似如下:
輸入:'12345+54321' 輸出:'66666'
我們把這個(gè)任務(wù)當(dāng)做一個(gè)機(jī)器翻譯任務(wù)來進(jìn)行。輸入是一個(gè)字符序列,輸出也是一個(gè)字符序列(seq-to-seq).
這和機(jī)器翻譯的輸入輸出結(jié)構(gòu)是類似的,所以可以用Transformer來做。
參考資料:
論文《Attention is All you needed》: https:///pdf/1706.03762.pdf
哈佛博客:https://github.com/harvardnlp/annotated-transformer/
一,準(zhǔn)備數(shù)據(jù)import random import numpy as np import torch from torch.utils.data import Dataset,DataLoader # 定義字典 words_x = '<PAD>,1,2,3,4,5,6,7,8,9,0,<SOS>,<EOS>,+' vocab_x = {word: i for i, word in enumerate(words_x.split(',' ))} vocab_xr = [k for k, v in vocab_x.items()] #反查詞典 words_y = '<PAD>,1,2,3,4,5,6,7,8,9,0,<SOS>,<EOS>' vocab_y = {word: i for i, word in enumerate(words_y.split(',' ))} vocab_yr = [k for k, v in vocab_y.items()] #反查詞典
#兩數(shù)相加數(shù)據(jù)集 def get_data(): # 定義詞集合 words = ['0' , '1' , '2' , '3' , '4' , '5' , '6' , '7' , '8' , '9' ] # 每個(gè)詞被選中的概率 p = np.array([7, 5, 5, 7, 6, 5, 7, 6, 5, 7]) p = p / p.sum() # 隨機(jī)采樣n1個(gè)詞作為s1 n1 = random.randint(10, 20) s1 = np.random.choice(words, size=n1, replace=True, p=p) s1 = s1.tolist() # 隨機(jī)采樣n2個(gè)詞作為s2 n2 = random.randint(10, 20) s2 = np.random.choice(words, size=n2, replace=True, p=p) s2 = s2.tolist() # x等于s1和s2字符上的相加 x = s1 + ['+' ] + s2 # y等于s1和s2數(shù)值上的相加 y = int('' .join(s1)) + int('' .join(s2)) y = list(str(y)) # 加上首尾符號(hào) x = ['<SOS>' ] + x + ['<EOS>' ] y = ['<SOS>' ] + y + ['<EOS>' ] # 補(bǔ)pad到固定長(zhǎng)度 x = x + ['<PAD>' ] * 50 y = y + ['<PAD>' ] * 51 x = x[:50] y = y[:51] # 編碼成token token_x = [vocab_x[i] for i in x] token_y = [vocab_y[i] for i in y] # 轉(zhuǎn)tensor tensor_x = torch.LongTensor(token_x) tensor_y = torch.LongTensor(token_y) return tensor_x, tensor_y def show_data(tensor_x,tensor_y) ->'str' : words_x = '' .join([vocab_xr[i] for i in tensor_x.tolist()]) words_y = '' .join([vocab_yr[i] for i in tensor_y.tolist()]) return words_x,words_y x,y = get_data() print (x,y,'\n' ) print (show_data(x,y))
# 定義數(shù)據(jù)集 class TwoSumDataset(torch.utils.data.Dataset): def __init__(self,size = 100000): super(Dataset, self).__init__() self.size = size def __len__(self): return self.size def __getitem__(self, i): return get_data() ds_train = TwoSumDataset(size = 100000) ds_val = TwoSumDataset(size = 10000) # 數(shù)據(jù)加載器 dl_train = DataLoader(dataset=ds_train, batch_size=200, drop_last=True, shuffle=True) dl_val = DataLoader(dataset=ds_val, batch_size=200, drop_last=True, shuffle=False)
for src,tgt in dl_train: print (src.shape) print (tgt.shape) break
torch.Size([200, 50]) torch.Size([200, 51])
二,定義模型下面,我們會(huì)像搭積木建城堡那樣從低往高地構(gòu)建Transformer模型。
先構(gòu)建6個(gè)基礎(chǔ)組件:多頭注意力、前饋網(wǎng)絡(luò)、層歸一化、殘差連接、單詞嵌入、位置編碼。類似用最基礎(chǔ)的積木塊搭建了 墻壁,屋頂,籬笆,廳柱,大門,窗戶 這樣的模塊。
然后用這6個(gè)基礎(chǔ)組件構(gòu)建了3個(gè)中間成品: 編碼器,解碼器,產(chǎn)生器。類似用基礎(chǔ)組件構(gòu)建了城堡的主樓,塔樓,花園。
最后用這3個(gè)中間成品組裝成Tranformer完整模型。類似用主樓,塔樓,花園這樣的中間成品拼湊出一座完整美麗的城堡。
1, 多頭注意力: MultiHeadAttention (用于融合不同單詞之間的信息, 三處使用場(chǎng)景,①Encoder self-attention, ② Decoder masked-self-attention, ③ Encoder-Decoder cross-attention)
2, 前饋網(wǎng)絡(luò): PositionwiseFeedForward (用于逐位置將多頭注意力融合后的信息進(jìn)行高維映射變換,簡(jiǎn)稱FFN)
3, 層歸一化: LayerNorm (用于穩(wěn)定輸入,每個(gè)樣本在Sequece和Feature維度歸一化,相比BatchNorm更能適應(yīng)NLP領(lǐng)域變長(zhǎng)序列)
4, 殘差連接: ResConnection (用于增強(qiáng)梯度流動(dòng)以降低網(wǎng)絡(luò)學(xué)習(xí)難度, 可以先LayerNorm再Add,LayerNorm也可以放在殘差A(yù)dd之后)
5, 單詞嵌入: WordEmbedding (用于編碼單詞信息,權(quán)重要學(xué)習(xí),輸出乘了sqrt(d_model)來和位置編碼保持相當(dāng)量級(jí))
6, 位置編碼: PositionEncoding (用于編碼位置信息,使用sin和cos函數(shù)直接編碼絕對(duì)位置)
7, 編碼器: TransformerEncoder (用于將輸入Sequence編碼成與Sequence等長(zhǎng)的memory向量序列, 由N個(gè)TransformerEncoderLayer堆疊而成)
8, 解碼器: TransformerDecoder (用于將編碼器編碼的memory向量解碼成另一個(gè)不定長(zhǎng)的向量序列, 由N個(gè)TransformerDecoderLayer堆疊而成)
9, 生成器: Generator (用于將解碼器解碼的向量序列中的每個(gè)向量映射成為輸出詞典中的詞,一般由一個(gè)Linear層構(gòu)成)
10, 變形金剛: Transformer (用于Seq2Seq轉(zhuǎn)碼,例如用于機(jī)器翻譯,采用EncoderDecoder架構(gòu),由Encoder, Decoder 和 Generator組成)
import torch from torch import nn import torch.nn.functional as F import copy import math import numpy as np import pandas as pd def clones(module, N): 'Produce N identical layers.' return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
1,多頭注意力 MultiHeadAttention需要逐步理解 ScaledDotProductAttention->MultiHeadAttention->MaskedMultiHeadAttention
先理解什么是 ScaledDotProductAttention,再理解MultiHeadAttention, 然后理解MaskedMultiHeadAttention
class ScaledDotProductAttention(nn.Module): 'Compute 'Scaled Dot Product Attention'' def __init__(self): super(ScaledDotProductAttention, self).__init__() def forward(self,query, key, value, mask=None, dropout=None): d_k = query.size(-1) scores = query@key.transpose(-2,-1) / math.sqrt(d_k) if mask is not None: scores = scores.masked_fill(mask == 0, -1e20) p_attn = F.softmax(scores, dim = -1) if dropout is not None: p_attn = dropout(p_attn) return p_attn@value, p_attn class MultiHeadAttention(nn.Module): def __init__(self, h, d_model, dropout=0.1): 'Take in model size and number of heads.' super(MultiHeadAttention, self).__init__() assert d_model % h == 0 # We assume d_v always equals d_k self.d_k = d_model // h self.h = h self.linears = clones(nn.Linear(d_model, d_model), 4) self.attn = None #記錄 attention矩陣結(jié)果 self.dropout = nn.Dropout(p=dropout) self.attention = ScaledDotProductAttention() def forward(self, query, key, value, mask=None): if mask is not None: # Same mask applied to all h heads. mask = mask.unsqueeze(1) nbatches = query.size(0) # 1) Do all the linear projections in batch from d_model => h x d_k query, key, value = [ l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2) for l, x in zip(self.linears, (query, key, value)) ] # 2) Apply attention on all the projected vectors in batch. x, self.attn = self.attention(query, key, value, mask=mask, dropout=self.dropout) # 3) 'Concat' using a view and apply a final linear. x = x.transpose(1, 2).contiguous() \ .view(nbatches, -1, self.h * self.d_k) return self.linears[-1](x) #為了讓訓(xùn)練過程與解碼過程信息流一致,遮擋tgt序列后面元素,設(shè)置其注意力為0 def tril_mask(data): 'Mask out future positions.' size = data.size(-1) #size為序列長(zhǎng)度 full = torch.full((1,size,size),1,dtype=torch.int,device=data.device) mask = torch.tril(full).bool() return mask #設(shè)置對(duì)<PAD>的注意力為0 def pad_mask(data, pad=0): 'Mask out pad positions.' mask = (data!=pad).unsqueeze(-2) return mask #計(jì)算一個(gè)batch數(shù)據(jù)的src_mask和tgt_mask class MaskedBatch: 'Object for holding a batch of data with mask during training.' def __init__(self, src, tgt=None, pad=0): self.src = src self.src_mask = pad_mask(src,pad) if tgt is not None: self.tgt = tgt[:,:-1] #訓(xùn)練時(shí),拿tgt的每一個(gè)詞輸入,去預(yù)測(cè)下一個(gè)詞,所以最后一個(gè)詞無需輸入 self.tgt_y = tgt[:, 1:] #第一個(gè)總是<SOS>無需預(yù)測(cè),預(yù)測(cè)從第二個(gè)詞開始 self.tgt_mask = \ self.make_tgt_mask(self.tgt, pad) self.ntokens = (self.tgt_y!= pad).sum() @staticmethod def make_tgt_mask(tgt, pad): 'Create a mask to hide padding and future words.' tgt_pad_mask = pad_mask(tgt,pad) tgt_tril_mask = tril_mask(tgt) tgt_mask = tgt_pad_mask & (tgt_tril_mask) return tgt_mask
import plotly.express as px # 測(cè)試tril_mask mask = tril_mask(torch.zeros(1,10)) #序列長(zhǎng)度為10 #sns.heatmap(mask[0],cmap=sns.cm.rocket); px.imshow(mask[0],color_continuous_scale='blues' ,height=600,width=600)
#測(cè)試 ScaledDotProductAttention query = torch.tensor([[[0.0,1.414],[1.414,0.0],[1.0,1.0],[-1.0,1.0],[1.0,-1.0]]]) key = query.clone() value = query.clone() attention = ScaledDotProductAttention() #沒有mask out,p_att = attention(query, key, value) fig = px.imshow(p_att[0],color_continuous_scale='blues' , title='without mask' ,height=600,width=600) fig.show()
#考慮mask out,p_att = attention(query, key, value, mask = tril_mask(torch.zeros(3,5))) fig = px.imshow(p_att[0],color_continuous_scale='blues' , height=600,width=600, title='with mask' ) fig.show()
# 測(cè)試MultiHeadAttention cross_attn = MultiHeadAttention(h=2, d_model=4) cross_attn.eval() q1 = torch.tensor([[[0.1,0.1,0.1,0.1],[0.1,0.3,0.1,0.3]]]) k1 = q1.clone() v1 = q1.clone() tgt_mask = tril_mask(torch.zeros(2,2)) out1 = cross_attn.forward(q1,k1,v1,mask = tgt_mask) print ('out1:\n' ,out1) #改變序列的第2個(gè)元素取值,由于有mask的遮擋,不會(huì)影響第1個(gè)輸出 q2 = torch.tensor([[[0.1,0.1,0.1,0.1],[0.4,0.5,0.5,0.8]]]) k2 = q2.clone() v2 = q2.clone() tgt_mask = tril_mask(torch.zeros(2,2)) out2 = cross_attn.forward(q2,k2,v2,mask = tgt_mask) print ('out2:\n' ,out2)
# 測(cè)試MaskedBatch mbatch = MaskedBatch(src = src,tgt = tgt, pad = 0) print (mbatch.src.shape) print (mbatch.tgt.shape) print (mbatch.tgt_y.shape) print (mbatch.src_mask.shape) print (mbatch.tgt_mask.shape) px.imshow(mbatch.tgt_mask[0],color_continuous_scale='blues' ,width=600,height=600)
關(guān)于Transformer的多頭注意力機(jī)制,有幾個(gè)要點(diǎn)問題,此處做一些梳理:
(1),Transformer是如何解決長(zhǎng)距離依賴的問題的?
Transformer是通過引入Scale-Dot-Product注意力機(jī)制來融合序列上不同位置的信息,從而解決長(zhǎng)距離依賴問題。以文本數(shù)據(jù)為例,在循環(huán)神經(jīng)網(wǎng)絡(luò)LSTM結(jié)構(gòu)中,輸入序列上相距很遠(yuǎn)的兩個(gè)單詞無法直接發(fā)生交互,只能通過隱藏層輸出或者細(xì)胞狀態(tài)按照時(shí)間步驟一個(gè)一個(gè)向后進(jìn)行傳遞。對(duì)于兩個(gè)在序列上相距非常遠(yuǎn)的單詞,中間經(jīng)過的其它單詞讓隱藏層輸出和細(xì)胞狀態(tài)混入了太多的信息,很難有效地捕捉這種長(zhǎng)距離依賴特征。但是在Scale-Dot-Product注意力機(jī)制中,序列上的每個(gè)單詞都會(huì)和其它所有單詞做一次點(diǎn)積計(jì)算注意力得分,這種注意力機(jī)制中單詞之間的交互是強(qiáng)制的不受距離影響的,所以可以解決長(zhǎng)距離依賴問題。
(2),Transformer在訓(xùn)練和測(cè)試階段可以在時(shí)間(序列)維度上進(jìn)行并行嗎?
在訓(xùn)練階段,Encoder和Decoder在時(shí)間(序列)維度都是并行的,在測(cè)試階段,Encoder在序列維度是并行的,Decoder是串行的。
首先,Encoder部分在訓(xùn)練階段和預(yù)測(cè)階段都可以并行比較好理解,無論在訓(xùn)練還是預(yù)測(cè)階段,它干的事情都是把已知的完整輸入編碼成memory,在序列維度可以并行。
對(duì)于Decoder部分有些微妙。在預(yù)測(cè)階段Decoder肯定是不能并行的,因?yàn)镈ecoder實(shí)際上是一個(gè)自回歸,它前面k-1位置的輸出會(huì)變成第k位的輸入的。前面沒有計(jì)算完,后面是拿不到輸入的,肯定不可以并行。那么訓(xùn)練階段能否并行呢?雖然訓(xùn)練階段知道了全部的解碼結(jié)果,但是訓(xùn)練階段要和預(yù)測(cè)階段一致啊,前面的解碼輸出不能受到后面解碼結(jié)果的影響啊。但Transformer通過在Decoder中巧妙地引入Mask技巧,使得在用Attention機(jī)制做序列特征融合的時(shí)候,每個(gè)單詞對(duì)位于它之后的單詞的注意力得分都為0,這樣就保證了前面的解碼輸出不會(huì)受到后面解碼結(jié)果的影響,因此Decoder在訓(xùn)練階段可以在序列維度做并行。
(3), Scaled-Dot Product Attention為什么要除以 ?
為了避免 變得很大時(shí) softmax函數(shù)的梯度趨于0。假設(shè) 和 中的取出的兩個(gè)向量 和 的每個(gè)元素值都是正態(tài)隨機(jī)分布, 數(shù)學(xué)上可以證明兩個(gè)獨(dú)立的正態(tài)隨機(jī)變量的積依然是一個(gè)正態(tài)隨機(jī)變量, 那么兩個(gè)向量做點(diǎn)積, 會(huì)得到 個(gè)正態(tài)隨機(jī)變量的和, 數(shù)學(xué)上 個(gè)正態(tài)隨機(jī)變量的和依然是一個(gè)正態(tài)隨機(jī)變量, 其方差是原來的 倍, 標(biāo)準(zhǔn)差是原來的 倍。如果不做 scale, 當(dāng) 很大時(shí), 求得的 元素的絕對(duì)值容易很大, 導(dǎo)致落在 softmax的極端區(qū)域(趨于 0 或者 1 ), 極端區(qū)域softmax函數(shù)的梯度值趨于 0 , 不利于模型學(xué)習(xí)。除以 , 恰好做了歸一, 不受 變化影響。
(4),MultiHeadAttention的參數(shù)數(shù)量和head數(shù)量有何關(guān)系?
MultiHeadAttention的參數(shù)數(shù)量和head數(shù)量無關(guān)。多頭注意力的參數(shù)來自對(duì)QKV的三個(gè)變換矩陣以及多頭結(jié)果concat后的輸出變換矩陣。假設(shè)嵌入向量的長(zhǎng)度是d_model, 一共有h個(gè)head. 對(duì)每個(gè)head, 這三個(gè)變換矩陣的尺寸都是 d_model×(d_model/h),所以h個(gè)head總的參數(shù)數(shù)量就是3×d_model×(d_model/h)×h = 3×d_model×d_model。它們的輸出向量長(zhǎng)度都變成 d_model/h,經(jīng)過attention作用后向量長(zhǎng)度保持,h個(gè)head的輸出拼接到一起后向量長(zhǎng)度還是d_model,所以最后輸出變換矩陣的尺寸是d_model×d_model。因此,MultiHeadAttention的參數(shù)數(shù)量為 4×d_model×d_model,和head數(shù)量無關(guān)。
2,前饋網(wǎng)絡(luò): PositionwiseFeedForward用于逐位置將多頭注意力融合后的信息進(jìn)行高維映射變換,簡(jiǎn)稱FFN。
FFN僅有兩個(gè)線性層,第一層將模型向量維度 從 d_model(512) 升到 d_ff(2048), 第二層再降回 d_model(512)
兩個(gè)線性層之間加了一個(gè)0.1的Dropout
class PositionwiseFeedForward(nn.Module): 'Implements FFN equation.' def __init__(self, d_model, d_ff, dropout=0.1): super(PositionwiseFeedForward, self).__init__() self.linear1 = nn.Linear(d_model, d_ff) #線性層默認(rèn)作用在最后一維度 self.linear2 = nn.Linear(d_ff, d_model) self.dropout = nn.Dropout(dropout) def forward(self, x): return self.linear2(self.dropout(F.relu(self.linear1(x))))
3,層歸一化:LayerNorm在視覺領(lǐng)域,歸一化一般用BatchNorm,但是在NLP領(lǐng)域,歸一化一般用LayerNorm。
這是由于NLP領(lǐng)域的輸入常常是不等長(zhǎng)的Sequence,使用BatchNorm會(huì)讓較長(zhǎng)的Sequence輸入的后面特征能夠使用的參與歸一化的樣本數(shù)太少,讓輸入變得不穩(wěn)定。
同時(shí)同一個(gè)Sequence的被PADDING填充的特征也會(huì)因BatchNorm獲得不同的非零值,這對(duì)模型非常不友好。
相比之下,LayerNorm總是對(duì)一個(gè)樣本自己的特征進(jìn)行歸一化,沒有上述問題。
class LayerNorm(nn.Module): 'Construct a layernorm module (similar to torch.nn.LayerNorm).' def __init__(self, features, eps=1e-6): super(LayerNorm, self).__init__() self.weight = nn.Parameter(torch.ones(features)) self.bias = nn.Parameter(torch.zeros(features)) self.eps = eps def forward(self, x): mean = x.mean(-1, keepdim=True) std = x.std(-1, keepdim=True) return self.weight * (x - mean) / (std + self.eps) + self.bias
4,殘差連接:ResConnection用于增強(qiáng)梯度流動(dòng)以降低網(wǎng)絡(luò)學(xué)習(xí)難度。
ResConnection 包括LayerNorm和Add殘差連接操作, LayerNorm可以放在最開始(norm_first=True),也可以放在最后(norm_first=False)。
《Attention is All you needed》論文原文是殘差連接之后再 LayerNorm,但后面的一些研究發(fā)現(xiàn)最開始的時(shí)候就LayerNorm更好一些。
殘差連接對(duì)于訓(xùn)練深度網(wǎng)絡(luò)至關(guān)重要。有許多研究殘差連接(ResNet)作用機(jī)制,解釋它為什么有效的文章,主要的一些觀點(diǎn)如下。
1,殘差連接增強(qiáng)了梯度流動(dòng)。直觀上看,loss端的梯度能夠通過跳躍連接快速傳遞到不同深度的各個(gè)層,增強(qiáng)了梯度流動(dòng),降低了網(wǎng)絡(luò)的學(xué)習(xí)難度。數(shù)學(xué)上看,殘差塊的導(dǎo)數(shù) f(x)=x+h(x) 為 f'(x)=1+h'(x) 在1.0附近,避免了梯度消失問題。
2,殘差連接減輕了網(wǎng)絡(luò)退化。一個(gè)網(wǎng)絡(luò)層h(x)可以用一個(gè)變換矩陣H來表示,由于許多神經(jīng)元有相同的反應(yīng)模式,h(x)等價(jià)的變換矩陣H可能有許多行是線性相關(guān)的,這使得H的行列式為0,H為非可逆矩陣,h(x)會(huì)導(dǎo)致網(wǎng)絡(luò)的退化和信息丟失。但增加了殘差連接之后,f(x)=x+h(x)對(duì)應(yīng)的變換矩陣F=H+I,單位陣I消除了H中相關(guān)行的線性相關(guān)性,減輕了退化的可能。
3,殘差連接實(shí)現(xiàn)了模型集成。如果將訓(xùn)練好的ResNet的一些block移除,模型的預(yù)測(cè)精度并不會(huì)崩潰式下降,但是如果將訓(xùn)練好的VGG的一些block移除,模型的預(yù)測(cè)精度會(huì)雪崩。這說明ResNet中的各個(gè)Block類似基模型,ResNet通過殘差連接將它們整合成了一個(gè)ensemble集成模型,增強(qiáng)了泛化能力。
4,殘差連接增強(qiáng)了表達(dá)能力。使用殘差塊構(gòu)建的深層網(wǎng)絡(luò)所代表的函數(shù)簇集合是淺層網(wǎng)絡(luò)所代表的的函數(shù)簇集合的超集,表達(dá)能力更強(qiáng),所以可以通過添加殘差塊不斷擴(kuò)充模型表達(dá)能力。如果不使用殘差連接,一個(gè)一層的網(wǎng)絡(luò)f(x) = h1(x) 所能表示的函數(shù)簇 不一定能被一個(gè)二層的網(wǎng)絡(luò) f(x) = h2(h1(x))所覆蓋,但是使用殘差連接后,f(x) = h1(x)+h2(h1(x))一定可以覆蓋一層的網(wǎng)絡(luò)所表示的函數(shù)簇,只要h2的全部權(quán)重取0即可。
參考:https://zhuanlan.zhihu.com/p/165350103
class ResConnection(nn.Module): '' ' A residual connection with a layer norm. Note the norm is at last according to the paper, but it may be better at first. ' '' def __init__(self, size, dropout, norm_first=True): super(ResConnection, self).__init__() self.norm = LayerNorm(size) self.dropout = nn.Dropout(dropout) self.norm_first = norm_first def forward(self, x, sublayer): 'Apply residual connection to any sublayer with the same size.' if self.norm_first: return x + self.dropout(sublayer(self.norm(x))) else : return self.norm(x + self.dropout(sublayer(x)))
5,單詞嵌入: WordEmbedding(權(quán)重要學(xué)習(xí))用于編碼單詞信息,權(quán)重要學(xué)習(xí),輸出乘了sqrt(d_model)來和位置編碼保持相當(dāng)量級(jí)。
當(dāng)d_model越大的時(shí)候,根據(jù) nn.init.xavier_uniform 初始化策略初始化的權(quán)重取值會(huì)越小。
# 單詞嵌入 class WordEmbedding(nn.Module): def __init__(self, d_model, vocab): super(WordEmbedding, self).__init__() self.embedding = nn.Embedding(vocab, d_model) self.d_model = d_model def forward(self, x): return self.embedding(x) * math.sqrt(self.d_model) #note here, multiply sqrt(d_model)
6,位置編碼:PositionEncoding(直接編碼)PositionEncoding用于編碼位置信息,使用sin和cos函數(shù)直接編碼絕對(duì)位置。
單詞和單詞順序?qū)φZ(yǔ)言意義都非常重要。
'你欠我1000塊錢'和'我欠你1000塊錢'是由完全相同的單詞組成,但由于詞的順序不同,含義截然相反。
在Transformer之前,一般用RNN模型來處理句子序列。
RNN模型本身蘊(yùn)含了對(duì)順序的建模,單詞是按照它們?cè)诰渥又械淖匀豁樞蛞粋€(gè)個(gè)地被RNN單元處理,逐個(gè)地被編碼。
但Transformer是并行地處理句子中的單詞的,缺少單詞的位置信息表征。
為了有效地表征單詞的位置信息,Transformer設(shè)計(jì)了位置編碼 PositionalEncoding,并添加到模型的輸入中。
于是,Transformer 用單詞嵌入(權(quán)重要學(xué)習(xí))向量 和位置編碼(直接編碼)向量 之和 來表示輸入。
如何構(gòu)造位置編碼呢?即如何 把 pos = 0,1,2,3,4,5,... 這樣的位置序列映射成為 一個(gè)一個(gè)的向量呢?
Transformer設(shè)計(jì)了基于正弦函數(shù)和余弦函數(shù)的位置編碼方法。
這種編碼方法有以下幾個(gè)優(yōu)點(diǎn):
1,編碼值分布在[-1,1]之間,這樣的分布對(duì)神經(jīng)網(wǎng)絡(luò)是比較友好的。
2,編碼了絕對(duì)位置信息,對(duì)于0<=pos<=2_pi_10000,每個(gè)pos的位置編碼向量都是不一樣的。
更多位置編碼的討論參考如下博客:《
讓研究人員絞盡腦汁的Transformer位置編碼》
https:///archives/8130
# 位置編碼 class PositionEncoding(nn.Module): 'Implement the PE function.' def __init__(self, d_model, dropout, max_len=5000): super(PositionEncoding, self).__init__() self.dropout = nn.Dropout(p=dropout) # Compute the positional encodings once in log space. pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) self.register_buffer('pe' , pe) def forward(self, x): x = x + self.pe[:, :x.size(1)] return self.dropout(x)
pe = PositionEncoding(120, 0) z = pe.forward(torch.zeros(1, 100, 120)) df = pd.DataFrame(z[0, :, [0,20,60,110]].data.numpy(),columns = ['dim' +c for c in ['0' ,'20' ,'60' ,'110' ]]) df.insert(0,'x' ,np.arange(100)) px.line(df, x = 'x' ,y = ['dim' +c for c in ['0' ,'20' ,'60' ,'110' ]]).show()
px.imshow(np.squeeze(z.data.numpy()) ,color_continuous_scale='blues' ,width=1000,height=800)
7, 編碼器: TransformerEncoder用于將輸入Sequence編碼成與Sequence等長(zhǎng)的memory向量序列, 由N個(gè)TransformerEncoderLayer堆疊而成
class TransformerEncoderLayer(nn.Module): 'TransformerEncoderLayer is made up of self-attn and feed forward (defined below)' def __init__(self, size, self_attn, feed_forward, dropout): super(TransformerEncoderLayer, self).__init__() self.self_attn = self_attn self.feed_forward = feed_forward self.res_layers = clones(ResConnection(size, dropout), 2) self.size = size def forward(self, x, mask): 'Follow Figure 1 (left) for connections.' x = self.res_layers[0](x, lambda x: self.self_attn(x, x, x, mask)) return self.res_layers[1](x, self.feed_forward) class TransformerEncoder(nn.Module): 'TransformerEncoder is a stack of N TransformerEncoderLayer' def __init__(self, layer, N): super(TransformerEncoder, self).__init__() self.layers = clones(layer, N) self.norm = LayerNorm(layer.size) def forward(self, x, mask): 'Pass the input (and mask) through each layer in turn.' for layer in self.layers: x = layer(x, mask) return self.norm(x) @classmethod def from_config(cls,N=6,d_model=512, d_ff=2048, h=8, dropout=0.1): attn = MultiHeadAttention(h, d_model) ff = PositionwiseFeedForward(d_model, d_ff, dropout) layer = TransformerEncoderLayer(d_model, attn, ff, dropout) return cls(layer,N)
from torchkeras import summary src_embed = nn.Sequential(WordEmbedding(d_model=32, vocab = len(vocab_x)), PositionEncoding(d_model=32, dropout=0.1)) encoder = TransformerEncoder.from_config(N=3,d_model=32, d_ff=128, h=8, dropout=0.1) src_mask = pad_mask(src) memory = encoder(*[src_embed(src),src_mask]) summary(encoder,input_data_args = [src_embed(src),src_mask]);
8,解碼器:TransformerDecoder用于將編碼器編碼的memory向量解碼成另一個(gè)不定長(zhǎng)的向量序列, 由N個(gè)TransformerDecoderLayer堆疊而成
class TransformerDecoderLayer(nn.Module): 'TransformerDecoderLayer is made of self-attn, cross-attn, and feed forward (defined below)' def __init__(self, size, self_attn, cross_attn, feed_forward, dropout): super(TransformerDecoderLayer, self).__init__() self.size = size self.self_attn = self_attn self.cross_attn = cross_attn self.feed_forward = feed_forward self.res_layers = clones(ResConnection(size, dropout), 3) def forward(self, x, memory, src_mask, tgt_mask): 'Follow Figure 1 (right) for connections.' m = memory x = self.res_layers[0](x, lambda x: self.self_attn(x, x, x, tgt_mask)) x = self.res_layers[1](x, lambda x: self.cross_attn(x, m, m, src_mask)) return self.res_layers[2](x, self.feed_forward)
class TransformerDecoder(nn.Module): 'Generic N layer decoder with masking.' def __init__(self, layer, N): super(TransformerDecoder, self).__init__() self.layers = clones(layer, N) self.norm = LayerNorm(layer.size) def forward(self, x, memory, src_mask, tgt_mask): for layer in self.layers: x = layer(x, memory, src_mask, tgt_mask) return self.norm(x) @classmethod def from_config(cls,N=6,d_model=512, d_ff=2048, h=8, dropout=0.1): self_attn = MultiHeadAttention(h, d_model) cross_attn = MultiHeadAttention(h, d_model) ff = PositionwiseFeedForward(d_model, d_ff, dropout) layer = TransformerDecoderLayer(d_model, self_attn, cross_attn, ff, dropout) return cls(layer,N)
from torchkeras import summary mbatch = MaskedBatch(src=src,tgt=tgt,pad=0) src_embed = nn.Sequential(WordEmbedding(d_model=32, vocab = len(vocab_x)), PositionEncoding(d_model=32, dropout=0.1)) encoder = TransformerEncoder.from_config(N=3,d_model=32, d_ff=128, h=8, dropout=0.1) memory = encoder(src_embed(src),mbatch.src_mask) tgt_embed = nn.Sequential(WordEmbedding(d_model=32, vocab = len(vocab_y)), PositionEncoding(d_model=32, dropout=0.1)) decoder = TransformerDecoder.from_config(N=3,d_model=32, d_ff=128, h=8, dropout=0.1) result = decoder.forward(tgt_embed(mbatch.tgt),memory,mbatch.src_mask,mbatch.tgt_mask) summary(decoder,input_data_args = [tgt_embed(mbatch.tgt),memory, mbatch.src_mask,mbatch.tgt_mask]);
decoder.eval() mbatch.tgt[0][1]=8 result = decoder.forward(tgt_embed(mbatch.tgt),memory,mbatch.src_mask,mbatch.tgt_mask) print (torch.sum(result[0][0])) mbatch.tgt[0][1]=7 result = decoder.forward(tgt_embed(mbatch.tgt),memory,mbatch.src_mask,mbatch.tgt_mask) print (torch.sum(result[0][0]))
9,生成器: Generator用于將解碼器解碼輸出的向量序列中的每個(gè)向量逐個(gè)映射成為輸出詞典中各個(gè)詞的取詞概率。一般由一個(gè)Linear層接F.log_softmax構(gòu)成,比較簡(jiǎn)單。接F.log_softmax而不接F.softmax的原因是對(duì)于一些特別小的概率如1e-100,在精度約束條件下,F(xiàn).log_softmax能夠更加準(zhǔn)確地表示其大小。
class Generator(nn.Module): 'Define standard linear + softmax generation step.' def __init__(self, d_model, vocab): super(Generator, self).__init__() self.proj = nn.Linear(d_model, vocab) def forward(self, x): return F.log_softmax(self.proj(x), dim=-1)
generator = Generator(d_model = 32, vocab = len(vocab_y)) log_probs = generator(result) probs = torch.exp(log_probs) print ('output_probs.shape:' ,probs.shape) print ('sum(probs)=1:' ) print (torch.sum(probs,dim = -1)[0]) summary(generator,input_data = result);
10,變形金剛:Transformer用于Seq2Seq轉(zhuǎn)碼,例如用于機(jī)器翻譯,采用EncoderDecoder架構(gòu),由Encoder, Decoder 和 Generator組成
from torch import nn class Transformer(nn.Module): '' ' A standard Encoder-Decoder architecture. Base for this and many other models. ' '' def __init__(self, encoder, decoder, src_embed, tgt_embed, generator): super(Transformer, self).__init__() self.encoder = encoder self.decoder = decoder self.src_embed = src_embed self.tgt_embed = tgt_embed self.generator = generator self.reset_parameters() def forward(self, src, tgt, src_mask, tgt_mask): 'Take in and process masked src and target sequences.' return self.generator(self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)) def encode(self, src, src_mask): return self.encoder(self.src_embed(src), src_mask) def decode(self, memory, src_mask, tgt, tgt_mask): return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask) @classmethod def from_config(cls,src_vocab,tgt_vocab,N=6,d_model=512, d_ff=2048, h=8, dropout=0.1): encoder = TransformerEncoder.from_config(N=N,d_model=d_model, d_ff=d_ff, h=h, dropout=dropout) decoder = TransformerDecoder.from_config(N=N,d_model=d_model, d_ff=d_ff, h=h, dropout=dropout) src_embed = nn.Sequential(WordEmbedding(d_model, src_vocab), PositionEncoding(d_model, dropout)) tgt_embed = nn.Sequential(WordEmbedding(d_model, tgt_vocab), PositionEncoding(d_model, dropout)) generator = Generator(d_model, tgt_vocab) return cls(encoder, decoder, src_embed, tgt_embed, generator) def reset_parameters(self): for p in self.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p)
from torchkeras import summary net = Transformer.from_config(src_vocab = len(vocab_x),tgt_vocab = len(vocab_y), N=2, d_model=32, d_ff=128, h=8, dropout=0.1) mbatch = MaskedBatch(src=src,tgt=tgt,pad=0) summary(net,input_data_args = [mbatch.src,mbatch.tgt,mbatch.src_mask,mbatch.tgt_mask]);
三,訓(xùn)練模型Transformer的訓(xùn)練主要用到了以下兩個(gè)技巧:
1,學(xué)習(xí)率調(diào)度: Learning Rate Scheduler (用于提升模型學(xué)習(xí)穩(wěn)定性。做法是學(xué)習(xí)率先warm up線性增長(zhǎng),再按照 1/sqrt(step) 規(guī)律緩慢下降)
2,標(biāo)簽平滑: Label Smoothing. (用于讓模型更加集中在對(duì)分類錯(cuò)誤的樣本的學(xué)習(xí),而不是擴(kuò)大已經(jīng)分類正確樣本中正負(fù)樣本預(yù)測(cè)差距。做法是將正例標(biāo)簽由1改成0.1,負(fù)例標(biāo)簽由0改成0.9/vocab_size)
介紹了用這兩個(gè)方法封裝的 Optimizer和 Loss 后,我們進(jìn)一步實(shí)現(xiàn)完整訓(xùn)練代碼。
3,完整訓(xùn)練代碼。
1,學(xué)習(xí)率調(diào)度:Learning Rate Scheduler用于提升模型學(xué)習(xí)穩(wěn)定性。
做法是學(xué)習(xí)率先warm up線性增長(zhǎng),再按照 1/sqrt(step) 規(guī)律緩慢下降。
學(xué)習(xí)率的warm up為何有效呢?
一種解釋性觀點(diǎn)是認(rèn)為這能夠讓模型初始學(xué)習(xí)時(shí)參數(shù)平穩(wěn)變化并避免對(duì)開始的幾個(gè)batch數(shù)據(jù)過擬合陷入局部最優(yōu)。
由于剛學(xué)習(xí)時(shí),loss比較大,梯度會(huì)很大,如果學(xué)習(xí)率也很大,兩者相乘會(huì)更大,那么模型參數(shù)會(huì)隨著不同batch數(shù)據(jù)的差異劇烈抖動(dòng),無法有效地學(xué)習(xí),也容易對(duì)開始的幾個(gè)batch數(shù)據(jù)過擬合,后期很難拉回來。
等到模型學(xué)習(xí)了一些時(shí)候,loss變小了,梯度也會(huì)小,學(xué)習(xí)率調(diào)大,兩者相乘也不會(huì)很大,模型依然可以平穩(wěn)有效地學(xué)習(xí)。
后期為何又要讓調(diào)低學(xué)習(xí)率呢?
這是因?yàn)楹笃谀P蚻oss已經(jīng)很小了,在最優(yōu)參數(shù)附近了,如果學(xué)習(xí)率過大,容易在最優(yōu)參數(shù)附近震蕩,無法逼近最優(yōu)參數(shù)。
參考:https://www.zhihu.com/question/338066667
#注1:此處通過繼承方法將學(xué)習(xí)率調(diào)度策略融入Optimizer #注2:NoamOpt中的Noam是論文作者之一的名字 #注3:學(xué)習(xí)率是按照step而非epoch去改變的 class NoamOpt(torch.optim.AdamW): def __init__(self, params, model_size=512, factor=1.0, warmup=4000, lr=0, betas=(0.9, 0.98), eps=1e-9, weight_decay=0, amsgrad=False): super(NoamOpt,self).__init__(params, lr=lr, betas=betas, eps=eps, weight_decay=weight_decay, amsgrad=amsgrad) self._step = 0 self.warmup = warmup self.factor = factor self.model_size = model_size def step(self,closure=None): 'Update parameters and rate' self._step += 1 rate = self.rate() for p in self.param_groups: p['lr' ] = rate super(NoamOpt,self).step(closure=closure) def rate(self, step = None): 'Implement `lrate` above' if step is None: step = self._step return self.factor * \ (self.model_size ** (-0.5) * min(step * self.warmup ** (-1.5),step ** (-0.5))) optimizer = NoamOpt(net.parameters(), &nbnbsp;model_size=net.src_embed[0].d_model, factor=1.0, warmup=400)
import plotly.express as px opts = [NoamOpt(net.parameters(),model_size=512, factor =1, warmup=4000), NoamOpt(net.parameters(),model_size=512, factor=1, warmup=8000), NoamOpt(net.parameters(),model_size=256, factor=1, warmup=4000)] steps = np.arange(1, 20000) rates = [[opt.rate(i) for opt in opts] for i in steps] dfrates = pd.DataFrame(rates,columns = ['512:4000' , '512:8000' , '256:4000' ]) dfrates['steps' ] = steps fig = px.line(dfrates,x='steps' ,y=['512:4000' , '512:8000' , '256:4000' ]) fig.layout.yaxis.title = 'lr' fig
2,標(biāo)簽平滑:Label Smoothing用于讓模型更加集中在對(duì)分類錯(cuò)誤的樣本的學(xué)習(xí),而不是擴(kuò)大已經(jīng)分類正確樣本中正負(fù)樣本預(yù)測(cè)差距。
做法是將正例標(biāo)簽由1改成0.1,負(fù)例標(biāo)簽由0改成0.9/vocab_size
多分類一般用softmax激活函數(shù),要讓模型對(duì)正例標(biāo)簽預(yù)測(cè)值為1是非常困難的,那需要輸出正無窮才可以.
對(duì)負(fù)例標(biāo)簽預(yù)測(cè)值為0也是非常困難的,那需要輸出負(fù)無窮才可以。
但實(shí)際上我們不需要模型那么確信,只要正例標(biāo)簽的預(yù)測(cè)值比負(fù)例標(biāo)簽大就行了。
因此可以做標(biāo)簽平滑,讓模型不必費(fèi)勁地?zé)o限擴(kuò)大分類正確樣本中正負(fù)樣本之間的預(yù)測(cè)差距,而是集中在對(duì)分類錯(cuò)誤的樣本的學(xué)習(xí)。
由于在激活函數(shù)中已經(jīng)采用了F.log_softmax, 所以損失函數(shù)不能用nn.CrossEntropyLoss,而需要使用 nn.NLLoss.
(注:nn.LogSoftmax + nn.NLLLoss = nn.CrossEntropyLoss)
同時(shí)由于使用了標(biāo)簽平滑,采用nn.NLLoss時(shí)損失的最小值無法變成0,需要扣除標(biāo)簽分布本身的熵,損失函數(shù)進(jìn)一步變成 nn.KLDivLoss.
在采用標(biāo)簽平滑的時(shí)候,nn.KLDivLoss和nn.NLLoss的梯度相同,優(yōu)化效果相同,但其最小值是0,更符合我們對(duì)損失的直觀理解。
class LabelSmoothingLoss(nn.Module): 'Implement label smoothing.' def __init__(self, size, padding_idx, smoothing=0.0): #size為詞典大小 super(LabelSmoothingLoss, self).__init__() self.criterion = nn.KLDivLoss(reduction='sum' ) self.padding_idx = padding_idx self.confidence = 1.0 - smoothing self.smoothing = smoothing self.size = size self.true_dist = None def forward(self, x, target): assert x.size(1) == self.size true_dist = x.data.clone() true_dist.fill_(self.smoothing / (self.size - 2)) #預(yù)測(cè)結(jié)果不會(huì)是<SOS> #和<PAD> true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence) true_dist[:, self.padding_idx] = 0 mask = torch.nonzero((target.data == self.padding_idx).int()) if mask.dim() > 0: true_dist.index_fill_(0, mask.squeeze(), 0.0) self.true_dist = true_dist return self.criterion(x, true_dist)
# Example of label smoothing. smooth_loss = LabelSmoothingLoss(5, 0, 0.4) predict = torch.FloatTensor([[1e-10, 0.2, 0.7, 0.1, 1e-10], [1e-10, 0.2, 0.7, 0.1, 1e-10], [1e-10, 0.2, 0.7, 0.1, 1e-10]]) loss = smooth_loss(predict.log(), torch.LongTensor([2, 1, 0])) print ('smoothed target:\n' ,smooth_loss.true_dist,'\n' ) print ('loss:' ,loss) px.imshow(smooth_loss.true_dist,color_continuous_scale='blues' ,height=600,width=1000)
smoothed target: tensor([[0.0000, 0.1333, 0.6000, 0.1333, 0.1333], [0.0000, 0.6000, 0.1333, 0.1333, 0.1333], [0.0000, 0.0000, 0.0000, 0.0000, 0.0000]]) loss: tensor(5.9712)
3,完整訓(xùn)練代碼有了優(yōu)化器和Loss后,我們便可以訓(xùn)練模型了。
我們先整體試算loss和metric,然后再套上torchkeras的訓(xùn)練模版。
#整體流程試算 for src,tgt in dl_train: break mbatch = MaskedBatch(src=src,tgt=tgt,pad = 0) net = Transformer.from_config(src_vocab = len(vocab_x),tgt_vocab = len(vocab_y), N=3, d_model=64, d_ff=128, h=8, dropout=0.1) #loss loss_fn = LabelSmoothingLoss(size=len(vocab_y), padding_idx=0, smoothing=0.2) preds = net.forward(mbatch.src, mbatch.tgt, mbatch.src_mask, mbatch.tgt_mask) preds = preds.reshape(-1, preds.size(-1)) labels = mbatch.tgt_y.reshape(-1) loss = loss_fn(preds, labels)/mbatch.ntokens print ('loss=' ,loss.item()) #metric preds = preds.argmax(dim=-1).view(-1)[labels!=0] labels = labels[labels!=0] acc = (preds==labels).sum()/(labels==labels).sum() print ('acc=' ,acc.item())
loss= 2.1108953952789307acc= 0.08041179925203323
from torchmetrics import Accuracy #使用torchmetrics中的指標(biāo) accuracy = Accuracy(task='multiclass' ,num_classes=len(vocab_y)) accuracy.update(preds,labels) print ('acc=' ,accuracy.compute().item())
acc= 0.08041179925203323
下面使用我們的夢(mèng)中情爐來實(shí)現(xiàn)最優(yōu)雅的訓(xùn)練循環(huán)~
from torchkeras import KerasModel class StepRunner: def __init__(self, net, loss_fn, accelerator=None, stage = 'train' , metrics_dict = None, optimizer = None, lr_scheduler = None ): self.net,self.loss_fn,self.metrics_dict,self.stage = net,loss_fn,metrics_dict,stage self.optimizer,self.lr_scheduler = optimizer,lr_scheduler self.accelerator = accelerator if self.stage=='train' : self.net.train() else : self.net.eval() def __call__(self, batch): src,tgt = batch mbatch = MaskedBatch(src=src,tgt=tgt,pad = 0) #loss with self.accelerator.autocast(): preds = net.forward(mbatch.src, mbatch.tgt, mbatch.src_mask, mbatch.tgt_mask) preds = preds.reshape(-1, preds.size(-1)) labels = mbatch.tgt_y.reshape(-1) loss = loss_fn(preds, labels)/mbatch.ntokens #filter padding preds = preds.argmax(dim=-1).view(-1)[labels!=0] labels = labels[labels!=0] #backward() if self.stage=='train' and self.optimizer is not None: self.accelerator.backward(loss) if self.accelerator.sync_gradients: self.accelerator.clip_grad_norm_(self.net.parameters(), 1.0) self.optimizer.step() if self.lr_scheduler is not None: self.lr_scheduler.step() self.optimizer.zero_grad() all_loss = self.accelerator.gather(loss).sum() all_preds = self.accelerator.gather(preds) all_labels = self.accelerator.gather(labels) #losses (or plain metrics that can be averaged) step_losses = {self.stage+'_loss' :all_loss.item()} step_metrics = {self.stage+'_' +name:metric_fn(all_preds, all_labels).item() for name,metric_fn in self.metrics_dict.items()} if self.stage=='train' : if self.optimizer is not None: step_metrics['lr' ] = self.optimizer.state_dict()['param_groups' ][0]['lr' ] else : step_metrics['lr' ] = 0.0 return step_losses,step_metrics KerasModel.StepRunner = StepRunner
from torchmetrics import Accuracy net = Transformer.from_config(src_vocab = len(vocab_x),tgt_vocab = len(vocab_y), N=5, d_model=64, d_ff=128, h=8, dropout=0.1)
loss_fn = LabelSmoothingLoss(size=len(vocab_y), padding_idx=0, smoothing=0.1) metrics_dict = {'acc' :Accuracy(task='multiclass' ,num_classes=len(vocab_y))} optimizer = NoamOpt(net.parameters(),model_size=64) model = KerasModel(net, loss_fn=loss_fn, metrics_dict=metrics_dict, optimizer = optimizer) model.fit( train_data=dl_train, val_data=dl_val, epochs=100, ckpt_path='checkpoint' , patience=10, monitor='val_acc' , mode='max' , callbacks=None, plot=True )
四,使用模型下面使用貪心法進(jìn)行翻譯推理過程。
和訓(xùn)練過程可以通過掩碼遮擋未來token,從而實(shí)現(xiàn)一個(gè)句子在序列長(zhǎng)度方向并行訓(xùn)練不同。
翻譯推理過程只有先翻譯了前面的內(nèi)容,添加到輸出中,才能夠翻譯后面的內(nèi)容,這個(gè)過程是無法在序列維度并行的。
Decoder&Generator第k位的輸出實(shí)際上對(duì)應(yīng)的是 已知 輸入編碼后的memory和前k位Deocder輸入(解碼序列)
的情況下解碼序列第k+1位取 輸出詞典中各個(gè)詞的概率。
貪心法是獲取解碼結(jié)果的簡(jiǎn)化方案,工程實(shí)踐當(dāng)中一般使用束搜索方法(Beam Search)
參考:《十分鐘讀懂Beam Search》 https://zhuanlan.zhihu.com/p/114669778
def greedy_decode(net, src, src_mask, max_len, start_symbol): net.eval() memory = net.encode(src, src_mask) ys = torch.full((len(src),max_len),start_symbol,dtype = src.dtype).to(src.device) for i in range(max_len-1): out = net.generator(net.decode(memory, src_mask, ys, tril_mask(ys))) ys[:,i+1]=out.argmax(dim=-1)[:,i] return ys def get_raw_words(tensor,vocab_r) ->'str' : words = [vocab_r[i] for i in tensor.tolist()] return words def get_words(tensor,vocab_r) ->'str' : s = '' .join([vocab_r[i] for i in tensor.tolist()]) words = s[:s.find('<EOS>' )].replace('<SOS>' ,'' ) return words def prepare(x,accelerator=model.accelerator): return x.to(accelerator.device)
##解碼翻譯結(jié)果 net = model.net net.eval() net = prepare(net) src,tgt = get_data() src,tgt = prepare(src),prepare(tgt) mbatch = MaskedBatch(src=src.unsqueeze(dim=0),tgt=tgt.unsqueeze(dim=0)) y_pred = greedy_decode(net,mbatch.src,mbatch.src_mask,50,vocab_y['<SOS>' ]) print ('input:' ) print (get_words(mbatch.src[0],vocab_xr),'\n' ) #標(biāo)簽結(jié)果 print ('ground truth:' ) print (get_words(mbatch.tgt[0],vocab_yr),'\n' ) #標(biāo)簽結(jié)果 print ('prediction:' ) print (get_words(y_pred[0],vocab_yr)) #解碼預(yù)測(cè)結(jié)果,原始標(biāo)簽中<PAD>位置的預(yù)測(cè)可以忽略
input: 744905345112863593+7323038062936802655
ground truth: 8067943408049666248
prediction: 8067943408049666248
五,評(píng)估模型我們訓(xùn)練過程中監(jiān)控的acc實(shí)際上是字符級(jí)別的acc,現(xiàn)在我們來計(jì)算樣本級(jí)別的準(zhǔn)確率。
from tqdm.auto import tqdm net = prepare(net) loop = tqdm(range(1,201)) correct = 0 for i in loop: src,tgt = get_data() src,tgt = prepare(src),prepare(tgt) mbatch = MaskedBatch(src=src.unsqueeze(dim=0),tgt=tgt.unsqueeze(dim=0)) y_pred = greedy_decode(net,mbatch.src,mbatch.src_mask,50,vocab_y['<SOS>' ]) inputs = get_words(mbatch.src[0],vocab_xr) #標(biāo)簽結(jié)果 gt = get_words(mbatch.tgt[0],vocab_yr) #標(biāo)簽結(jié)果 preds = get_words(y_pred[0],vocab_yr) #解碼預(yù)測(cè)結(jié)果,原始標(biāo)簽中<PAD>位置的預(yù)測(cè)可以忽略 if preds==gt: correct+=1 loop.set_postfix(acc = correct/i) print ('acc=' ,correct/len(loop))
perfect,基本完美實(shí)現(xiàn)兩數(shù)之和。