Transformer 一起动手编码学原理

本文涉及的产品
模型训练 PAI-DLC,100CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
交互式建模 PAI-DSW,每月250计算时 3个月
简介: 学习Transformer,快来跟着作者动手写一个。

作为工程同学,学习Transformer中,不动手写一个,总感觉理解不扎实。纸上得来终觉浅,绝知此事要躬行,抽空多debug几遍!

注:不涉及算法解释,仅是从工程代码实现去加强理解。

一、准备知识


以预测二手房价格的模型,先来理解机器学习中的核心概念。

1.1、手撸版

1、准备训练数据

# 样本数
num_examples = 1000
# 特征数:房屋大小、新旧
num_input = 2
# 线性回归模型,2个W参数值:2,-3.4
true_w = [2, -3.4]
# 1个b参数值
true_b = 4.2

# 随机生成样本:特征值
features = torch.randn(num_examples, num_input, dtype=torch.float32)
# print(features[0]) 

# 随机生成样本:y值
labels = true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b
# print(labels[0])
# 随机生成一批数据,给y值再随机化下
_temp = torch.tensor(np.random.normal(0, 0.01, size=labels.size()), dtype=torch.float32)
labels = labels + _temp
# print(labels[0])

2、定义模型

# 模型参数 w:w1、w2,b:b,设了初始值
w = torch.tensor(np.random.normal(0, 0.01, (num_input, 1)), dtype=torch.float32)
b = torch.zeros(1, dtype=torch.float32)
# 设置要求计算梯度
w.requires_grad_(requires_grad=True)
b.requires_grad_(requires_grad=True)

# 线性回归模型:定义 torch.mm(X, w) + b,mm是矩阵乘法(m:multiply缩写)
def linreg(X, w, b):
    return torch.mm(X, w) + b

# 定义损失函数:y_pred=预测值、y=真实值,最简单的 平方误差
def squared_loss(y_pred, y):
    return (y_pred - y.view(y_pred.size())) ** 2 / 2

# 定义梯度下降算法:sgd
# params是参数(data 是参数值、grad 是梯度值),lr是学习率,batch是数量
# 为什么要除batch?因为这个梯度值,是在batch个数据上累加算的
def sgd(params, lr, batch):
    for param in params:
        # 注意这里更改param时用的param.data
        param.data -= lr * param.grad / batch

3、训练模型

# 初始学习率
lr = 0.03
# 训练轮次
epoch = 5
# 网络:一层,2个输入(2个参数 w1、w2),1个b参数
net = linreg
# 损失函数
loss = squared_loss
# 1个批次大小
batch_size = 10

# 训练模型一共需要epoch个迭代周期
for epoch in range(epoch):
    # 在每一个迭代周期中,所有样本都要跑,从 0~1000,每个batch=10
    for X, y in data_iter(batch_size, features, labels):
        # ls是这个batch的损失求和
        ls = loss(net(X, w, b), y).sum()
        # 求梯度值
        ls.backward()
        # 更新参数
        sgd([w, b], lr, batch_size)
        # 不要忘了梯度清零,下一次要用
        w.grad.data.zero_()
        b.grad.data.zero_()

    # 在计算这轮训练完后,打印 loss,可以观察在不断变小
    train_l = loss(net(features, w, b), labels)
    print('epoch %d, loss %f' % (epoch + 1, train_l.mean().item()))
辅助函数:by batch & 随机,读取样本数据
# 定义随机读取数据 by batch
def data_iter(batch, features, labels):
    nums = len(features)
    # 生成 0-1000 数组
    indices = list(range(nums))
    # 打乱下标的读取顺序
    random.shuffle(indices)
    # 生成:start=0,stop=1000,step=batch=10
    for i in range(0, nums, batch):
        # 截取一段下标,size=batch
        t_ind = indices[i: min(i + batch, num_examples)]  # 最后一次可能不足一个batch
        # 转换为torch要求格式:tensor
        j = torch.LongTensor(t_ind)
        # 按照下标从向量中找
        # yield:生成一个迭代器返回,有点类似闭包,每调用一次返回一次,且下次调用时会从上次暂停的位置继续
        yield features.index_select(0, j), labels.index_select(0, j)

