Transformer

  1. 介绍
  2. Transformer
  3. Code讲解
    1. 数据部分
    2. 模型部分
    3. 训练
  4. Reference

介绍

  Transformer自从去年成为一大热门,图像领域也有了广泛应用,但是这个通用模型一直没有接触过,趁着这次的机会,打算从开始学一下Transformer这个模型。主要是基于两个一个是论文Attention is All You Need,以及pytorch的tutorial

  整篇博客大致是第二部分讲解宏观,即Transformer的来源和解决的问题,第三部分以WikiText2的例子来讲解一个简单Transformer模型的构建过程,如果我写了第四部分,那第四部分肯定是讲解Transformer在视觉中的应用。

Transformer


Transformer架构

  在讲Transformer之前,先说一下是Attention这种机制,利用机器学习解决某个问题时,我们都是利用某个模型将原始数据映射到一个高维特征空间(这个过程可能是升维,也可能是降维,也有可能是先升后降)中,使得该任务在该空间下可以被更好地分类、回归,从传统机器学习的SVM、PCA等,到现在大热的深度学习模型,都是这样的思路,唯一不同的,就是我们得到这个高维空间的过程和能力,在CV领域,这个过程是基于CNN等模型实现的,得到对于原始图片的编码向量;在NLP领域,这个过程基于RNN、LSTM等方法实现,得到词向量,这就是encoder的过程。

  那么,这个提取特征的过程,我们想要保留什么,就决定了我们使用怎样的方法去得到特征向量。以计算机视觉发展为例,最一开始,对于像素点的简单数值统计得到了一张图像的特征向量;后来我们想要获得图像的特征信息(点、线、面等),就有了各种算子;再之后,深度学习出现,从任意函数拟合的多层感知机到CNN,这种机制,也是为了获得更详细的局部信息,和更全面的全局信息(CNN局部信息捕捉,ResNet和DesenNet跳连保证信息既有全局也有局部),在有限维度下,我们的特征向量能保留的关键信息越多,模型能力也就越强。

  但是,与图像不同,NLP的处理除了原始数据到特征空间的映射(encoder),以及特征空间到目标空间的映射(decoder),和图像有个很重要的区别在于,NLP的上下文的关系更加重要,图像利用卷积得到对图像更高层次的理解,不需要再次利用低纬度的点线面特征(不考虑Transfer Learning),但是,NLP的上下文关系依然是很重要的点,比如在一个翻译任务中,look out是关心照顾,还是小心注意,取决于后面是什么(You should look out for yourself from now on. /Look out, he’s got a gun),因此,除了表征源空间到特征空间的映射,源空间本身之间的编码关系如何表征,也是一个问题,但是在过去的模型中,LSTM、RNN和CNN都有对原始输入编码和解码,却缺少了源空间本身的编码和表征,尤其是当源空间中两个词的距离更长的时候,这种能力就变得非常微弱了。

  为了解决距离较远的词组之间的信息传递和关系(当然,也解决了同一句子序列中不同语素的关注度),Attention应运而生。那么,什么是Attention呢?以最基本的Seq2Seq模型为例:

  • 假设一个句子S由单词序列$[w_0, w_1, …, w_n]$

    1. 利用某种嵌入方法(embedding)将每个单词$w_i$编码成一个向量$v_i$。

    2. 解码时,使用学习到的注意力权重$a_i$对1中得到的所有单词向量做加权线性组合$\sum_i a_i v_i$.

    3. 在decoder进行下一个词的预测时,使用2中得到的线性组合。

  对上述过程的一种理解是,我们希望在我们学习最终的特征空间时,不只是利用单个词的特征向量,而是将每个词的特征向量变成对于不同单词的权重组合(保留单词之间的影响力),同时这样也避免了过去输入和输出向量必须统一的问题,现在可以做成任意长度输入和任意长度输出。

  有了Attention做基础之后,我们现在来看Transformer的重点Self-Attention:


