Transformer 一起动手编码学原理

本文涉及的产品
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
交互式建模 PAI-DSW,每月250计算时 3个月
模型训练 PAI-DLC,5000CU*H 3个月
简介: 学习Transformer,快来跟着作者动手写一个。

作为工程同学,学习Transformer中,不动手写一个,总感觉理解不扎实。纸上得来终觉浅,绝知此事要躬行,抽空多debug几遍!

注:不涉及算法解释,仅是从工程代码实现去加强理解。

一、准备知识


以预测二手房价格的模型,先来理解机器学习中的核心概念。

1.1、手撸版

1、准备训练数据

# 样本数
num_examples = 1000
# 特征数:房屋大小、新旧
num_input = 2
# 线性回归模型,2个W参数值:2,-3.4
true_w = [2, -3.4]
# 1个b参数值
true_b = 4.2

# 随机生成样本:特征值
features = torch.randn(num_examples, num_input, dtype=torch.float32)
# print(features[0]) 

# 随机生成样本:y值
labels = true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b
# print(labels[0])
# 随机生成一批数据,给y值再随机化下
_temp = torch.tensor(np.random.normal(0, 0.01, size=labels.size()), dtype=torch.float32)
labels = labels + _temp
# print(labels[0])

2、定义模型

# 模型参数 w:w1、w2,b:b,设了初始值
w = torch.tensor(np.random.normal(0, 0.01, (num_input, 1)), dtype=torch.float32)
b = torch.zeros(1, dtype=torch.float32)
# 设置要求计算梯度
w.requires_grad_(requires_grad=True)
b.requires_grad_(requires_grad=True)

# 线性回归模型:定义 torch.mm(X, w) + b,mm是矩阵乘法(m:multiply缩写)
def linreg(X, w, b):
    return torch.mm(X, w) + b

# 定义损失函数:y_pred=预测值、y=真实值,最简单的 平方误差
def squared_loss(y_pred, y):
    return (y_pred - y.view(y_pred.size())) ** 2 / 2

# 定义梯度下降算法:sgd
# params是参数(data 是参数值、grad 是梯度值),lr是学习率,batch是数量
# 为什么要除batch?因为这个梯度值,是在batch个数据上累加算的
def sgd(params, lr, batch):
    for param in params:
        # 注意这里更改param时用的param.data
        param.data -= lr * param.grad / batch

3、训练模型

# 初始学习率
lr = 0.03
# 训练轮次
epoch = 5
# 网络:一层,2个输入(2个参数 w1、w2),1个b参数
net = linreg
# 损失函数
loss = squared_loss
# 1个批次大小
batch_size = 10

# 训练模型一共需要epoch个迭代周期
for epoch in range(epoch):
    # 在每一个迭代周期中,所有样本都要跑,从 0~1000,每个batch=10
    for X, y in data_iter(batch_size, features, labels):
        # ls是这个batch的损失求和
        ls = loss(net(X, w, b), y).sum()
        # 求梯度值
        ls.backward()
        # 更新参数
        sgd([w, b], lr, batch_size)
        # 不要忘了梯度清零,下一次要用
        w.grad.data.zero_()
        b.grad.data.zero_()

    # 在计算这轮训练完后,打印 loss,可以观察在不断变小
    train_l = loss(net(features, w, b), labels)
    print('epoch %d, loss %f' % (epoch + 1, train_l.mean().item()))
辅助函数:by batch & 随机,读取样本数据
# 定义随机读取数据 by batch
def data_iter(batch, features, labels):
    nums = len(features)
    # 生成 0-1000 数组
    indices = list(range(nums))
    # 打乱下标的读取顺序
    random.shuffle(indices)
    # 生成:start=0,stop=1000,step=batch=10
    for i in range(0, nums, batch):
        # 截取一段下标,size=batch
        t_ind = indices[i: min(i + batch, num_examples)]  # 最后一次可能不足一个batch
        # 转换为torch要求格式:tensor
        j = torch.LongTensor(t_ind)
        # 按照下标从向量中找
        # yield:生成一个迭代器返回,有点类似闭包,每调用一次返回一次,且下次调用时会从上次暂停的位置继续
        yield features.index_select(0, j), labels.index_select(0, j)

1.2、pytorch版


1、准备训练数据

同上

2、定义模型

# 定义模型:线性回归
class LinearNet(nn.Module):
    def __init__(self, n_feature):
        super(LinearNet, self).__init__()
        # 线性规划:n_feature=几个w,1=1个b
        self.linear = nn.Linear(n_feature, 1)

    # forward 定义【前向传播】
    def forward(self, x):
        y = self.linear(x)
        return y

# 实例化模型,初始化参数值
net = LinearNet(num_input)
init.normal_(net[0].weight, mean=0, std=0.01)
init.constant_(net[0].bias, val=0)