1.2、pytorch版


1、准备训练数据

同上

2、定义模型

# 定义模型:线性回归
class LinearNet(nn.Module):
    def __init__(self, n_feature):
        super(LinearNet, self).__init__()
        # 线性规划:n_feature=几个w,1=1个b
        self.linear = nn.Linear(n_feature, 1)

    # forward 定义【前向传播】
    def forward(self, x):
        y = self.linear(x)
        return y

# 实例化模型,初始化参数值
net = LinearNet(num_input)
init.normal_(net[0].weight, mean=0, std=0.01)
init.constant_(net[0].bias, val=0)

# 定义损失函数
loss = nn.MSELoss()

# 定义梯度下降算法(lr是学习率)
optimizer = optim.SGD(net.parameters(), lr=0.03)

3、训练模型

num_epochs = 3
for epoch in range(1, num_epochs + 1):
    for X, y in data_iter:
        output = net(X)
        # 算损失值
        l_sum = loss(output, y.view(-1, 1))
        # 更新梯度
        l_sum.backward()
        # 更新w和b
        optimizer.step()
        # 梯度清零
        optimizer.zero_grad()
    print('epoch %d, loss: %f' % (epoch, l_sum.item()))

batch 读取数据

# 读取数据 by batch & 随机
batch_size = 10
data_set = Data.TensorDataset(features, labels)
data_iter = Data.DataLoader(data_set, batch_size, shuffle=True)

在用torch框架,进一步要理解:

a、nn.Module 和 forward() 函数:表示神经网络中的一个层,覆写 init 和 forward 方法(提问:实现2层网络怎么做?)

b、debug 观察下数据变化,和 手撸版 对比着去理解


二、手写Transformer

image.png

上面这是一个"逻辑"示意图。印象中,第1次看时,以为 "inputs & outputs" 是并行计算、但上面又有依赖,很糊涂。

这不是一个技术架构图,上图有很多概念,attention、multi-head等。先尝试转换为面向对象的类图,如下:

image.png

2.1、例子:翻译

# 定义词典
source_vocab = {'E': 0, '我': 1, '吃': 2, '肉': 3}
target_vocab = {'E': 0, 'I': 1, 'eat': 2, 'meat': 3, 'S': 4}

# 样本数据
encoder_input = torch.LongTensor([[1, 2, 3, 0]]).to(device)  # 我 吃 肉 E, E代表结束词
decoder_input = torch.LongTensor([[4, 1, 2, 3]]).to(device)  # S I eat meat, S代表开始词, 并右移一位,用于并行训练
target = torch.LongTensor([[1, 2, 3, 0]]).to(device)  # I eat meat E, 翻译目标

2.2、定义模型


按照上面类图,我们来一点点实现

1、Attention

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        # Q、K、V,此时是已经乘过 W(q)、W(k)、W(v) 矩阵
        # 如下图,但不用一个个算,矩阵乘法一次搞定
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
        # 遮盖区的值设为近0,表示E结尾 or decoder 自我顺序遮盖,注意力丢弃
        scores.masked_fill_(attn_mask, -1e9)
        # softmax后(遮盖区变为0)
        attn = nn.Softmax(dim=-1)(scores)
        # 乘积意义:给V带上了注意力信息。prob就是下图z(矩阵计算不用在v1+v2)。
        prob = torch.matmul(attn, V)
        return prob

image.png

2、MultiHeadAttention

通用变量定义

d_model = 6  # embedding size
# d_model = 3  # embedding size
d_ff = 12  # feedforward nerual network  dimension
d_k = d_v = 3  # dimension of k(same as q) and v
n_heads = 2  # number of heads in multihead attention
# n_heads = 1  # number of heads in multihead attention【注:为debug更简单,可以先改为1个head】
p_drop = 0.1  # propability of dropout
device = "cpu"

注1:按惯性会想,会有多个head、串行循环计算,不是,多个head是一个张量输入

