扩散模型是目前大部分AIGC生图模型的基座,其本质是用神经网络学习从高斯噪声逐步恢复图像的过程,本文用python代码从零开始构建了一个简单的扩散模型。
理论部分
DDPM(Denoising Diffusion Probabilistic Models) 是一种在生成对抗网络等技术的基础上发展起来的新型概率模型去噪扩散模型,与其他生成模型(如归一化流、GANs或VAEs)相比并不是那么复杂,DDPM由两部分组成:
- 一个固定的前向传播的过程,它会逐渐将高斯噪声添加到图像中,直到最终得到纯噪声
- 一种可学习的反向去噪扩散过程,训练神经网络以从纯噪声开始逐渐对图像进行去噪
▐ 前向过程
前向扩散过程,其本质上是一个不断加噪声的过程。如下图所示,在猫的图片中多次增加高斯噪声直至图片变成随机噪音矩阵。可以看到,对于初始数据,我们设置K步的扩散步数,每一步增加一定的噪声,如果我们设置的K足够大,那么我们就能够将初始数据转化成随机噪音矩阵。
具体推理验证可参考:http://www.egbenz.com/#/my_article/12
▐ 训练过程
反向生成过程和前向扩散过程相反,是一个不断去噪的过程。神经网络从一个随机高斯噪声矩阵开始通过扩散模型的Inference过程不断预测并去除噪声。
实践部分
▐ 环境包
我们将首先安装并导入所需的库。
!pip install -q -U einops datasets matplotlib tqdm import math from inspect import isfunction from functools import partial %matplotlib inline import matplotlib.pyplot as plt from tqdm.auto import tqdm from einops import rearrange, reduce from einops.layers.torch import Rearrange import torch from torch import nn, einsum import torch.nn.functional as F
▐ 加噪声
下面是一些周期性的函数,这段代码定义了几种不同的函数,每个函数都用于计算深度学习中的beta调度(scheduling)。Beta调度主要用于控制噪声添加的程度,具体代码如下:
import torch # cosine_beta_schedule函数用于创建一个余弦退火beta调度。 # 这种调度方法基于余弦函数,并且可以调整随时间的衰减速率。 def cosine_beta_schedule(timesteps, s=0.008): steps = timesteps + 1 # 计算总的步数,需要比时间步多一个,以便计算alpha的累积乘积 x = torch.linspace(0, timesteps, steps) # 创建从0到timesteps的均匀分布的张量 # 计算alpha的累积乘积,使用一个余弦变换,并平方来计算当前步的alpha值 alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * torch.pi * 0.5) ** 2 alphas_cumprod = alphas_cumprod / alphas_cumprod[0] # 归一化,确保初始值为1 betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1]) # 计算每个时间步的beta值 return torch.clip(betas, 0.0001, 0.9999) # 对beta值进行裁剪,避免过大或过小 # linear_beta_schedule函数用于创建一个线性退火beta调度。 # 这意味着beta值将从beta_start线性增加到beta_end。 def linear_beta_schedule(timesteps): beta_start = 0.0001 # 定义起始beta值 beta_end = 0.02 # 定义结束beta值 return torch.linspace(beta_start, beta_end, timesteps) # 创建一个线性分布的beta值数组 # quadratic_beta_schedule函数用于创建一个二次退火beta调度。 # 这意味着beta值将根据二次函数变化。 def quadratic_beta_schedule(timesteps): beta_start = 0.0001 # 定义起始beta值 beta_end = 0.02 # 定义结束beta值 # 创建一个线性分布的数组,然后将其平方以生成二次分布,最后再次平方以计算beta值 return torch.linspace(beta_start**0.5, beta_end**0.5, timesteps) ** 2 # sigmoid_beta_schedule函数用于创建一个sigmoid退火beta调度。 # 这意味着beta值将根据sigmoid函数变化,这是一种常见的激活函数。 def sigmoid_beta_schedule(timesteps): beta_start = 0.0001 # 定义起始beta值 beta_end = 0.02 # 定义结束beta值 betas = torch.linspace(-6, 6, timesteps) # 创建一个从-6到6的线性分布,用于sigmoid函数的输入 # 应用sigmoid函数,并根据beta_start和beta_end调整其范围和位置 return torch.sigmoid(betas) * (beta_end - beta_start) + beta_start
下面是噪声采样函数,其中extract 函数的作用是从预先计算的张量中提取适合当前时间步 t 的值。sqrt_alphas_cumprod 和 sqrt_one_minus_alphas_cumprod 应该是分别与时间关联的平方根累积乘积和其补数的平方根累积乘积,这两个张量中包含了不同时间步下噪声扩散的缩放系数。sqrt_alphas_cumprod_t * x_start 计算了经过时间步 t 缩放的原始数据,而 sqrt_one_minus_alphas_cumprod_t * noise 计算了同样经过时间步 t 缩放的噪声。两者相加得到的是在时间步 t 时刻的扩散数据。在扩散模型中,通过反向扩散过程(生成过程)来学习这种加噪声的逆过程,从而可以生成新的数据样本。
# import torch # 假设在代码的其他部分已经导入了torch库 # 定义前向扩散函数 # x_start: 初始数据,例如一批图像 # t: 扩散的时间步,表示当前的扩散阶段 # noise: 可选参数,如果提供,则使用该噪声数据;否则,将生成新的随机噪声 def q_sample(x_start, t, noise=None): if noise is None: noise = torch.randn_like(x_start) # 如果未提供噪声,则生成一个与x_start形状相同的随机噪声张量 # 提取对应于时间步t的α的累积乘积的平方根 sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape) # 提取对应于时间步t的1-α的累积乘积的平方根 sqrt_one_minus_alphas_cumprod_t = extract( sqrt_one_minus_alphas_cumprod, t, x_start.shape ) # 返回前向扩散的结果,该结果是初始数据和噪声的线性组合 # 系数sqrt_alphas_cumprod_t和sqrt_one_minus_alphas_cumprod_t分别用于缩放初始数据和噪声 return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise
测试如下:
# take time step for noise in [10,20,40,80 100]: t = torch.tensor([40]) get_noisy_image(x_start, t)
▐ 核心残差网络
下面是残差网络的实现代码,Block 类是一个包含卷积、归一化、激活函数的标准神经网络层。ResnetBlock 类构建了一个残差块(residual block),这是深度残差网络(ResNet)的关键特性,它通过学习输入和输出的差异来提高网络性能。在 ResnetBlock 中,可选的 time_emb 参数和内部的 mlp 允许该Block处理与时间相关的特征。
import torch.nn as nn from einops import rearrange # 假设已经导入了einops库中的rearrange函数 from torch_utils import exists # 假设已经定义了exists函数,用于检查对象是否存在 # 定义一个基础的Block类,该类将作为神经网络中的一个基本构建模块 class Block(nn.Module): def __init__(self, dim, dim_out, groups=8): super().__init__() # 一个2D卷积层,卷积核大小为3x3,边缘填充为1,从输入维度dim到输出维度dim_out self.proj = nn.Conv2d(dim, dim_out, 3, padding=1) # GroupNorm层用于归一化,分组数为groups self.norm = nn.GroupNorm(groups, dim_out) # 使用SiLU(也称为Swish)作为激活函数 self.act = nn.SiLU() def forward(self, x, scale_shift=None): x = self.proj(x) # 应用卷积操作 x = self.norm(x) # 应用归一化操作 # 如果scale_shift参数存在,则对归一化后的数据进行缩放和位移操作 if exists(scale_shift): scale, shift = scale_shift x = x * (scale + 1) + shift x = self.act(x) # 应用激活函数 return x # 返回处理后的数据 # 定义一个ResnetBlock类,用于构建残差网络中的基本块 class ResnetBlock(nn.Module): """https://arxiv.org/abs/1512.03385""" def __init__(self, dim, dim_out, *, time_emb_dim=None, groups=8): super().__init__() # 如果time_emb_dim存在,定义一个小型的多层感知器(MLP)网络 self.mlp = ( nn.Sequential(nn.SiLU(), nn.Linear(time_emb_dim, dim_out)) if exists(time_emb_dim) else None ) # 定义两个顺序的基础Block模块 self.block1 = Block(dim, dim_out, groups=groups) self.block2 = Block(dim_out, dim_out, groups=groups) # 如果输入维度dim和输出维度dim_out不同,则使用1x1卷积进行维度调整 # 否则使用Identity层(相当于不做任何处理) self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity() def forward(self, x, time_emb=None): h = self.block1(x) # 通过第一个Block模块 # 如果存在时间嵌入向量time_emb且存在mlp模块,则将其应用到h上 if exists(self.mlp) and exists(time_emb): time_emb = self.mlp(time_emb) # 通过MLP网络 # 重整time_emb的形状以匹配h的形状,并将结果加到h上 h = rearrange(time_emb, "b c -> b c 1 1") + h h = self.block2(h) # 通过第二个Block模块 return h + self.res_conv(x) # 将Block模块的输出与调整维度后的原始输入x相加并返回
▐ 注意力机制
DDPM的作者把大名鼎鼎的注意力机制加在卷积层之间。注意力机制是Transformer架构的基础模块(参考:Vaswani et al., 2017),Transformer在AI各个领域,NLP,CV等等都取得了巨大的成功,这里Phil Wang实现了两个变种版本,一个是普通的多头注意力(用在了transformer中),另一种是线性注意力机制(参考:Shen et al.,2018),和普通的注意力在时间和存储的二次的增长相比,这个版本是线性增长的。
SelfAttention可以将输入图像的不同部分(像素或图像Patch)进行交互,从而实现特征的整合和全局上下文的引入,能够让模型建立捕捉图像全局关系的能力,有助于模型理解不同位置的像素之间的依赖关系,以更好地理解图像的语义。
在此基础上,SelfAttention还能减少平移不变性问题,SelfAttention模块可以在不考虑位置的情况下捕捉特征之间的关系,因此具有一定的平移不变性。
参考:Vaswani et al., 2017 地址:https://arxiv.org/abs/1706.03762
参考:Shen et al.,2018 地址:https://arxiv.org/abs/1812.01243
import torch from torch import nn from einops import rearrange import torch.nn.functional as F # 定义一个标准的多头注意力(Multi-Head Attention)机制的类 class Attention(nn.Module): def __init__(self, dim, heads=4, dim_head=32): super().__init__() # 根据维度的倒数平方根来缩放查询(Query)向量 self.scale = dim_head ** -0.5 # 头的数量(多头中的"多") self.heads = heads # 计算用于多头注意力的隐藏层维度 hidden_dim = dim_head * heads # 定义一个卷积层将输入的特征映射到QKV(查询、键、值)空间 self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False) # 定义一个卷积层将多头注意力的输出映射回原特征空间 self.to_out = nn.Conv2d(hidden_dim, dim, 1) def forward(self, x): # 获取输入的批量大小、通道数、高度和宽度 b, c, h, w = x.shape # 使用to_qkv卷积层得到QKV,并将其分离为三个组件 qkv = self.to_qkv(x).chunk(3, dim=1) # 将QKV重排并缩放查询向量 q, k, v = map( lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv ) q = q * self.scale # 使用爱因斯坦求和约定计算查询和键之间的相似度得分 sim = einsum("b h d i, b h d j -> b h i j", q, k) # 从相似度得分中减去最大值以提高数值稳定性 sim = sim - sim.amax(dim=-1, keepdim=True).detach() # 应用Softmax函数获取注意力权重 attn = sim.softmax(dim=-1) # 使用注意力权重对值进行加权 out = einsum("b h i j, b h d j -> b h i d", attn, v) # 将输出重新排列回原始的空间形状 out = rearrange(out, "b h (x y) d -> b (h d) x y", x=h, y=w) # 返回通过输出卷积层的结果 return self.to_out(out) # 定义一个线性注意力(Linear Attention)机制的类 class LinearAttention(nn.Module): def __init__(self, dim, heads=4, dim_head=32): super().__init__() # 根据维度的倒数平方根来缩放查询(Query)向量 self.scale = dim_head ** -0.5 # 头的数量 self.heads = heads # 计算用于多头注意力的隐藏层维度 hidden_dim = dim_head * heads # 定义一个卷积层将输入的特征映射到QKV空间 self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False) # 定义一个顺序容器包含卷积层和组归一化层将输出映射回原特征空间 self.to_out = nn.Sequential(nn.Conv2d(hidden_dim, dim, 1), nn.GroupNorm(1, dim)) def forward(self, x): # 获取输入的批量大小、通道数、高度和宽度 b, c, h, w = x.shape # 使用to_qkv卷积层得到QKV,并将其分离为三个组件 qkv = self.to_qkv(x).chunk(3, dim=1) # 将QKV重排,应用Softmax函数并缩放查询向量 q, k, v = map( lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv ) q = q.softmax(dim=-2) k = k.softmax(dim=-1) q = q * self.scale # 计算上下文矩阵,是键和值的加权组合 context = torch.einsum("b h d n, b h e n -> b h d e", k, v) # 使用上下文矩阵和查询计算最终的注意力输出 out = torch.einsum("b h d e, b h d n -> b h e n", context, q) # 将输出重新排列回原始的空间形状 out = rearrange(out, "b h c (x y) -> b (h c) x y", h=self.heads, x=h, y=w) # 返回经过输出顺序容器处理的结果 return self.to_out(out)
手动实现一个扩散模型DDPM(下):https://developer.aliyun.com/article/1480703