自编码器
自动编码器是一种无监督的深度学习算法,它学习输入数据的编码表示,然后重新构造与输出相同的输入。它由编码器和解码器两个网络组成。编码器将高维输入压缩成低维潜在代码(也称为潜在代码或编码空间) ,以从中提取最相关的信息,而解码器则解压缩编码数据并重新创建原始输入。
这种架构的目标是在编码时最大化信息并最小化重构误差。但是重构误差是什么?它的名字也是重构损失,通常是输入为实值时重构输入与原始输入之间的均方误差。如果输入数据是分类数据,则使用的损失函数是交叉熵损失。
在 Pytorch 中的实现
1. 导入库和 MNIST 数据集
我们可以使用库 torchvision 导入数据集。我们下载训练和测试数据集,并将图像数据集转换为 Tensor。我们不需要对图像进行标准化,因为数据集包含彩色图像。在我们将训练数据集划分为训练集和验证集之后,random_split 为这两个集提供了一个随机分区。DataLoader 用于为训练集、验证集和测试集创建数据加载器,这些数据加载器被分成小批量。 batchsize 是模型训练期间一次迭代中使用的样本数。
import matplotlib.pyplot as plt # plotting library import numpy as np # this module is useful to work with numerical arrays import pandas as pd import random import torch import torchvision from torchvision import transforms from torch.utils.data import DataLoader,random_split from torch import nn import torch.nn.functional as F import torch.optim as optim data_dir = 'dataset' train_dataset = torchvision.datasets.MNIST(data_dir, train=True, download=True) test_dataset = torchvision.datasets.MNIST(data_dir, train=False, download=True) train_transform = transforms.Compose([ transforms.ToTensor(), ]) test_transform = transforms.Compose([ transforms.ToTensor(), ]) train_dataset.transform = train_transform test_dataset.transform = test_transform m=len(train_dataset) train_data, val_data = random_split(train_dataset, [int(m-m*0.2), int(m*0.2)]) batch_size=256 train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size) valid_loader = torch.utils.data.DataLoader(val_data, batch_size=batch_size) test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size,shuffle=True)
2. 定义卷积自编码器
在这里,我们定义了卷积层的自动编码器。它将由两个类组成: 一个用于编码器,另一个用于解码器。该编码器将包含三个卷积层和两个完全连接层。增加了一些批量规范层作为规范层。解码器将具有相同的架构,但顺序是相反的。
class Encoder(nn.Module): def __init__(self, encoded_space_dim,fc2_input_dim): super().__init__() ### Convolutional section self.encoder_cnn = nn.Sequential( nn.Conv2d(1, 8, 3, stride=2, padding=1), nn.ReLU(True), nn.Conv2d(8, 16, 3, stride=2, padding=1), nn.BatchNorm2d(16), nn.ReLU(True), nn.Conv2d(16, 32, 3, stride=2, padding=0), nn.ReLU(True) ) ### Flatten layer self.flatten = nn.Flatten(start_dim=1) ### Linear section self.encoder_lin = nn.Sequential( nn.Linear(3 * 3 * 32, 128), nn.ReLU(True), nn.Linear(128, encoded_space_dim) ) def forward(self, x): x = self.encoder_cnn(x) x = self.flatten(x) x = self.encoder_lin(x) return x class Decoder(nn.Module): def __init__(self, encoded_space_dim,fc2_input_dim): super().__init__() self.decoder_lin = nn.Sequential( nn.Linear(encoded_space_dim, 128), nn.ReLU(True), nn.Linear(128, 3 * 3 * 32), nn.ReLU(True) ) self.unflatten = nn.Unflatten(dim=1, unflattened_size=(32, 3, 3)) self.decoder_conv = nn.Sequential( nn.ConvTranspose2d(32, 16, 3, stride=2, output_padding=0), nn.BatchNorm2d(16), nn.ReLU(True), nn.ConvTranspose2d(16, 8, 3, stride=2, padding=1, output_padding=1), nn.BatchNorm2d(8), nn.ReLU(True), nn.ConvTranspose2d(8, 1, 3, stride=2, padding=1, output_padding=1) ) def forward(self, x): x = self.decoder_lin(x) x = self.unflatten(x) x = self.decoder_conv(x) x = torch.sigmoid(x) return x
3.初始化Loss函数和优化器
我们需要在训练自动编码器之前定义构建块:
· torch.device 使用 GPU 等硬件加速器训练模型
· 将移动到设备的 Encoder 和 Decoder 网络
· nn.MSEloss 和 torch.optim.Adam
### Define the loss function loss_fn = torch.nn.MSELoss() ### Define an optimizer (both for the encoder and the decoder!) lr= 0.001 ### Set the random seed for reproducible results torch.manual_seed(0) ### Initialize the two networks d = 4 #model = Autoencoder(encoded_space_dim=encoded_space_dim) encoder = Encoder(encoded_space_dim=d,fc2_input_dim=128) decoder = Decoder(encoded_space_dim=d,fc2_input_dim=128) params_to_optimize = [ {'params': encoder.parameters()}, {'params': decoder.parameters()} ] optim = torch.optim.Adam(params_to_optimize, lr=lr, weight_decay=1e-05) # Check if the GPU is available device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") print(f'Selected device: {device}') # Move both the encoder and the decoder to the selected device encoder.to(device) decoder.to(device)
4. 训练和评估模型
我们定义了一个函数来训练 AE 模型。首先,我们将输入图像传递给编码器。稍后,将编码数据传递给编码器,然后我们使用 loss_fn(x_hat,x) 计算重建损失。在清除梯度以不累积其他值后,我们执行反向传播,最后通过调用 opt.step() 计算梯度。
### Training function def train_epoch_den(encoder, decoder, device, dataloader, loss_fn, optimizer,noise_factor=0.3): # Set train mode for both the encoder and the decoder encoder.train() decoder.train() train_loss = [] # Iterate the dataloader (we do not need the label values, this is unsupervised learning) for image_batch, _ in dataloader: # with "_" we just ignore the labels (the second element of the dataloader tuple) # Move tensor to the proper device image_noisy = add_noise(image_batch,noise_factor) image_noisy = image_noisy.to(device) # Encode data encoded_data = encoder(image_noisy) # Decode data decoded_data = decoder(encoded_data) # Evaluate loss loss = loss_fn(decoded_data, image_noisy) # Backward pass optimizer.zero_grad() loss.backward() optimizer.step() # Print batch loss print('\t partial train loss (single batch): %f' % (loss.data)) train_loss.append(loss.detach().cpu().numpy()) return np.mean(train_loss)
创建训练函数后,我们定义一个函数来评估模型的性能。和以前一样,我们将图像传递给编码器。编码后的图像被传递给解码器。然后,我们将所有图像批次和重建存储到两个不同的列表中,这将用于计算测试损失。
### Testing function def test_epoch(encoder, decoder, device, dataloader, loss_fn): # Set evaluation mode for encoder and decoder encoder.eval() decoder.eval() with torch.no_grad(): # No need to track the gradients # Define the lists to store the outputs for each batch conc_out = [] conc_label = [] for image_batch, _ in dataloader: # Move tensor to the proper device image_batch = image_batch.to(device) # Encode data encoded_data = encoder(image_batch) # Decode data decoded_data = decoder(encoded_data) conc_out.append(decoded_data.cpu()) conc_label.append(image_batch.cpu()) # Create a single tensor with all the values in the lists conc_out = torch.cat(conc_out) conc_label = torch.cat(conc_label) # Evaluate global loss val_loss = loss_fn(conc_out, conc_label) return val_loss.data
我们还希望在训练的每个时期看到重建的图像。目标是了解自动编码器如何从输入图像中学习。
def plot_ae_outputs(encoder,decoder,n=5): plt.figure(figsize=(10,4.5)) for i in range(n): ax = plt.subplot(2,n,i+1) img = test_dataset[i][0].unsqueeze(0).to(device) encoder.eval() decoder.eval() with torch.no_grad(): rec_img = decoder(encoder(img)) plt.imshow(img.cpu().squeeze().numpy(), cmap='gist_gray') ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) if i == n//2: ax.set_title('Original images') ax = plt.subplot(2, n, i + 1 + n) plt.imshow(rec_img.cpu().squeeze().numpy(), cmap='gist_gray') ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) if i == n//2: ax.set_title('Reconstructed images') plt.show()
让我们将测试代码分解成小块:
· test_dataset[i][0].unsqueeze(0) 用于从测试数据集中提取第 i 个图像,然后 在0 轴上增加 1 维。需要此步骤将图像传递给自动编码器。
· decoder(encoder(img))用于获取重建图像
· plt.imshow(img.cpu().squeeze().numpy()) 用于绘制原始图像。squeeze()删除之前添加的维度,这对于可视化图像至关重要。numpy() 将张量转换为 ndarray,这是函数 plt.imshow 接受的唯一对象类型。numpy() 将张量对象的副本返回到 CPU 内存中。
现在我们终于可以开始在训练集上训练模型并在验证集上对其进行评估了。
num_epochs = 30 diz_loss = {'train_loss':[],'val_loss':[]} for epoch in range(num_epochs): train_loss =train_epoch(encoder,decoder,device, train_loader,loss_fn,optim) val_loss = test_epoch(encoder,decoder,device,test_loader,loss_fn) print('\n EPOCH {}/{} \t train loss {} \t val loss {}'.format(epoch + 1, num_epochs,train_loss,val_loss)) diz_loss['train_loss'].append(train_loss) diz_loss['val_loss'].append(val_loss) plot_ae_outputs(encoder,decoder,n=5)
可以注意到自动编码器能够在 30 个 epoch 后很好地重建图像,即使存在一些缺陷。但是由于这个模型真的很简单,所以它的表现非常好。现在模型已经训练完毕,我们要对测试集进行最终评估:
test_epoch(encoder,decoder,device,test_loader,loss_fn).item()
我们还可以观察重建损失如何随着epoch的推移而减少:
# Plot losses plt.figure(figsize=(10,8)) plt.semilogy(diz_loss['train_loss'], label='Train') plt.semilogy(diz_loss['val_loss'], label='Valid') plt.xlabel('Epoch') plt.ylabel('Average Loss') #plt.grid() plt.legend() #plt.title('loss') plt.show()
5. 从潜在代码生成新样本
为了从潜在代码生成新图像,我们定义了一个从潜在空间均匀采样的函数。这些样本将被传递到解码器,解码器将创建重建的图像。
def plot_reconstructed(decoder, r0=(-5, 10), r1=(-10, 5), n=12): plt.figure(figsize=(20,8.5)) w = 28 img = np.zeros((n*w, n*w)) for i, y in enumerate(np.linspace(*r1, n)): for j, x in enumerate(np.linspace(*r0, n)): z = torch.Tensor([[x, y]]).to(device) x_hat = decoder(z) x_hat = x_hat.reshape(28, 28).to('cpu').detach().numpy() img[(n-1-i)*w:(n-1-i+1)*w, j*w:(j+1)*w] = x_hat plt.imshow(img, extent=[*r0, *r1], cmap='gist_gray') plot_reconstructed(decoder, r0=(-1, 1), r1=(-1, 1))
要绘制这些重建图,我们需要知道潜在空间的范围,您可以在下面的潜在空间可视化部分中看到。我们可以观察到,在图的左下角,数字没有意义。实际上,点 (-1,-1) 处的潜在空间是空的。
6. 用 t-SNE 可视化潜在空间
之后我们可以观察动态可视化以查看自编码器学习到的潜在空间。首先,我们使用测试集创建编码样本。
encoded_samples = [] for sample in tqdm(test_dataset): img = sample[0].unsqueeze(0).to(device) label = sample[1] # Encode image encoder.eval() with torch.no_grad(): encoded_img = encoder(img) # Append to list encoded_img = encoded_img.flatten().cpu().numpy() encoded_sample = {f"Enc. Variable {i}": enc for i, enc in enumerate(encoded_img)} encoded_sample['label'] = label encoded_samples.append(encoded_sample) encoded_samples = pd.DataFrame(encoded_samples) encoded_samples
让我们使用颇有技巧的表达库绘制潜在的空间表示:
import plotly.express as px px.scatter(encoded_samples, x='Enc. Variable 0', y='Enc. Variable 1', color=encoded_samples.label.astype(str), opacity=0.7)
从这个图中,我们看到相似的数字聚集在一起。例如“4”与“9”和“5”重叠。
为了让表示更容易阅读,我们可以应用称为 t-SNE 的降维来可视化二维空间中的潜在代码。出于这个原因,我们将固定组件的数量等于 2。
from sklearn.manifold import TSNE tsne = TSNE(n_components=2) tsne_results = tsne.fit_transform(encoded_samples.drop(['label'],axis=1)) fig = px.scatter(tsne_results, x=0, y=1, color=encoded_samples.label.astype(str), labels={'0': 'tsne-2d-one', '1': 'tsne-2d-two'}) fig.show()
可以看到它清楚地区分了一个数字。有一些例外,点属于其他类别,但与之前的表示相比,t-SNE 仍然是一个改进。
总结
恭喜!你已经学会了实现卷积自编码器。自编码器提供了一种压缩图像并提取最重要信息的方法。该模型还有许多扩展以提高性能,其中一些是降噪自动编码器、变分自动编码器和生成对抗网络。