注2:FF 全连接、残差连接、归一化,35、38 行业代码,pytorch框架带来的简化

class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.n_heads = n_heads
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        self.fc = nn.Linear(d_v * n_heads, d_model, bias=False)  # ff 全连接
        self.layer_norm = nn.LayerNorm(d_model)  # normal 归一化

    def forward(self, input_Q, input_K, input_V, attn_mask):
        # input_Q:1*4*6,每批1句 * 每句4个词 * 每词6长度编码
        # residual 先临时保存下:原始值,后面做残差连接加法
        residual, batch = input_Q, input_Q.size(0)

        # 乘上 W 矩阵。注:W 就是要训练的参数
        # 注意:维度从2维变成3维,增加 head 维度,也是一次性并行计算
        Q = self.W_Q(input_Q)  # 乘以 W(6*6) 变为 1*4*6
        Q = Q.view(batch, -1, n_heads, d_k).transpose(1, 2)  # 切开为2个Head 变为 1*2*4*3 1批 2个Head 4词 3编码
        K = self.W_K(input_K).view(batch, -1, n_heads, d_k).transpose(1, 2)
        V = self.W_V(input_V).view(batch, -1, n_heads, d_v).transpose(1, 2)

        # 1*2*4*4,2个Head的4*4,最后一列为true
        # 因为最后一列是 E 结束符
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

        # 返回1*2*4*3,2个头,4*3为带上关注关系的4词
        prob = ScaledDotProductAttention()(Q, K, V, attn_mask)

        # 把2头重新拼接起来,变为 1*4*6
        prob = prob.transpose(1, 2).contiguous()
        prob = prob.view(batch, -1, n_heads * d_v).contiguous()

        # 全连接层:对多头注意力的输出进行线性变换,从而更好地提取信息
        output = self.fc(prob)

        # 残差连接 & 归一化
        res = self.layer_norm(residual + output) # return 1*4*6
        return res

3、Encoder

在 attention 概念中,有很关键的 "遮盖" 概念,先不细究,你debug一遍会更理解

def get_attn_pad_mask(seq_q, seq_k):  # 本质是结尾E做注意力遮盖,返回 1*4*4,最后一列为True
    batch, len_q = seq_q.size()  # 1, 4
    batch, len_k = seq_k.size()  # 1, 4
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)  # 为0则为true,变为f,f,f,true,意思是把0这个结尾标志为true
    return pad_attn_mask.expand(batch, len_q, len_k)  # 扩展为1*4*4,最后一列为true,表示抹掉结尾对应的注意力


def get_attn_subsequent_mask(seq):  # decoder的自我顺序注意力遮盖,右上三角形区为true的遮盖
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    subsequent_mask = np.triu(np.ones(attn_shape), k=1)
    subsequent_mask = torch.from_numpy(subsequent_mask)
    return subsequent_mask
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.source_embedding = nn.Embedding(len(source_vocab), d_model)
        self.attention = MultiHeadAttention()

    def forward(self, encoder_input):
        # input 1 * 4,1句话4个单词
        # 1 * 4 * 6,将每个单词的整数字编码扩展到6个浮点数编码
        embedded = self.source_embedding(encoder_input)
        # 1 * 4 * 4 矩阵,最后一列为true,表示忽略结尾词的注意力机制
        mask = get_attn_pad_mask(encoder_input, encoder_input)
        # 1*4*6,带上关注力的4个词矩阵
        encoder_output = self.attention(embedded, embedded, embedded, mask)
        return encoder_output

4、Decoder