# 定义损失函数
loss = nn.MSELoss()

# 定义梯度下降算法(lr是学习率)
optimizer = optim.SGD(net.parameters(), lr=0.03)

3、训练模型

num_epochs = 3
for epoch in range(1, num_epochs + 1):
    for X, y in data_iter:
        output = net(X)
        # 算损失值
        l_sum = loss(output, y.view(-1, 1))
        # 更新梯度
        l_sum.backward()
        # 更新w和b
        optimizer.step()
        # 梯度清零
        optimizer.zero_grad()
    print('epoch %d, loss: %f' % (epoch, l_sum.item()))

batch 读取数据

# 读取数据 by batch & 随机
batch_size = 10
data_set = Data.TensorDataset(features, labels)
data_iter = Data.DataLoader(data_set, batch_size, shuffle=True)

在用torch框架,进一步要理解:

a、nn.Module 和 forward() 函数:表示神经网络中的一个层,覆写 init 和 forward 方法(提问:实现2层网络怎么做?)

b、debug 观察下数据变化,和 手撸版 对比着去理解


二、手写Transformer

image.png

上面这是一个"逻辑"示意图。印象中,第1次看时,以为 "inputs & outputs" 是并行计算、但上面又有依赖,很糊涂。

这不是一个技术架构图,上图有很多概念,attention、multi-head等。先尝试转换为面向对象的类图,如下:

image.png

2.1、例子:翻译

# 定义词典
source_vocab = {'E': 0, '我': 1, '吃': 2, '肉': 3}
target_vocab = {'E': 0, 'I': 1, 'eat': 2, 'meat': 3, 'S': 4}

# 样本数据
encoder_input = torch.LongTensor([[1, 2, 3, 0]]).to(device)  # 我 吃 肉 E, E代表结束词
decoder_input = torch.LongTensor([[4, 1, 2, 3]]).to(device)  # S I eat meat, S代表开始词, 并右移一位,用于并行训练
target = torch.LongTensor([[1, 2, 3, 0]]).to(device)  # I eat meat E, 翻译目标

2.2、定义模型


按照上面类图,我们来一点点实现

1、Attention

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        # Q、K、V,此时是已经乘过 W(q)、W(k)、W(v) 矩阵
        # 如下图,但不用一个个算,矩阵乘法一次搞定
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
        # 遮盖区的值设为近0,表示E结尾 or decoder 自我顺序遮盖,注意力丢弃
        scores.masked_fill_(attn_mask, -1e9)
        # softmax后(遮盖区变为0)
        attn = nn.Softmax(dim=-1)(scores)
        # 乘积意义:给V带上了注意力信息。prob就是下图z(矩阵计算不用在v1+v2)。
        prob = torch.matmul(attn, V)
        return prob

image.png

2、MultiHeadAttention

通用变量定义

d_model = 6  # embedding size
# d_model = 3  # embedding size
d_ff = 12  # feedforward nerual network  dimension
d_k = d_v = 3  # dimension of k(same as q) and v
n_heads = 2  # number of heads in multihead attention
# n_heads = 1  # number of heads in multihead attention【注:为debug更简单,可以先改为1个head】
p_drop = 0.1  # propability of dropout
device = "cpu"

注1:按惯性会想,会有多个head、串行循环计算,不是,多个head是一个张量输入

注2:FF 全连接、残差连接、归一化,35、38 行业代码,pytorch框架带来的简化

class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.n_heads = n_heads
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        self.fc = nn.Linear(d_v * n_heads, d_model, bias=False)  # ff 全连接
        self.layer_norm = nn.LayerNorm(d_model)  # normal 归一化

    def forward(self, input_Q, input_K, input_V, attn_mask):
        # input_Q:1*4*6,每批1句 * 每句4个词 * 每词6长度编码
        # residual 先临时保存下:原始值,后面做残差连接加法
        residual, batch = input_Q, input_Q.size(0)

        # 乘上 W 矩阵。注:W 就是要训练的参数
        # 注意:维度从2维变成3维,增加 head 维度,也是一次性并行计算
        Q = self.W_Q(input_Q)  # 乘以 W(6*6) 变为 1*4*6
        Q = Q.view(batch, -1, n_heads, d_k).transpose(1, 2)  # 切开为2个Head 变为 1*2*4*3 1批 2个Head 4词 3编码
        K = self.W_K(input_K).view(batch, -1, n_heads, d_k).transpose(1, 2)
        V = self.W_V(input_V).view(batch, -1, n_heads, d_v).transpose(1, 2)

        # 1*2*4*4,2个Head的4*4,最后一列为true
        # 因为最后一列是 E 结束符
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

        # 返回1*2*4*3,2个头,4*3为带上关注关系的4词
        prob = ScaledDotProductAttention()(Q, K, V, attn_mask)

        # 把2头重新拼接起来,变为 1*4*6
        prob = prob.transpose(1, 2).contiguous()
        prob = prob.view(batch, -1, n_heads * d_v).contiguous()

        # 全连接层:对多头注意力的输出进行线性变换,从而更好地提取信息
        output = self.fc(prob)

        # 残差连接 & 归一化
        res = self.layer_norm(residual + output) # return 1*4*6
        return res

