一:手写循环神经网络的实现
实验: 手动实现循环神经网络RNN,并从至少一个数据集上进行实验,这里我选取了高速公路传感器数据PEMS04(后面的实验都是用的高速公路传感器数据),主要根据数据集的大小以及特征维度,手动实现循环神经网络,包括输入层、隐藏层、输出层,其中注意的是下一层的输入是本身和上一层的隐藏层的同时输入,最后的RNN的返回值为最后一步的隐藏状态,以及每一步的输出状态。
实验目的: 利用手动实现的循环神经网络RNN,利用高速公路车流量数据集,学习回归模型,使得该模型可以很好的根据历史的车流量数据预测未来车流量。
实验算法和原理: 因为是回归模型,所以使用MSE,这里我在测试集上则分别使用了RMSE、MAE、MAPE,梯度更新使用的是Adam优化器。
数据集处理:
在这里统一说一下车流辆回归数据集的处理操作:读取npz文件,只获取一个传感器的所有数据,然后对其归一化操作,最后划分数据集合的80%、20%分别作为训练集和测试集。
1. #读取数据集,进行划分 2. def sliding_window(seq,window_size): 3. result = [] 4. for i in range(len(seq)- window_size): 5. result.append(seq[i: i+window_size]) 6. return result 7. 8. data = np.load("./实验4-数据/高速公路传感器数据/PEMS04/PEMS04.npz") 9. #因为数据集过大,这里只取了第一个传感器的数据 10. data = data["data"][:,0:1,0:1] 11. #归一化 12. dmin,dmax = data.min(),data.max() 13. data = (data - dmin) / (dmax - dmin) 14. sensordata_num,sensor_num,_ = data.shape 15. train_set,test_set = [],[] 16. for i in range(sensor_num) : 17. train_seq = data[:int(sensordata_num*0.8),i,:] 18. test_seq = data[int(sensordata_num*0.8):,i,:] 19. train_set += sliding_window(train_seq,window_size=13) 20. test_set += sliding_window(test_seq,window_size=13) 21. train_set,test_set= np.array(train_set).squeeze(), np.array(test_set).squeeze() 22. print(train_set.shape,test_set.shape) 23. print(train_set,test_set)
实验过程:
1.1相关包的导入
1. import torch 2. import numpy as np 3. import random 4. from IPython import display 5. from matplotlib import pyplot as plt 6. import torch.utils.data as Data 7. from PIL import Image 8. import os 9. from torch import nn 10. import torch.optim as optim 11. from torch.nn import init 12. import torch.nn.functional as F 13. import time 14. import pandas as pd 15. from sklearn.utils import shuffle 16. import math 17. from sklearn.metrics import mean_squared_error as mse, mean_absolute_error as mae 18. #plt.switch_backend('agg')
1.2数据集处理读取,见上
1.3手动定义RNN模型
1. class MyRNN(nn.Module): 2. def __init__(self, input_size, hidden_size, output_size): 3. """ 4. :param input_size: 指定输入数据的维度。例如,对于简单的时间序列预测问题,每一步的输入均为一个采样值,因此input_size=1. 5. :param hidden_size: 指定隐藏状态的维度。这个值并不受输入和输出控制,但会影响模型的容量。 6. :param output_size: 指定输出数据的维度。此值取决于具体的预测要求。例如,对简单的时间序列预测问题,output_size=1. 7. """ 8. super().__init__() 9. self.hidden_size = hidden_size 10. 11. # 可学习参数的维度设置,可以类比一下全连接网络的实现。其维度取决于输入数据的维度,以及指定的隐藏状态维度。 12. self.w_h = nn.Parameter(torch.rand(input_size, hidden_size)) 13. self.u_h = nn.Parameter(torch.rand(hidden_size, hidden_size)) 14. self.b_h = nn.Parameter(torch.zeros(hidden_size)) 15. 16. self.w_y = nn.Parameter(torch.rand(hidden_size, output_size)) 17. self.b_y = nn.Parameter(torch.zeros(output_size)) 18. 19. # 准备激活函数。Dropout函数可选。 20. self.tanh = nn.Tanh() 21. self.leaky_relu = nn.LeakyReLU() 22. 23. # 可选:使用性能更好的参数初始化函数 24. for param in self.parameters(): 25. if param.dim() > 1: 26. nn.init.xavier_uniform_(param) 27. 28. def forward(self, x): 29. """ 30. :param x: 输入序列。一般来说,此输入包含三个维度:batch,序列长度,以及每条数据的特征。 31. """ 32. batch_size = x.size(0) 33. seq_len = x.size(1) 34. 35. # 初始化隐藏状态,一般设为全0。由于是内部新建的变量,需要同步设备位置。 36. h = torch.zeros(batch_size, self.hidden_size).to(x.device) 37. # RNN实际上只能一步一步处理序列。因此需要用循环迭代。 38. y_list = [] 39. for i in range(seq_len): 40. h = self.tanh(torch.matmul(x[:, i, :], self.w_h) + 41. torch.matmul(h, self.u_h) + self.b_h) # (batch_size, hidden_size) 42. y = self.leaky_relu(torch.matmul(h, self.w_y) + self.b_y) # (batch_size, output_size) 43. y_list.append(y) 44. # 一般来说,RNN的返回值为最后一步的隐藏状态,以及每一步的输出状态。 45. return h, torch.stack(y_list, dim=1)
1.4初始化模型、定义优化器
1. device = 'cpu' 2. model = MyRNN(input_size=1, hidden_size=32, output_size=1).to(device) 3. loss_func = nn.MSELoss() 4. optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
1.5手动定义mape函数
1. def mape(y_true, y_pred): 2. y_true, y_pred = np.array(y_true), np.array(y_pred) 3. non_zero_index = (y_true > 0) 4. y_true = y_true[non_zero_index] 5. y_pred = y_pred[non_zero_index] 6. 7. mape = np.abs((y_true - y_pred) / y_true) 8. mape[np.isinf(mape)] = 0 9. return np.mean(mape) * 100
1.6手动实现next_batch函数
1. def next_batch(data, batch_size): 2. data_length = len(data) 3. num_batches = math.ceil(data_length / batch_size) 4. for batch_index in range(num_batches): 5. start_index = batch_index * batch_size 6. end_index = min((batch_index + 1) * batch_size, data_length) 7. yield data[start_index:end_index]
1.7开始训练和测试
1. t1. train_log = [] 2. test_log = [] 3. #开始时间 4. timestart = time.time() 5. trained_batches = 0 #记录多少个batch 6. for epoch in range(100): 7. 8. total_1oss = 0 #记录Loss 9. for batch in next_batch(shuffle(train_set), batch_size=128): 10. #每一个batch的开始时间 11. batchstart = time.time() 12. 13. batch = torch.from_numpy(batch).float().to(device) # (batch, seq_len) 14. # 使用短序列的前12个值作为历史,最后一个值作为预测值。 15. x, label = batch[:, :12], batch[:, -1] 16. hidden, out = model(x.unsqueeze(-1)) 17. prediction = out[:, -1, :].squeeze(-1) # (batch) 18. loss = loss_func(prediction, label) 19. optimizer.zero_grad() 20. loss.backward() 21. optimizer.step() 22. #correct += (prediction == label).sum().item() 23. #累加loss 24. #total_1oss += loss.item( ) 25. trained_batches += 1 26. #计算平均oss与准确率 27. #train_loss = total_1oss / train_batch_num 28. #train_log.append(train_loss) 29. # 每训练一定数量的batch,就在测试集上测试模型效果。 30. #if trained_batches % 100 == 0: 31. train_log.append(loss.detach().cpu().numpy().tolist()); 32. train_batch_time = (time.time() - batchstart) 33. print('batch %d, train_loss %.6f,Time used %.6fs'%(trained_batches, loss,train_batch_time)) 34. print('batch %d, train_loss %.6f,Time used %.6fs'%(trained_batches, loss,train_batch_time),file=f) 35. 36. 37. # 每训练一定数量的batch,就在测试集上测试模型效果。 38. if trained_batches % 100 == 0: 39. #每一个batch的开始时间 40. batch_test_start = time.time() 41. #在每个epoch上测试 42. all_prediction = [] 43. for batch in next_batch(test_set, batch_size=128): 44. batch = torch.from_numpy(batch).float().to(device) # (batch, seq_len) 45. x, label = batch[:, :12], batch[:, -1] 46. hidden, out = model(x.unsqueeze(-1)) 47. #hidden, out = model(batch) 48. prediction = out[:, -1, :].squeeze(-1) # (batch) 49. all_prediction.append(prediction.detach().cpu().numpy()) 50. 51. all_prediction = np.concatenate(all_prediction) 52. all_label = test_set[:, -1] 53. # 没有进行反归一化操作。 54. #all_prediction = denormalize(all_prediction) 55. #all_label = denormalize(all_label) 56. # 计算测试指标。 57. rmse_score = math.sqrt(mse(all_label, all_prediction)) 58. mae_score = mae(all_label, all_prediction) 59. mape_score = mape(all_label, all_prediction) 60. test_log.append([rmse_score, mae_score, mape_score]) 61. test_batch_time = (time.time() - batch_test_start) 62. print('***************************test_batch %d, test_rmse_loss %.6f,test_mae_loss %.6f,test_mape_loss %.6f,Time used %.6fs'%(trained_batches, rmse_score,mae_score,mape_score,test_batch_time)) 63. print('***************************test_batch %d, test_rmse_loss %.6f,test_mae_loss %.6f,test_mape_loss %.6f,Time used %.6fs'%(trained_batches, rmse_score,mae_score,mape_score,test_batch_time),file=f) 64. #计算总时间 65. timesum = (time.time() - timestart) 66. print('The total time is %fs'%(timesum)) 67. print('The total time is %fs'%(timesum),file=f)
1.8 绘制train_loss的曲线图
1. #train_loss曲线 2. x = np.linspace(0,len(train_log),len(train_log)) 3. plt.plot(x,train_log,label="train_loss",linewidth=1.5) 4. #plt.plot(x_test,test_log[:,0],label="test_rmse_loss",linewidth=1.5) 5. #plt.plot(x_test,test_log[:,1],label="test_mae_loss",linewidth=1.5) 6. #plt.plot(x_test,test_log[:,2],label="test_mape_loss",linewidth=1.5) 7. plt.xlabel("number of batches") 8. plt.ylabel("loss") 9. plt.legend() 10. plt.show() 11. plt.savefig('1.1manualRNNtrainloss.jpg') 12. #plt.clf()
1.9分别绘制测试集的rmse、mae、mape的曲线图
1. #test_loss曲线 2. x_test= np.linspace(0,len(test_log),len(test_log)) 3. test_log = np.array(test_log) 4. plt.plot(x_test,test_log[:,0],label="test_rmse_loss",linewidth=1.5) 5. plt.xlabel("number of batches*100") 6. plt.ylabel("loss") 7. plt.legend() 8. plt.show() 9. plt.savefig('1.1manualRNNtestrmseloss.jpg') 10. #plt.clf() 11. 12. #test_loss曲线 13. x_test= np.linspace(0,len(test_log),len(test_log)) 14. test_log = np.array(test_log) 15. plt.plot(x_test,test_log[:,1],label="test_mae_loss",linewidth=1.5) 16. plt.xlabel("number of batches*100") 17. plt.ylabel("loss") 18. plt.legend() 19. plt.show() 20. plt.savefig('1.1manualRNNtestrmaeloss.jpg') 21. #plt.clf() 22. 23. #test_loss曲线 24. x_test= np.linspace(0,len(test_log),len(test_log)) 25. test_log = np.array(test_log) 26. plt.plot(x_test,test_log[:,2],label="test_mape_loss",linewidth=1.5) 27. plt.xlabel("number of batches*100") 28. plt.ylabel("loss") 29. plt.legend() 30. plt.show() 31. plt.savefig('1.1manualRNNtestrmapeloss.jpg') 32. #plt.clf()
其他如利用torch.nn实现和Pytorch实现LSTM、Pytorch实现GRU以及实验的代码和数据集,