class Decoder(nn.Module):

    def __init__(self):
        super(Decoder, self).__init__()
        self.target_embedding = nn.Embedding(len(target_vocab), d_model)
        self.attention = MultiHeadAttention()

    # 三入参形状分别为 1*4, 1*4, 1*4*6,前两者未被embedding,注意后面这个是 encoder_output
    def forward(self, decoder_input, encoder_input, encoder_output):
        # 编码为1*4*6
        decoder_embedded = self.target_embedding(decoder_input)

        # 1*4*4 全为false,表示没有结尾词
        decoder_self_attn_mask = get_attn_pad_mask(decoder_input, decoder_input)
        # 1*4*4 右上三角区为1,其余为0
        decoder_subsequent_mask = get_attn_subsequent_mask(decoder_input)
        # 1*4*4 右上三角区为true,其余为false
        decoder_self_mask = torch.gt(decoder_self_attn_mask + decoder_subsequent_mask, 0)

        # 1*4*6 带上注意力的4词矩阵【注:decoder里面,第1个attention】
        decoder_output = self.attention(decoder_embedded, decoder_embedded, decoder_embedded, decoder_self_mask)

        # 1*4*4 最后一列为true,表示E结尾词
        decoder_encoder_attn_mask = get_attn_pad_mask(decoder_input, encoder_input)
        # 输入均为 1*4*6,Q表示"S I eat meat"、K表示"我吃肉E"、V表示 "我吃肉E"
        #【注:decoder里面,第2个attention】
        decoder_output = self.attention(decoder_output, encoder_output, encoder_output, decoder_encoder_attn_mask)
        return decoder_output

5、Transformer

class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
        self.fc = nn.Linear(d_model, len(target_vocab), bias=False)

    def forward(self, encoder_input, decoder_input):
        # 入 1*4,出 1*4*6,作用:"我吃肉E",并带上三词间的关注力信息
        encoder_output = self.encoder(encoder_input)
        # 入 1*4, 1*4, 1*4*6=encoder_output
        decoder_output = self.decoder(decoder_input, encoder_input, encoder_output)
        # 预测出4个词,每个词对应到词典中5个词的概率,如下
        # tensor([[[ 0.0755, -0.2646,  0.1279, -0.3735, -0.2351],[-1.2789,  0.6237, -0.6452,  1.1632,  0.6479]]]
        decoder_logits = self.fc(decoder_output)
        res = decoder_logits.view(-1, decoder_logits.size(-1))
        return res

2.3、训练模型

model = Transformer().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-1)

for epoch in range(10):
    # 输出4*5,代表预测出4个词,每个词对应到词典中5个词的概率
    output = model(encoder_input, decoder_input)  
    # 和目标词 I eat meat E做差异计算
    loss = criterion(output, target.view(-1))  
    print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
    # 这个3个操作:清零梯度、算法梯度、更新参数
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

2.4、使用模型

# 预测目标是5个单词
target_len = len(target_vocab)  
# 1*4*6 输入"我吃肉E",先算【自注意力】
encoder_output = model.encoder(encoder_input)  
# 1*5 全是0,表示EEEEE
decoder_input = torch.zeros(1, target_len).type_as(encoder_input.data)  
# 表示S开始字符
next_symbol = 4  

# 5个单词逐个预测【注意:是一个个追加词,不断往后预测的】
for i in range(target_len):  
    # 譬如i=0第一轮,decoder输入为SEEEE,第二轮为S I EEE,把预测 I 给拼上去,继续循环
    decoder_input[0][i] = next_symbol  
    # decoder 输出
    decoder_output = model.decoder(decoder_input, encoder_input, encoder_output)
    # 负责将解码器的输出映射到目标词汇表,每个元素表示对应目标词汇的分数
    # 取出最大的五个词的下标,譬如[1, 3, 3, 3, 3] 表示 i,meat,meat,meat,meat
    logits = model.fc(decoder_output).squeeze(0)
    prob = logits.max(dim=1, keepdim=False)[1]
    next_symbol = prob.data[i].item()  # 只取当前i

    for k, v in target_vocab.items():
        if v == next_symbol:
            print('第', i, '轮:', k)
            break

    if next_symbol == 0:  # 遇到结尾了,那就完成翻译
        break

参考资料:

1、https://jalammar.github.io/illustrated-transformer/

2、http://nlp.seas.harvard.edu/annotated-transformer/

3、gpt4:学习过程中有很多疑惑,真是一个好老师


作者 | 伯涵