3、Encoder

在 attention 概念中,有很关键的 "遮盖" 概念,先不细究,你debug一遍会更理解

def get_attn_pad_mask(seq_q, seq_k):  # 本质是结尾E做注意力遮盖,返回 1*4*4,最后一列为True
    batch, len_q = seq_q.size()  # 1, 4
    batch, len_k = seq_k.size()  # 1, 4
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)  # 为0则为true,变为f,f,f,true,意思是把0这个结尾标志为true
    return pad_attn_mask.expand(batch, len_q, len_k)  # 扩展为1*4*4,最后一列为true,表示抹掉结尾对应的注意力


def get_attn_subsequent_mask(seq):  # decoder的自我顺序注意力遮盖,右上三角形区为true的遮盖
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    subsequent_mask = np.triu(np.ones(attn_shape), k=1)
    subsequent_mask = torch.from_numpy(subsequent_mask)
    return subsequent_mask
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.source_embedding = nn.Embedding(len(source_vocab), d_model)
        self.attention = MultiHeadAttention()

    def forward(self, encoder_input):
        # input 1 * 4,1句话4个单词
        # 1 * 4 * 6,将每个单词的整数字编码扩展到6个浮点数编码
        embedded = self.source_embedding(encoder_input)
        # 1 * 4 * 4 矩阵,最后一列为true,表示忽略结尾词的注意力机制
        mask = get_attn_pad_mask(encoder_input, encoder_input)
        # 1*4*6,带上关注力的4个词矩阵
        encoder_output = self.attention(embedded, embedded, embedded, mask)
        return encoder_output

4、Decoder

class Decoder(nn.Module):

    def __init__(self):
        super(Decoder, self).__init__()
        self.target_embedding = nn.Embedding(len(target_vocab), d_model)
        self.attention = MultiHeadAttention()

    # 三入参形状分别为 1*4, 1*4, 1*4*6,前两者未被embedding,注意后面这个是 encoder_output
    def forward(self, decoder_input, encoder_input, encoder_output):
        # 编码为1*4*6
        decoder_embedded = self.target_embedding(decoder_input)

        # 1*4*4 全为false,表示没有结尾词
        decoder_self_attn_mask = get_attn_pad_mask(decoder_input, decoder_input)
        # 1*4*4 右上三角区为1,其余为0
        decoder_subsequent_mask = get_attn_subsequent_mask(decoder_input)
        # 1*4*4 右上三角区为true,其余为false
        decoder_self_mask = torch.gt(decoder_self_attn_mask + decoder_subsequent_mask, 0)

        # 1*4*6 带上注意力的4词矩阵【注:decoder里面,第1个attention】
        decoder_output = self.attention(decoder_embedded, decoder_embedded, decoder_embedded, decoder_self_mask)

        # 1*4*4 最后一列为true,表示E结尾词
        decoder_encoder_attn_mask = get_attn_pad_mask(decoder_input, encoder_input)
        # 输入均为 1*4*6,Q表示"S I eat meat"、K表示"我吃肉E"、V表示 "我吃肉E"
        #【注:decoder里面,第2个attention】
        decoder_output = self.attention(decoder_output, encoder_output, encoder_output, decoder_encoder_attn_mask)
        return decoder_output

5、Transformer

class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
        self.fc = nn.Linear(d_model, len(target_vocab), bias=False)

    def forward(self, encoder_input, decoder_input):
        # 入 1*4,出 1*4*6,作用:"我吃肉E",并带上三词间的关注力信息
        encoder_output = self.encoder(encoder_input)
        # 入 1*4, 1*4, 1*4*6=encoder_output
        decoder_output = self.decoder(decoder_input, encoder_input, encoder_output)
        # 预测出4个词,每个词对应到词典中5个词的概率,如下
        # tensor([[[ 0.0755, -0.2646,  0.1279, -0.3735, -0.2351],[-1.2789,  0.6237, -0.6452,  1.1632,  0.6479]]]
        decoder_logits = self.fc(decoder_output)
        res = decoder_logits.view(-1, decoder_logits.size(-1))
        return res

2.3、训练模型

model = Transformer().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-1)

for epoch in range(10):
    # 输出4*5,代表预测出4个词,每个词对应到词典中5个词的概率
    output = model(encoder_input, decoder_input)  
    # 和目标词 I eat meat E做差异计算
    loss = criterion(output, target.view(-1))  
    print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
    # 这个3个操作:清零梯度、算法梯度、更新参数
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

