一:手写二维卷积的实现
要求:
手写二维卷积的实现,并从至少一个数据集上进行实验,这里我选取了车辆分类数据集(后面的实验都是用的车辆分类数据集),主要根据数据集的大小,手动定义二维卷积操作,如:自定义单通道卷积、自定义多通道卷积、自定义卷积层等。
实验过程:
1.1相关包的导入
1. import torch 2. import numpy as np 3. import random 4. from IPython import display 5. from matplotlib import pyplot as plt 6. import torch.utils.data as Data 7. from PIL import Image 8. import os 9. from torch import nn 10. import torch.optim as optim 11. from torch.nn import init 12. import torch.nn.functional as F 13. import time 14. plt.switch_backend('agg')
1.2读取数据集,并区分数据集和测试集
1. #读取图片数据集 2. def readimg(bus_img,car_img,truck_img): 3. for home, dirs, files in os.walk(Result_Path): 4. for dir in dirs: 5. #判断每一类的第一个 6. cal = 0 7. #每一类的路径 8. curdir = Result_Path + "/" + dir 9. for img in os.listdir(curdir): 10. img = Image.open(Result_Path + "/" + dir + "/"+ img) 11. img = np.array(img,np.float32) 12. #增加一个维度 13. img = np.expand_dims(img, 0) 14. #归一化 15. img = img/255 16. #1*H*W*C转换为1*C*H*W 不共享内存 17. tensor_img = torch.tensor(np.transpose(img, (0, 3, 1,2))) 18. #公交一类 19. if dir == "bus": 20. if cal == 0: 21. bus_img = tensor_img 22. else: 23. bus_img = torch.cat((bus_img,tensor_img),0) 24. elif dir == "car": 25. if cal == 0: 26. car_img = tensor_img 27. else: 28. car_img = torch.cat((car_img,tensor_img),0) 29. elif dir=="truck": 30. if cal == 0: 31. truck_img = tensor_img 32. else: 33. truck_img = torch.cat((truck_img,tensor_img),0) 34. cal = cal + 1 35. return bus_img,car_img,truck_img 36. C ,H ,W = 3,64,64 37. #分别读取三个类别的数据集bus、car、truck 38. bus_img = torch.zeros(1,C,H,W) 39. car_img = torch.zeros(1,C,H,W) 40. truck_img = torch.zeros(1,C,H,W) 41. bus_img,car_img,truck_img = readimg(bus_img,car_img,truck_img) 42. #划分数据集为训练集和测试集,其中每类的后大约25%作为测试集 43. trian_bus_img = bus_img[:int(bus_img.shape[0]*3/4)] 44. trian_car_img = car_img[:int(car_img.shape[0]*3/4)] 45. trian_truck_img = truck_img[:int(truck_img.shape[0]*3/4)] 46. test_bus_img = bus_img[int(bus_img.shape[0]*3/4):] 47. test_car_img = car_img[int(car_img.shape[0]*3/4):] 48. test_truck_img = truck_img[int(truck_img.shape[0]*3/4):] 49. #合并所有类别的训练集和测试集 50. train_img = torch.cat((trian_bus_img,trian_car_img,trian_truck_img),0) 51. print(train_img.shape) 52. print(train_img.shape,file=f) 53. test_img = torch.cat((test_bus_img,test_car_img,test_truck_img),0) 54. print(test_img.shape) 55. print(test_img.shape,file=f) 56. 57. #针对类别定义标签,这里设置bus--0、car--1、truck--2 58. label_bus_img = torch.zeros(bus_img.shape[0]) 59. label_car_img = torch.ones(car_img.shape[0]) 60. label_truck_img = torch.ones(truck_img.shape[0])+1 61. #划分标签为训练集和测试集,其中每类的后大约25%作为测试集的标签 62. train_label_bus_img = label_bus_img[:int(label_bus_img.shape[0]*3/4)] 63. train_label_car_img = label_car_img[:int(label_car_img.shape[0]*3/4)] 64. train_label_truck_img = label_truck_img[:int(label_truck_img.shape[0]*3/4)] 65. test_label_bus_img = label_bus_img[int(label_bus_img.shape[0]*3/4):] 66. test_label_car_img = label_car_img[int(label_car_img.shape[0]*3/4):] 67. test_label_truck_img = label_truck_img[int(label_truck_img.shape[0]*3/4):] 68. #合并所有类别的训练集和测试集的标签 69. train_label = torch.cat((train_label_bus_img,train_label_car_img,train_label_truck_img),0) 70. print(train_label.shape) 71. print(train_label.shape,file=f) 72. test_label = torch.cat((test_label_bus_img,test_label_car_img,test_label_truck_img),0) 73. print(test_label.shape) 74. print(test_label.shape,file=f)
注:分别读取处理后的图片的文件夹的图片,然后分别读取到tensor张量,然后分别选取每一类的25%作为测试集,最后合并所有的训练集和测试集,然后分别根据其大小定义相应的标签。
1.3将数据集利用DataLoader进行迭代读取
1. #定义相关参数 2. batch_size = 32 3. num_classes = 3 4. lr = 0.001 5. epochs = 5 6. device = torch.device("cpu") 7. #利用DataLoader组合迭代读取数据 8. # 将训练数据的特征和标签组合 9. dataset = Data.TensorDataset(train_img, train_label) 10. # 把 dataset 放入 DataLoader 11. train_iter = Data.DataLoader( 12. dataset=dataset, # torch TensorDataset format 13. batch_size=batch_size, # mini batch size 14. shuffle=True, # 是否打乱数据 (训练集一般需要进行打乱) 15. num_workers=1, # 多线程来读数据, 注意在Windows下需要设置为0 16. ) 17. # 将测试数据的特征和标签组合 18. dataset = Data.TensorDataset(test_img, test_label) 19. # 把 dataset 放入 DataLoader 20. test_iter = Data.DataLoader( 21. dataset=dataset, # torch TensorDataset format 22. batch_size=batch_size, # mini batch size 23. shuffle=True, # 是否打乱数据 (训练集一般需要进行打乱) 24. num_workers=1, # 多线程来读数据, 注意在Windows下需要设置为0 25. )
注:Logistic回归的模型定义,以及二元交叉熵的损失函数和优化函数。需要注意的是矩阵相乘利用的是torch.mm,而矩阵对应位置相乘是利用的.mul,另外利用view()函数统一为同型张量。
1.4自定义通道卷积
1. #自定义单通道卷积 2. def corr2d(X,K): 3. ''''' 4. X:输入,shape (batch_size,H,W) 5. K:卷积核,shape (k_h,k_w) 6. 单通道 7. ''' 8. batch_size,H,W = X.shape 9. k_h, k_w = K.shape 10. #初始化结果矩阵 11. Y = torch.zeros((batch_size,H - k_h + 1,W- k_w + 1)) 12. for i in range(Y.shape[1]): 13. for j in range(Y.shape [2]): 14. Y[:,i,j] = (X[:,i:i+k_h,j:j+k_w]* K).sum() 15. return Y 16. 17. #自定义多通道卷积 18. def corr2d_mu1ti_in(X, K): 19. #输入X:维度(batch_size,C_in,H, W) 20. #卷积核K:维度(C_in,k_h,k_w) 21. #输出:维度(batch_size,H_out,W_out) 22. 23. #先计算第一通道 24. res = corr2d(X[:,0,:,:], K[0,:,:]) 25. for i in range(1, X.shape[1]): 26. #按通道相加 27. res += corr2d(X[:,i,:,:], K[i,:,:]) 28. return res 29. 30. #自定义多个多通道卷积 31. def corr2d_multi_in_out(X, K): 32. # X: shape (batch_size,C_in,H,W) 33. # K: shape (C_out,C_in,h,w) 34. # Y: shape(batch_size,C_out,H_out,W_out) 35. return torch.stack([corr2d_mu1ti_in(X, k) for k in K],dim=1)
注:这部分主要借鉴ppt内容,手写的卷及操作。
1.5自定义卷积层
1. 自定义卷积层 2. class MyConv2D(nn.Module): 3. def __init__(self,in_channels, out_channels,kernel_size): 4. super(MyConv2D,self).__init__() 5. #初始化卷积层的2个参数:卷积核、偏差 6. #isinstance判断类型 7. if isinstance(kernel_size,int): 8. kernel_size = (kernel_size,kernel_size) 9. self.weight = nn.Parameter(torch.randn((out_channels, in_channels) + kernel_size)) 10. self.bias = nn.Parameter(torch.randn(out_channels,1,1)) 11. def forward(self,x): 12. ''''' 13. x:输入图片,维度(batch_size,C_in,H,W) 14. ''' 15. return corr2d_multi_in_out(x,self.weight) + self.bias
注:这里,初始化卷积层的参数:卷积核、偏差。然后调用之前的卷积通道进行卷积操作。
1.6添加自定义卷积层到模块中
1. #添加自定义卷积层到模块中 2. class MyConvModule(nn.Module): 3. def __init__(self): 4. super(MyConvModule,self).__init__() 5. #定义一层卷积层 6. self.conv = nn.Sequential( 7. MyConv2D(in_channels = 3,out_channels = 32,kernel_size = 3), 8. nn.BatchNorm2d(32), 9. # inplace-选择是否进行覆盖运算 10. nn.ReLU(inplace=True) 11. ) 12. #输出层,将通道数变为分类数量 13. self.fc = nn.Linear(32,num_classes) 14. 15. def forward(self,x): 16. #图片经过一层卷积,输出维度变为(batch_size,C_out,H,W) 17. out = self.conv(x) 18. #使用平均池化层将图片的大小变为1x1,第二个参数为最后输出的长和宽(这里默认相等了)64-3/1 + 1 =62 19. out = F.avg_pool2d(out,62) 20. #将张量out从shape batchx32x1x1 变为 batch x32 21. out = out.squeeze() 22. #输入到全连接层将输出的维度变为3 23. out = self.fc(out) 24. return out
注:将自定义卷积层添加到模块中,完成一层卷积以及平均池化等操作,连接全连接层进行分类输出。
1.7初始化模型、定义损失函数和优化器
1. #初始化模型 2. net = MyConvModule().to(device) 3. #使用多元交叉熵损失函数 4. criterion = nn.CrossEntropyLoss() 5. #使用Adam优化器 6. optimizer = optim.Adam(net.parameters(),lr = lr)
注:这里选取的是交叉熵损失函数以及Adam优化器。
1.8 进行训练迭代和测试迭代函数定义
1. def train_epoch(net, data_loader, device): 2. 3. net.train() #指定当前为训练模式 4. train_batch_num = len(data_loader) #记录共有多少个batch 5. total_1oss = 0 #记录Loss 6. correct = 0 #记录共有多少个样本被正确分类 7. sample_num = 0 #记录样本总数 8. 9. #遍历每个batch进行训练 10. for batch_idx, (data,target) in enumerate (data_loader): 11. #将图片放入指定的device中 12. data = data.to(device).float() 13. #将图片标签放入指定的device中 14. target = target.to(device).long() 15. #将当前梯度清零 16. optimizer.zero_grad() 17. #使用模型计算出结果 18. output = net(data) 19. #计算损失 20. loss = criterion(output, target.squeeze()) 21. #进行反向传播 22. loss.backward() 23. optimizer.step() 24. #累加loss 25. total_1oss += loss.item( ) 26. #找出每个样本值最大的idx,即代表预测此图片属于哪个类别 27. prediction = torch.argmax(output, 1) 28. #统计预测正确的类别数量 29. correct += (prediction == target).sum().item() 30. #累加当前的样本总数 31. sample_num += len(prediction) 32. #计算平均oss与准确率 33. loss = total_1oss / train_batch_num 34. acc = correct / sample_num 35. return loss, acc 36. 37. 38. # In[21]: 39. 40. 41. def test_epoch(net, data_loader, device): 42. net.eval() #指定当前模式为测试模式 43. test_batch_num = len(data_loader) 44. total_loss = 0 45. correct = 0 46. sample_num = 0 47. #指定不进行梯度变化 48. with torch.no_grad(): 49. for batch_idx, (data, target) in enumerate(data_loader): 50. data = data.to(device).float() 51. target = target.to(device).long() 52. output = net(data) 53. loss = criterion(output, target) 54. total_loss += loss.item( ) 55. prediction = torch.argmax(output, 1) 56. correct += (prediction == target).sum().item() 57. sample_num += len(prediction) 58. loss = total_loss / test_batch_num 59. acc = correct / sample_num 60. return loss,acc
注:这里分别是训练函数、测试函数的迭代定义,主要是借鉴ppt的,分别调用模型,输出在训练集和测试集上的损失和精确度。
1.9开始训练
1. #存储每一个epoch的loss与acc的变化,便于后面可视化 2. train_loss_list = [] 3. train_acc_list = [] 4. test_loss_list = [] 5. test_acc_list = [] 6. time_list = [] 7. timestart = time.clock() 8. #进行训练 9. for epoch in range(epochs): 10. #每一个epoch的开始时间 11. epochstart = time.clock() 12. 13. #在训练集上训练 14. train_loss, train_acc = train_epoch(net,data_loader=train_iter, device=device ) 15. #在测试集上验证 16. test_loss, test_acc = test_epoch(net,data_loader=test_iter, device=device) 17. 18. #每一个epoch的结束时间 19. elapsed = (time.clock() - epochstart) 20. #保存各个指际 21. train_loss_list.append(train_loss) 22. train_acc_list.append(train_acc ) 23. test_loss_list.append(test_loss) 24. test_acc_list.append(test_acc) 25. time_list.append(elapsed) 26. print('epoch %d, train_loss %.6f,test_loss %.6f,train_acc %.6f,test_acc %.6f,Time used %.6fs'%(epoch+1, train_loss,test_loss,train_acc,test_acc,elapsed)) 27. #计算总时间 28. timesum = (time.clock() - timestart) 29. print('The total time is %fs',timesum)
注:因为是在ubutun服务器上跑的,对于图片和输出这里都通过文件操作保存到本地文件夹下的txt和jpg文件上了,另外对于其速度比较慢,这里只是跑了5个epoch,来表明程序的正确性。
二:使用torch下的二维卷积的实现
要求:
这里与上面的手写二维卷积除了模型的定义部分以及最后添加了绘图等其他的都是一样的,当然参数设置有些不同,主要在实验结果区分,所以我这里主要针对模型定义以及绘图做相关介绍。
2.1torch.nn定义模型
1. #pytorch封装卷积层 2. class ConvModule(nn.Module): 3. def __init__(self): 4. super(ConvModule,self).__init__() 5. #定义三层卷积层 6. self.conv = nn.Sequential( 7. #第一层 8. nn.Conv2d(in_channels = 3,out_channels = 32,kernel_size = 3 , stride = 1,padding=0), 9. nn.BatchNorm2d(32), 10. # inplace-选择是否进行覆盖运算 11. nn.ReLU(inplace=True), 12. #第二层 13. nn.Conv2d(in_channels = 32,out_channels = 64,kernel_size = 3 , stride = 1,padding=0), 14. nn.BatchNorm2d(64), 15. # inplace-选择是否进行覆盖运算 16. nn.ReLU(inplace=True), 17. #第三层 18. nn.Conv2d(in_channels = 64,out_channels = 128,kernel_size = 3 , stride = 1,padding=0), 19. nn.BatchNorm2d(128), 20. # inplace-选择是否进行覆盖运算 21. nn.ReLU(inplace=True) 22. ) 23. #输出层,将通道数变为分类数量 24. self.fc = nn.Linear(128,num_classes) 25. 26. def forward(self,x): 27. #图片经过三层卷积,输出维度变为(batch_size,C_out,H,W) 28. out = self.conv(x) 29. #使用平均池化层将图片的大小变为1x1,第二个参数为最后输出的长和宽(这里默认相等了)(64-3)/1 + 1 =62 (62-3)/1+1 =60 (60-3)/1+1 =58 30. out = F.avg_pool2d(out,58) 31. #将张量out从shape batchx128x1x1 变为 batch x128 32. out = out.squeeze() 33. #输入到全连接层将输出的维度变为3 34. out = self.fc(out) 35. return out
注:因为torch.nn模块的方便性以及速度比较快,这里定义的模型的卷积层一共有三层因为输入是6464,经过上面的三层卷积,其大小变为5858,所以平均池化的大小需要是58。
2.2 实现绘图操作
1. x = np.linspace(0,len(train_loss_list),len(train_loss_list)) 2. plt.plot(x,train_loss_list,label="train_loss",linewidth=1.5) 3. plt.plot(x,test_loss_list,label="test_loss",linewidth=1.5) 4. plt.xlabel("epoch") 5. plt.ylabel("loss") 6. plt.legend() 7. #plt.show() 8. plt.savefig('2loss.jpg') 9. plt.clf() 10. 11. # In[ ]: 12. 13. 14. x = np.linspace(0,len(train_loss_list),len(train_loss_list)) 15. plt.plot(x,train_acc_list,label="train_acc",linewidth=1.5) 16. plt.plot(x,test_acc_list,label="test_acc",linewidth=1.5) 17. plt.xlabel("epoch") 18. plt.ylabel("acc") 19. plt.legend() 20. #plt.show() 21. plt.savefig('2acc.jpg')
注:根据之前训练集和测试集的训练loss和准确度进行绘图,在这里通过plt.savefig将其保存到了本地