Self-Attention过程

  简单来说,Self-Attention就是前面简单Attention的复杂版本,利用了三个向量Q,K,V计算相互之间的相似度,具体而言,在实现过程中,利用embedding实现了从单词到词向量的映射,之后利用该词向量分别乘三个矩阵$W^Q, W^k,W^V$(训练参数)得到三个向量$Q, K, V$。基于以上计算过程,得到最终的Self-Attention的输出,这里的Q和K的相似度用来度量相关性,V表示每个向量的分量大小。

  Multi-head是Transformer中提出的另一改进策略,简单来说就是利用多个Self-Attention捕捉不同特征,之后将多个Self-Attention的输出拼接到一起,做后续网络的输入,在这个过程中,就是输入到多层前馈神经网络,进行计算。

  Positional Encoding则是对输入词向量位置编码的函数,在原文中是利用了正弦函数和余弦函数实现的。

Code讲解

  在弄懂了理论部分之后,这部分讲解基于pytorch的Transformer实现。Transformer最初是用在NLP领域的,因此这一部分也以NLP的一个小应用讲解,与Pytorch不同,这里从数据开始讲起,再到模型,最后是模型的训练。

数据部分

  首先,是数据的准备,这里涉及到很多NLP的基本代码,因此也一并讲解,主要是torchtext模块中的一些通用函数和代码。

from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

train_iter = WikiText2(split='train')
tokenizer = get_tokenizer('basic_english')
'''
这里的tokenizer是一个‘basic_english’的分词器
>>> import torchtext
>>> from torchtext.data import get_tokenizer
>>> tokenizer = get_tokenizer("basic_english")
>>> tokens = tokenizer("You can now install TorchText using pip!")
>>> tokens
>>> ['you', 'can', 'now', 'install', 'torchtext', 'using', 'pip', '!']
'''

#这里将整个数据集制作成一张词表vocab
vocab = build_vocab_from_iterator(map(tokenizer, train_iter), specials=['<unk>'])
#这里是如果查词超过词表,就返回‘<unk>’对应的序号0
vocab.set_default_index(vocab['<unk>'])

def data_process(raw_text_iter: dataset.IterableDataset) -> Tensor:
    """将原始文本处理成一个tensor"""
    data = [torch.tensor(vocab(tokenizer(item)), dtype=torch.long) for item in raw_text_iter]
    return torch.cat(tuple(filter(lambda t: t.numel() > 0, data)))

train_iter, val_iter, test_iter = WikiText2()
#将原始数据连接成一个长向量,即这里的train_data
train_data = data_process(train_iter)
val_data = data_process(val_iter)
test_data = data_process(test_iter)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def batchify(data: Tensor, bsz: int) -> Tensor:
    """
    注意:这里有一个很明确的变化,是先view再转置(.t())
    这样的话,输出的维度是纵向连续,而不是一行连续
    同时,输出的维度变成了[N // bsz, bsz]
    """
    seq_len = data.size(0) // bsz
    data = data[:seq_len * bsz]
    data = data.view(bsz, seq_len).t().contiguous()
    return data.to(device)

batch_size = 20
eval_batch_size = 10
train_data = batchify(train_data, batch_size)  # shape [seq_len, batch_size]
val_data = batchify(val_data, eval_batch_size)
test_data = batchify(test_data, eval_batch_size)

  到这里,我们就将原始的文本转换成了一个tensor,唯一需要注意的是,这里的词表顺序如下图,而不是理解意义上的横向连续:


词表转换前后的格式

  将单词从字符转变成tensor之后,因为训练需要输入和输出,因此我们需要将上述格式转换成对应的(input, label)的格式。

bptt = 35
def get_batch(source: Tensor, i: int) -> Tuple[Tensor, Tensor]:
    """
    输入:一个tensor,[full_seq_len, batch_size]
    输出:一个data和对应的target,[seq_len, batch_size]和[seq_len * batch_size]
    """
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len].reshape(-1)
    return data, target