2.4、使用模型

# 预测目标是5个单词
target_len = len(target_vocab)  
# 1*4*6 输入"我吃肉E",先算【自注意力】
encoder_output = model.encoder(encoder_input)  
# 1*5 全是0,表示EEEEE
decoder_input = torch.zeros(1, target_len).type_as(encoder_input.data)  
# 表示S开始字符
next_symbol = 4  

# 5个单词逐个预测【注意:是一个个追加词,不断往后预测的】
for i in range(target_len):  
    # 譬如i=0第一轮,decoder输入为SEEEE,第二轮为S I EEE,把预测 I 给拼上去,继续循环
    decoder_input[0][i] = next_symbol  
    # decoder 输出
    decoder_output = model.decoder(decoder_input, encoder_input, encoder_output)
    # 负责将解码器的输出映射到目标词汇表,每个元素表示对应目标词汇的分数
    # 取出最大的五个词的下标,譬如[1, 3, 3, 3, 3] 表示 i,meat,meat,meat,meat
    logits = model.fc(decoder_output).squeeze(0)
    prob = logits.max(dim=1, keepdim=False)[1]
    next_symbol = prob.data[i].item()  # 只取当前i

    for k, v in target_vocab.items():
        if v == next_symbol:
            print('第', i, '轮:', k)
            break

    if next_symbol == 0:  # 遇到结尾了,那就完成翻译
        break

参考资料:

1、https://jalammar.github.io/illustrated-transformer/

2、http://nlp.seas.harvard.edu/annotated-transformer/

3、gpt4:学习过程中有很多疑惑,真是一个好老师


作者 | 伯涵

来源 | 阿里云开发者公众号

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
相关文章
|
机器学习/深度学习 算法 搜索推荐
一文读懂FM算法优势,并用python实现!(附代码)
介绍 我仍然记得第一次遇到点击率预测问题时的情形,在那之前,我一直在学习数据科学,对自己取得的进展很满意,在机器学习黑客马拉松活动中也开始建立了自信,并决定好好迎接不同的挑战。 为了做得更好,我购买了一台内存16GB,i7处理器的机器,但是当我看到数据集的时候却感到非常不安,解压缩之后的数据大概有50GB - 我不知道基于这样的数据集要怎样进行点击率预测。
15172 0
|
2月前
|
机器学习/深度学习 自然语言处理 并行计算
一文快速读懂Transformer
Transformer模型近年来成为自然语言处理(NLP)领域的焦点,其强大的特征提取能力和并行计算优势在众多任务中取得显著效果。本文详细解读Transformer的原理,包括自注意力机制和编码器-解码器结构,并提供基于PyTorch的代码演示,展示了其在文本分类等任务中的应用。
|
6月前
|
机器学习/深度学习 自然语言处理
一文搞懂Transformer的位置编码
一文搞懂Transformer的位置编码
1453 2
|
6月前
|
机器学习/深度学习 自然语言处理 运维
大模型开发:解释自编码器以及它们在表示学习中的作用。
自编码器是一种神经网络,用于无监督学习中的数据降维和压缩,由编码器和解码器组成,学习低维稀疏表示。它们分为收缩、正则和变分类型,常用于图像重构、聚类、机器翻译等任务,能生成类似训练数据的新样本。自编码器在特征学习和多种任务中展现强大能力。
128 7
|
6月前
|
机器学习/深度学习 存储 人工智能
一文搞懂 Transformer 工作原理 !!
一文搞懂 Transformer 工作原理 !!
189 0
|
6月前
|
机器学习/深度学习 存储 算法
【轻量化:实操】动手实现神经网络中的裁枝操作(附演示代码&yolo系列)
【轻量化:实操】动手实现神经网络中的裁枝操作(附演示代码&yolo系列)
184 1
|
6月前
|
自然语言处理 Python
BERT模型基本理念、工作原理、配置讲解(图文解释)
BERT模型基本理念、工作原理、配置讲解(图文解释)
773 0
|
机器学习/深度学习 人工智能 算法
一文读懂强化学习:RL全面解析与Pytorch实战
一文读懂强化学习:RL全面解析与Pytorch实战
463 0
|
机器学习/深度学习 自然语言处理 计算机视觉
图解BERT:通俗的解释BERT是如何工作的
图解BERT:通俗的解释BERT是如何工作的
493 0
图解BERT:通俗的解释BERT是如何工作的
|
算法 计算机视觉
熟练掌握CV中最基础的概念:图像特征,看这篇万字的长文就够了(三)
熟练掌握CV中最基础的概念:图像特征,看这篇万字的长文就够了(三)
257 0
熟练掌握CV中最基础的概念:图像特征,看这篇万字的长文就够了(三)