来源 | 阿里云开发者公众号

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
相关文章
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
聊一聊计算机视觉中常用的注意力机制以及Pytorch代码实现
本文介绍了几种常用的计算机视觉注意力机制及其PyTorch实现,包括SENet、CBAM、BAM、ECA-Net、SA-Net、Polarized Self-Attention、Spatial Group-wise Enhance和Coordinate Attention等,每种方法都附有详细的网络结构说明和实验结果分析。通过这些注意力机制的应用,可以有效提升模型在目标检测任务上的性能。此外,作者还提供了实验数据集的基本情况及baseline模型的选择与实验结果,方便读者理解和复现。
67 0
聊一聊计算机视觉中常用的注意力机制以及Pytorch代码实现
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
Transformer图解以及相关的概念解析
前言 transformer是目前NLP甚至是整个深度学习领域不能不提到的框架,同时大部分LLM也是使用其进行训练生成模型,所以transformer几乎是目前每一个机器人开发者或者人工智能开发者不能越过的一个框架。接下来本文将从顶层往下去一步步掀开transformer的面纱。 transformer概述 Transformer模型来自论文Attention Is All You Need。 在论文中最初是为了提高机器翻译的效率,它使用了Self-Attention机制和Position Encoding去替代RNN。后来大家发现Self-Attention的效果很好,并且在其它的地
|
2月前
|
机器学习/深度学习 存储 自然语言处理
深度学习入门:循环神经网络------RNN概述,词嵌入层,循环网络层及案例实践!(万字详解!)
深度学习入门:循环神经网络------RNN概述,词嵌入层,循环网络层及案例实践!(万字详解!)
|
7月前
|
机器学习/深度学习 自然语言处理
一文搞懂Transformer的位置编码
一文搞懂Transformer的位置编码
1786 2
|
7月前
|
机器学习/深度学习 自然语言处理 运维
大模型开发:解释自编码器以及它们在表示学习中的作用。
自编码器是一种神经网络,用于无监督学习中的数据降维和压缩,由编码器和解码器组成,学习低维稀疏表示。它们分为收缩、正则和变分类型,常用于图像重构、聚类、机器翻译等任务,能生成类似训练数据的新样本。自编码器在特征学习和多种任务中展现强大能力。
145 7
|
7月前
|
机器学习/深度学习 存储 人工智能
一文搞懂 Transformer 工作原理 !!
一文搞懂 Transformer 工作原理 !!
200 0
|
7月前
|
机器学习/深度学习 存储 算法
【轻量化:实操】动手实现神经网络中的裁枝操作(附演示代码&yolo系列)
【轻量化:实操】动手实现神经网络中的裁枝操作(附演示代码&yolo系列)
195 1
|
机器学习/深度学习 移动开发 人工智能
自编码器26页综述论文:概念、图解和应用
自编码器26页综述论文:概念、图解和应用
134 0
|
机器学习/深度学习 数据挖掘 PyTorch
# 【深度学习】:《PyTorch入门到项目实战》(十一):卷积层
>之前已经介绍了基本的神经网络知识以及一些处理过拟合欠拟合的概念。现在我们正式进入卷积神经网络的学习。CNN是⼀类强⼤的、为处理图像数据⽽设计的神经⽹络。基于卷积神经⽹络架构的模型在计算机视觉领域中已经占主导地位,当今⼏乎所有的图像识别、⽬标检测或语义分割相关的学术竞赛和商业应⽤都以这种⽅法为基础。对于计算机视觉而言,面临的一个重大挑战就是数据的输入可能会很大。例如,我们有一张64$\times$ 64的图片,假设通道数为3,那么它是数据量相当于是一个$64\times 64\times 3=12288$的特征向量。当我们要操作更大的图片时候,需要进行卷积计算,它是卷积神经网络中非常重要的一部
 # 【深度学习】:《PyTorch入门到项目实战》(十一):卷积层
|
机器学习/深度学习 自然语言处理
小白总结Transformer模型要点(一)(下)
本文主要总结了Transformer模型的要点,包含模型架构各部分组成和原理、常见问题汇总、模型具体实现和相关拓展学习。
小白总结Transformer模型要点(一)(下)
下一篇
DataWorks