模型部分

  除了数据,我们还需要给出我们的模型,即Transformer的结构:

import math
from typing import Tuple

import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

class TransformerModel(nn.Module):

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor) -> Tensor:
        """
        Args:
            src: Tensor, shape [seq_len, batch_size]
            src_mask: Tensor, shape [seq_len, seq_len]

        Returns:
            output Tensor of shape [seq_len, batch_size, ntoken]
        """
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        output = self.decoder(output)
        return output


def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Generates an upper-triangular matrix of -inf, with zeros on diag."""
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)

  Transformer之前有一个位置编码,和之前说的一样,用的是正弦和余弦函数

class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

训练

  模型和数据都有了,就可以进行训练,首先是设置模型的超参数:

ntokens = len(vocab)  # size of vocabulary
emsize = 200  # embedding dimension
d_hid = 200  # dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 2  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 2  # number of heads in nn.MultiheadAttention
dropout = 0.2  # dropout probability
model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout).to(device)

  然后,就是和图像类似的训练过程:

import copy
import time

criterion = nn.CrossEntropyLoss()
lr = 5.0  # learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

def train(model: nn.Module) -> None:
    model.train()  # turn on train mode
    total_loss = 0.
    log_interval = 200
    start_time = time.time()
    src_mask = generate_square_subsequent_mask(bptt).to(device)

    num_batches = len(train_data) // bptt
    for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)):
        data, targets = get_batch(train_data, i)
        batch_size = data.size(0)
        if batch_size != bptt:  # only on last batch
            src_mask = src_mask[:batch_size, :batch_size]
        output = model(data, src_mask)
        loss = criterion(output.view(-1, ntokens), targets)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        if batch % log_interval == 0 and batch > 0:
            lr = scheduler.get_last_lr()[0]
            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / log_interval
            ppl = math.exp(cur_loss)
            print(f'| epoch {epoch:3d} | {batch:5d}/{num_batches:5d} batches | '
                  f'lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | '
                  f'loss {cur_loss:5.2f} | ppl {ppl:8.2f}')
            total_loss = 0
            start_time = time.time()

def evaluate(model: nn.Module, eval_data: Tensor) -> float:
    model.eval()  # turn on evaluation mode
    total_loss = 0.
    src_mask = generate_square_subsequent_mask(bptt).to(device)
    with torch.no_grad():
        for i in range(0, eval_data.size(0) - 1, bptt):
            data, targets = get_batch(eval_data, i)
            batch_size = data.size(0)
            if batch_size != bptt:
                src_mask = src_mask[:batch_size, :batch_size]
            output = model(data, src_mask)
            output_flat = output.view(-1, ntokens)
            total_loss += batch_size * criterion(output_flat, targets).item()
    return total_loss / (len(eval_data) - 1)

  寻找最好的模型,并保存

best_val_loss = float('inf')
epochs = 3
best_model = None

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(model)
    val_loss = evaluate(model, val_data)
    val_ppl = math.exp(val_loss)
    elapsed = time.time() - epoch_start_time
    print('-' * 89)
    print(f'| end of epoch {epoch:3d} | time: {elapsed:5.2f}s | '
          f'valid loss {val_loss:5.2f} | valid ppl {val_ppl:8.2f}')
    print('-' * 89)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model = copy.deepcopy(model)

    scheduler.step()

  测试最好的模型

test_loss = evaluate(best_model, test_data)
test_ppl = math.exp(test_loss)
print('=' * 89)
print(f'| End of training | test loss {test_loss:5.2f} | '
      f'test ppl {test_ppl:8.2f}')
print('=' * 89)

  注意,以上实现过程好像缺少了output到self-attention那部分,不确定是为什么,可能是语言预测的这个任务,不需要对输出做embedding?

Reference

[1] Attention is All You Need

[2] Pytorch_tutorial: Language Modeling with nn.Transformer and TorchText

[3] 深度学习中的注意力模型(2017版)


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以其他方式联系。

💰

×

Help us with donation