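An autoencoder is an unsupervised deep learning algorithm that learns an encoded representation of the input data and then reconstructs the input from that representation, so that the output matches the input as closely as possible. It consists of two networks, an encoder and a decoder. The encoder compresses the high-dimensional input into a low-dimensional latent code (also called the latent space or encoded space), keeping the most relevant information, while the decoder decompresses the encoded data and recreates the original input.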
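To make the encoder/decoder idea concrete, here is a minimal sketch of a *linear* autoencoder in NumPy. It is not the convolutional model used below: the encoder is a single matrix `W_enc` that maps 8-D points to a 2-D code, the decoder is a matrix `W_dec` that maps the code back, and both are trained with plain gradient descent on the mean squared reconstruction error. The toy data, sizes and learning rate are illustrative assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy data: 200 points in 8-D that lie close to a 2-D subspace
n, D, k = 200, 8, 2
X = rng.normal(size=(n, k)) @ rng.normal(size=(k, D)) + 0.01 * rng.normal(size=(n, D))

# Encoder and decoder weights; k = 2 is the bottleneck (latent) size
W_enc = 0.1 * rng.normal(size=(D, k))
W_dec = 0.1 * rng.normal(size=(k, D))

def encode(x):
    return x @ W_enc   # (n, D) -> (n, k) latent code

def decode(z):
    return z @ W_dec   # (n, k) -> (n, D) reconstruction

def mse(a, b):
    return np.mean((a - b) ** 2)

initial_loss = mse(decode(encode(X)), X)

lr = 0.05
for _ in range(2000):
    Z = encode(X)
    X_hat = decode(Z)
    # Gradient of the mean squared error with respect to X_hat
    G = 2.0 * (X_hat - X) / X.size
    grad_dec = Z.T @ G              # dL/dW_dec
    grad_enc = X.T @ (G @ W_dec.T)  # dL/dW_enc (chain rule through the decoder)
    W_dec -= lr * grad_dec
    W_enc -= lr * grad_enc

final_loss = mse(decode(encode(X)), X)
codes = encode(X)
print(f"initial MSE {initial_loss:.4f} -> final MSE {final_loss:.6f}")
```

Because the data really has only two underlying dimensions, a 2-D bottleneck is enough to reconstruct it almost perfectly; the convolutional model below applies the same principle to 784-pixel images.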
Implementation in PyTorch
1. Importing the libraries and the MNIST dataset
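We can import the dataset with the torchvision library. We download the training and test datasets and convert the images to tensors. No further normalization is needed: MNIST images are grayscale, and transforms.ToTensor() already scales pixel values from [0, 255] to [0, 1], which matches the output range of the decoder's final sigmoid. random_split then randomly partitions the training dataset into a training set (80%) and a validation set (20%). DataLoader creates data loaders for the training, validation and test sets, which serve the data in mini-batches. The batch size is the number of samples used in one iteration of model training.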
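The split sizes and the number of mini-batches follow from simple arithmetic. The sketch below reproduces them for MNIST's 60,000 training images and mimics in plain NumPy what ToTensor() does to a single uint8 grayscale image (convert to float, scale to [0, 1], add a leading channel axis). The 2×2 image is a made-up stand-in, not MNIST data.

```python
import math
import numpy as np

m = 60_000                       # number of MNIST training images
val_size = int(m * 0.2)          # same expressions as in the random_split call
train_size = int(m - m * 0.2)
batch_size = 256
# DataLoader keeps the last, smaller batch by default (drop_last=False), hence ceil
train_batches = math.ceil(train_size / batch_size)

# NumPy imitation of ToTensor() on an H x W uint8 grayscale image
raw = np.array([[0, 128], [255, 64]], dtype=np.uint8)
tensor_like = (raw.astype(np.float32) / 255.0)[np.newaxis, :, :]  # shape (1, H, W)

print(train_size, val_size, train_batches)  # 48000 12000 188
print(tensor_like.shape)                    # (1, 2, 2)
```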
```python
import matplotlib.pyplot as plt  # plotting library
import numpy as np  # this module is useful to work with numerical arrays
import pandas as pd
import random
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader, random_split
from torch import nn
import torch.nn.functional as F
import torch.optim as optim

data_dir = 'dataset'

# Download the MNIST training and test sets
train_dataset = torchvision.datasets.MNIST(data_dir, train=True, download=True)
test_dataset = torchvision.datasets.MNIST(data_dir, train=False, download=True)

# ToTensor() turns each image into a float tensor of shape (1, 28, 28) with values in [0, 1]
train_transform = transforms.Compose([
    transforms.ToTensor(),
])
test_transform = transforms.Compose([
    transforms.ToTensor(),
])
train_dataset.transform = train_transform
test_dataset.transform = test_transform

# Random 80/20 split of the training set into training and validation subsets
m = len(train_dataset)
train_data, val_data = random_split(train_dataset, [int(m - m * 0.2), int(m * 0.2)])
batch_size = 256

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)  # reshuffle every epoch
valid_loader = DataLoader(val_data, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
```
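2. Defining the convolutional autoencoder
Here we define an autoencoder built from convolutional layers. It consists of two classes, one for the encoder and one for the decoder. The encoder contains three convolutional layers and two fully connected layers, and a few batch normalization layers (BatchNorm2d) are inserted between the convolutions. The decoder has the same architecture in reverse order, using transposed convolutions (ConvTranspose2d) to upsample the latent code back to a 28×28 image.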
```python
class Encoder(nn.Module):
    def __init__(self, encoded_space_dim, fc2_input_dim):
        super().__init__()
        # fc2_input_dim is not used; it is kept so the constructor matches the calls below

        ### Convolutional section: (1, 28, 28) -> (8, 14, 14) -> (16, 7, 7) -> (32, 3, 3)
        self.encoder_cnn = nn.Sequential(
            nn.Conv2d(1, 8, 3, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(8, 16, 3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.Conv2d(16, 32, 3, stride=2, padding=0),
            nn.ReLU(True)
        )

        ### Flatten layer: (32, 3, 3) -> 288
        self.flatten = nn.Flatten(start_dim=1)

        ### Linear section: 288 -> 128 -> encoded_space_dim
        self.encoder_lin = nn.Sequential(
            nn.Linear(3 * 3 * 32, 128),
            nn.ReLU(True),
            nn.Linear(128, encoded_space_dim)
        )

    def forward(self, x):
        x = self.encoder_cnn(x)
        x = self.flatten(x)
        x = self.encoder_lin(x)
        return x


class Decoder(nn.Module):
    def __init__(self, encoded_space_dim, fc2_input_dim):
        super().__init__()
        # Linear section: encoded_space_dim -> 128 -> 288
        self.decoder_lin = nn.Sequential(
            nn.Linear(encoded_space_dim, 128),
            nn.ReLU(True),
            nn.Linear(128, 3 * 3 * 32),
            nn.ReLU(True)
        )

        # 288 -> (32, 3, 3)
        self.unflatten = nn.Unflatten(dim=1, unflattened_size=(32, 3, 3))

        # Transposed convolutions: (32, 3, 3) -> (16, 7, 7) -> (8, 14, 14) -> (1, 28, 28)
        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose2d(32, 16, 3, stride=2, output_padding=0),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 8, 3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(True),
            nn.ConvTranspose2d(8, 1, 3, stride=2, padding=1, output_padding=1)
        )

    def forward(self, x):
        x = self.decoder_lin(x)
        x = self.unflatten(x)
        x = self.decoder_conv(x)
        x = torch.sigmoid(x)  # squash pixel values into [0, 1]
        return x
```
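The layer sizes can be checked by hand with the output-size formulas for Conv2d and ConvTranspose2d along one spatial axis (kernel k, stride s, padding p, dilation 1). This plain-Python sketch reproduces the 28 → 14 → 7 → 3 path through the encoder, the 3 → 7 → 14 → 28 path back through the decoder, and therefore why the first Linear layer takes 3 * 3 * 32 = 288 inputs. The helper names are hypothetical, not PyTorch functions.

```python
def conv2d_out(n, kernel, stride=1, padding=0):
    # Output size of nn.Conv2d along one spatial axis (dilation = 1)
    return (n + 2 * padding - kernel) // stride + 1

def conv_transpose2d_out(n, kernel, stride=1, padding=0, output_padding=0):
    # Output size of nn.ConvTranspose2d along one spatial axis (dilation = 1)
    return (n - 1) * stride - 2 * padding + kernel + output_padding

# Encoder: the three Conv2d layers (kernel 3, stride 2, padding 1, 1, 0)
enc_sizes = [28]
for padding in (1, 1, 0):
    enc_sizes.append(conv2d_out(enc_sizes[-1], kernel=3, stride=2, padding=padding))

# Decoder: the three ConvTranspose2d layers (kernel 3, stride 2)
dec_sizes = [3]
for padding, output_padding in ((0, 0), (1, 1), (1, 1)):
    dec_sizes.append(conv_transpose2d_out(dec_sizes[-1], kernel=3, stride=2,
                                          padding=padding, output_padding=output_padding))

flattened = 32 * enc_sizes[-1] * enc_sizes[-1]  # input size of the first Linear layer
print(enc_sizes, dec_sizes, flattened)  # [28, 14, 7, 3] [3, 7, 14, 28] 288
```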
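3. Initializing the loss function and optimizer
Before training, we set up:
· torch.device, to train the model on a hardware accelerator such as a GPU when one is available
· the Encoder and Decoder networks, moved to the selected device
· nn.MSELoss as the reconstruction loss and torch.optim.Adam as the optimizer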
```python
### Define the loss function
loss_fn = torch.nn.MSELoss()

### Define an optimizer (both for the encoder and the decoder!)
lr = 0.001

### Set the random seed for reproducible results
torch.manual_seed(0)

### Initialize the two networks
d = 4  # dimension of the latent code

encoder = Encoder(encoded_space_dim=d, fc2_input_dim=128)
decoder = Decoder(encoded_space_dim=d, fc2_input_dim=128)
params_to_optimize = [
    {'params': encoder.parameters()},
    {'params': decoder.parameters()}
]

# A single Adam optimizer updates the parameters of both networks.
# Note: the name `optim` shadows the torch.optim alias imported earlier; the training loop uses it.
optim = torch.optim.Adam(params_to_optimize, lr=lr, weight_decay=1e-05)

# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# Move both the encoder and the decoder to the selected device
encoder.to(device)
decoder.to(device)
```
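With weight_decay=1e-05, torch.optim.Adam adds an L2 penalty to the gradient before updating its moment estimates; this is not the decoupled decay used by AdamW. As a sketch of what one optimizer.step() does to a parameter, here is a NumPy re-implementation of a single Adam update following the algorithm given in the PyTorch documentation (default betas and eps, amsgrad off). It is an illustration, not PyTorch's own code, and `adam_step` is a hypothetical helper. On the first step the bias-corrected ratio m̂/√v̂ equals the sign of the gradient, so every weight moves by about lr.

```python
import numpy as np

def adam_step(param, grad, m, v, t, lr=1e-3, betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-5):
    """One Adam update (t starts at 1); returns the new parameter and moment estimates."""
    beta1, beta2 = betas
    grad = grad + weight_decay * param          # L2 penalty folded into the gradient
    m = beta1 * m + (1 - beta1) * grad          # first moment: running mean of gradients
    v = beta2 * v + (1 - beta2) * grad ** 2     # second moment: running mean of squared gradients
    m_hat = m / (1 - beta1 ** t)                # bias corrections for the zero initialization
    v_hat = v / (1 - beta2 ** t)
    param = param - lr * m_hat / (np.sqrt(v_hat) + eps)
    return param, m, v

p0 = np.array([1.0, -2.0, 0.5])
g = np.array([0.3, -0.1, 2.0])
p1, m1, v1 = adam_step(p0, g, np.zeros(3), np.zeros(3), t=1)
print(p0 - p1)  # roughly lr * sign(g)
```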
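4. Training and evaluating the model
We define a function to train the autoencoder. First, we pass the input images to the encoder. Then the encoded data is passed to the decoder, and we compute the reconstruction loss with loss_fn(x_hat, x). After clearing the gradients with optimizer.zero_grad() so that values from previous iterations do not accumulate, we perform backpropagation with loss.backward(), which computes the gradients, and finally call optimizer.step() to update the model parameters.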
```python
### Training function
def train_epoch(encoder, decoder, device, dataloader, loss_fn, optimizer):
    # Set train mode for both the encoder and the decoder
    encoder.train()
    decoder.train()
    train_loss = []
    # Iterate the dataloader (we do not need the label values, this is unsupervised learning)
    for image_batch, _ in dataloader:  # with "_" we just ignore the labels (the second element of the dataloader tuple)
        # Move tensor to the proper device
        image_batch = image_batch.to(device)
        # Encode data
        encoded_data = encoder(image_batch)
        # Decode data
        decoded_data = decoder(encoded_data)
        # Evaluate the reconstruction loss against the original images
        loss = loss_fn(decoded_data, image_batch)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Print batch loss
        print('\t partial train loss (single batch): %f' % (loss.item()))
        train_loss.append(loss.detach().cpu().numpy())

    return np.mean(train_loss)
```
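The optimizer.zero_grad() call matters because PyTorch accumulates gradients: every loss.backward() adds into each parameter's .grad instead of overwriting it. This small standalone sketch, using a single scalar weight unrelated to the autoencoder, shows two backward passes without clearing doubling the gradient. Setting .grad to None mirrors what optimizer.zero_grad() does by default in recent PyTorch versions.

```python
import torch

w = torch.tensor([1.0], requires_grad=True)

# Two backward passes without clearing: the gradients are summed into w.grad
for _ in range(2):
    loss = (3 * w).sum()   # d(loss)/dw = 3
    loss.backward()
accumulated = w.grad.item()  # 3 + 3 = 6

# Clear the gradient, then do a single backward pass
w.grad = None
loss = (3 * w).sum()
loss.backward()
fresh = w.grad.item()        # 3

print(accumulated, fresh)    # 6.0 3.0
```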
```python
### Testing function
def test_epoch(encoder, decoder, device, dataloader, loss_fn):
    # Set evaluation mode for encoder and decoder
    encoder.eval()
    decoder.eval()
    with torch.no_grad():  # No need to track the gradients
        # Define the lists to store the outputs for each batch
        conc_out = []
        conc_label = []  # the targets are the input images themselves
        for image_batch, _ in dataloader:
            # Move tensor to the proper device
            image_batch = image_batch.to(device)
            # Encode data
            encoded_data = encoder(image_batch)
            # Decode data
            decoded_data = decoder(encoded_data)
            conc_out.append(decoded_data.cpu())
            conc_label.append(image_batch.cpu())
        # Create a single tensor with all the values in the lists
        conc_out = torch.cat(conc_out)
        conc_label = torch.cat(conc_label)
        # Evaluate global loss
        val_loss = loss_fn(conc_out, conc_label)
    return val_loss.item()
```
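test_epoch concatenates all batches and computes a single loss over the whole set, whereas the training function averages per-batch losses with np.mean. The two can differ slightly when the last batch is smaller (with 256 images per batch, the 12,000 validation images end in a batch of 224). The NumPy sketch below, with random stand-in data rather than MNIST, checks that the global MSE equals the per-batch MSEs weighted by batch size.

```python
import numpy as np

rng = np.random.default_rng(0)
targets = rng.random((1000, 784))                          # stand-in "images"
outputs = targets + 0.1 * rng.normal(size=targets.shape)   # stand-in reconstructions
batch_size = 256                                           # 1000 = 3 * 256 + 232

batch_losses, batch_sizes = [], []
for start in range(0, len(targets), batch_size):
    t = targets[start:start + batch_size]
    o = outputs[start:start + batch_size]
    batch_losses.append(np.mean((o - t) ** 2))   # per-batch MSE (mean reduction)
    batch_sizes.append(len(t))

global_mse = np.mean((outputs - targets) ** 2)            # what test_epoch computes
unweighted = np.mean(batch_losses)                        # what the training function reports
weighted = np.average(batch_losses, weights=batch_sizes)  # batch-size-weighted mean
print(global_mse, weighted, unweighted)
```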
```python
def plot_ae_outputs(encoder, decoder, n=5):
    plt.figure(figsize=(10, 4.5))
    for i in range(n):
        # Top row: original test images
        ax = plt.subplot(2, n, i + 1)
        img = test_dataset[i][0].unsqueeze(0).to(device)
        encoder.eval()
        decoder.eval()
        with torch.no_grad():
            rec_img = decoder(encoder(img))
        plt.imshow(img.cpu().squeeze().numpy(), cmap='gist_gray')
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        if i == n // 2:
            ax.set_title('Original images')
        # Bottom row: reconstructed images
        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(rec_img.cpu().squeeze().numpy(), cmap='gist_gray')
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        if i == n // 2:
            ax.set_title('Reconstructed images')
    plt.show()
```
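· test_dataset[i][0].unsqueeze(0) extracts the i-th image from the test dataset (shape (1, 28, 28)) and adds a dimension of size 1 at axis 0, giving shape (1, 1, 28, 28). This step is needed because the autoencoder expects a batch of images as input.
· decoder(encoder(img)) is used to obtain the reconstructed image.
· plt.imshow(img.cpu().squeeze().numpy()) plots the original image. squeeze() removes every dimension of size 1, i.e. both the batch dimension added earlier and the single channel dimension, leaving a 28×28 array that imshow can display as a grayscale image. cpu() copies the tensor to CPU memory if it lives on the GPU, and numpy() then converts it to an ndarray that shares memory with the CPU tensor; plt.imshow expects array-like input such as an ndarray.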
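The shape changes and memory behaviour described in these bullets can be checked on a dummy tensor. This standalone sketch uses a zero tensor shaped like one MNIST sample, (1, 28, 28), as a stand-in for test_dataset[i][0]: unsqueeze(0) adds the batch axis, squeeze() drops every size-1 axis, and for a CPU tensor numpy() returns an array that shares memory with the tensor, so writing into the array changes the original tensor.

```python
import torch

sample = torch.zeros(1, 28, 28)   # stand-in for test_dataset[i][0]
batch = sample.unsqueeze(0)       # add a batch axis at position 0
print(batch.shape)                # torch.Size([1, 1, 28, 28])

image = batch.squeeze()           # remove all size-1 axes (batch and channel)
print(image.shape)                # torch.Size([28, 28])

array = image.cpu().numpy()       # .cpu() returns the same tensor here; numpy() shares its memory
array[0, 0] = 5.0
print(sample[0, 0, 0].item())     # 5.0: the views and the ndarray share one buffer
```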
```python
num_epochs = 30
diz_loss = {'train_loss': [], 'val_loss': []}
for epoch in range(num_epochs):
    train_loss = train_epoch(encoder, decoder, device, train_loader, loss_fn, optim)
    # Validate on the held-out validation split; the test set is kept for the final evaluation
    val_loss = test_epoch(encoder, decoder, device, valid_loader, loss_fn)
    print('\n EPOCH {}/{} \t train loss {} \t val loss {}'.format(epoch + 1, num_epochs, train_loss, val_loss))
    diz_loss['train_loss'].append(train_loss)
    diz_loss['val_loss'].append(val_loss)
    plot_ae_outputs(encoder, decoder, n=5)
```
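We can see that after 30 epochs the autoencoder reconstructs the images well, even though some defects remain. Considering how simple the model is, it performs very well. Now that the model is trained, we perform a final evaluation on the test set and plot the loss curves: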
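```python
# Final evaluation on the held-out test set
test_loss = test_epoch(encoder, decoder, device, test_loader, loss_fn)
print(f'Test loss: {test_loss}')

# Plot losses
plt.figure(figsize=(10, 8))
plt.semilogy(diz_loss['train_loss'], label='Train')
plt.semilogy(diz_loss['val_loss'], label='Valid')
plt.xlabel('Epoch')
plt.ylabel('Average Loss')
plt.legend()
plt.show()
```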
5. Generating new samples from the latent code
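```python
def plot_reconstructed(decoder, r0=(-5, 10), r1=(-10, 5), n=12):
    plt.figure(figsize=(20, 8.5))
    w = 28
    img = np.zeros((n * w, n * w))
    # The model uses a d-dimensional latent code (d = 4 above). Assumption: we sweep the
    # first two latent dimensions over a grid and hold the remaining ones at 0.
    latent_dim = decoder.decoder_lin[0].in_features
    decoder.eval()
    with torch.no_grad():
        for i, y in enumerate(np.linspace(*r1, n)):
            for j, x in enumerate(np.linspace(*r0, n)):
                z = torch.zeros(1, latent_dim, device=device)
                z[0, 0] = float(x)
                z[0, 1] = float(y)
                x_hat = decoder(z)
                x_hat = x_hat.reshape(28, 28).cpu().numpy()
                # Block row n-1-i, so larger y values appear higher in the image
                img[(n - 1 - i) * w:(n - i) * w, j * w:(j + 1) * w] = x_hat
    plt.imshow(img, extent=[*r0, *r1], cmap='gist_gray')

plot_reconstructed(decoder, r0=(-1, 1), r1=(-1, 1))
```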
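To plot these reconstructions, we need to know the range of the latent space, which you can see in the latent space visualization section below. We can observe that in the lower-left corner of the plot the digits make no sense. Indeed, the latent space around the point (-1, -1) is empty.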
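The indexing in plot_reconstructed places the tile for grid position (i, j) at block row n-1-i, so that larger y values end up at the top of the image, consistent with the extent passed to imshow. This small NumPy sketch (n = 3 tiles of w = 2 pixels, each tile filled with its y value instead of a decoded digit) shows the resulting layout.

```python
import numpy as np

n, w = 3, 2
ys = np.linspace(-1, 1, n)   # y grid, as np.linspace(*r1, n)
xs = np.linspace(-1, 1, n)   # x grid, as np.linspace(*r0, n)
img = np.zeros((n * w, n * w))

for i, y in enumerate(ys):
    for j, x in enumerate(xs):
        tile = np.full((w, w), y)   # stand-in for the decoded 28x28 digit
        img[(n - 1 - i) * w:(n - i) * w, j * w:(j + 1) * w] = tile

# First column, top to bottom: 1, 1, 0, 0, -1, -1 (largest y at the top)
print(img[:, 0])
```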
6. Visualizing the latent space with t-SNE
```python
from tqdm import tqdm

encoded_samples = []
encoder.eval()
for sample in tqdm(test_dataset):
    img = sample[0].unsqueeze(0).to(device)
    label = sample[1]
    # Encode image
    with torch.no_grad():
        encoded_img = encoder(img)
    # Append to list
    encoded_img = encoded_img.flatten().cpu().numpy()
    encoded_sample = {f"Enc. Variable {i}": enc for i, enc in enumerate(encoded_img)}
    encoded_sample['label'] = label
    encoded_samples.append(encoded_sample)

encoded_samples = pd.DataFrame(encoded_samples)
encoded_samples
```
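We can then plot the first two latent variables of each test image, colored by digit label:

```python
import plotly.express as px

fig = px.scatter(encoded_samples, x='Enc. Variable 0', y='Enc. Variable 1',
                 color=encoded_samples.label.astype(str), opacity=0.7)
fig.show()
```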
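The latent code has four dimensions, so the scatter plot above shows only two of them. To make the representation easier to read, we can apply a dimensionality reduction technique called t-SNE to visualize the latent codes in a two-dimensional space. For this reason, we set the number of components to 2.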
```python
from sklearn.manifold import TSNE

tsne = TSNE(n_components=2)
tsne_results = tsne.fit_transform(encoded_samples.drop(['label'], axis=1))
fig = px.scatter(tsne_results, x=0, y=1,
                 color=encoded_samples.label.astype(str),
                 labels={'0': 'tsne-2d-one', '1': 'tsne-2d-two'})
fig.show()
```
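We can see that t-SNE clearly separates the digits from one another. There are some exceptions, with points falling among other classes, but t-SNE is still an improvement over the previous representation.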
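t-SNE is stochastic and non-parametric: results change between runs unless random_state is fixed, and scikit-learn's TSNE offers fit_transform but no transform method, so new points cannot be projected into an existing embedding. The sketch below runs it on random 4-D vectors standing in for the encoded samples (matching d = 4 above); the sample count, cluster layout, perplexity (which must be smaller than the number of samples) and seed are illustrative assumptions.

```python
import numpy as np
from sklearn.manifold import TSNE

rng = np.random.default_rng(0)
# Stand-in for encoded_samples.drop(['label'], axis=1): 60 latent codes of dimension 4 in 3 clusters
centers = rng.normal(scale=5.0, size=(3, 4))
codes = np.concatenate([c + rng.normal(size=(20, 4)) for c in centers])

tsne = TSNE(n_components=2, perplexity=10, random_state=0)  # fixed seed for reproducible layouts
embedding = tsne.fit_transform(codes)                        # one 2-D point per latent code
print(embedding.shape)                                       # (60, 2)
```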