实验目的:
实现了一个多层双向LSTM模型,并用于训练一个时间序列预测任务
训练程序:
#!/usr/bin/env python # -*- coding: utf-8 -*- import os import torch import torch.nn as nn import torch.optim as optim import numpy as np from torch.utils.data import DataLoader, TensorDataset # 定义多层双向LSTM模型 class MultiLayerBiLSTM(nn.Module): def __init__(self, input_size, hidden_size, output_size, num_layers): super(MultiLayerBiLSTM, self).__init__() self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, bidirectional=True) self.avg_pool = nn.AdaptiveAvgPool1d(1) # 自适应平均池化层 self.fc = nn.Linear(hidden_size * 2, output_size) def forward(self, x): output, _ = self.lstm(x.unsqueeze(1)) # 添加时间步维度 output = output.permute(0, 2, 1) # 将时间步维度移到第2维 output = self.avg_pool(output) output = output.squeeze(2) # 去掉平均池化的维度1 output = self.fc(output) return output # 训练函数 def train_model(data_folder, model_save_path): # 遍历数据文件夹中的所有CSV文件 train_data = [] for filename in os.listdir(data_folder): if filename.endswith(".csv"): file_path = os.path.join(data_folder, filename) df = np.genfromtxt(file_path, delimiter=',', skip_header=1) # 使用numpy加载数据 train_data.append(df) # 合并所有CSV文件为一个大的训练数据集 train_data_combined = np.vstack(train_data) # 提取特征和标签 X = train_data_combined[:, 1:4] # Time列不作为特征 y = train_data_combined[:, 1:4] # 使用相同的坐标作为标签 # 将数据转换为PyTorch张量 X_tensor = torch.tensor(X, dtype=torch.float32) y_tensor = torch.tensor(y, dtype=torch.float32) # 数据分批处理 batch_size = 32 dataset = TensorDataset(X_tensor, y_tensor) dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True) # 定义模型和优化器 input_size = 3 hidden_size = 64 output_size = 3 num_layers = 2 # 设置多层LSTM model = MultiLayerBiLSTM(input_size, hidden_size, output_size, num_layers) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 使用GPU加速 model.to(device) optimizer = optim.Adam(model.parameters(), lr=0.001) criterion = nn.MSELoss() # 训练模型 num_epochs = 100 for epoch in range(num_epochs): total_loss = 0 for inputs, targets in dataloader: inputs, targets = inputs.to(device), targets.to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, targets) loss.backward() optimizer.step() total_loss += loss.item() print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(dataloader):.4f}') # 保存模型 torch.save(model.state_dict(), model_save_path) print(f'Model saved as {model_save_path}') # 运行训练函数并保存模型 train_data_folder = 'data/' # 指定包含训练数据集的文件夹路径 model_save_path = 'model/model.pt' train_model(train_data_folder, model_save_path)
程序解释:
这段代码是一个使用PyTorch实现的多层双向LSTM模型,并用于训练的程序。下面我来逐行解释代码的功能: 1. `#!/usr/bin/env python`: 这是一个Shebang行,它告诉系统使用哪个解释器来执行该脚本。在这里,它指定使用env命令查找Python解释器。 2. `# -*- coding: utf-8 -*-`: 这是指定Python源文件的字符编码为UTF-8,确保脚本中的中文等非ASCII字符可以被正确解析。 3. 导入所需的Python库和模块: - `os`: 提供与操作系统交互的功能,这里用于遍历数据文件夹中的CSV文件。 - `torch`: PyTorch的主要库,用于构建和训练神经网络。 - `torch.nn`: 包含神经网络的层和损失函数等。 - `torch.optim`: 提供优化算法,如Adam等。 - `numpy`: 用于数值计算和数据处理。 - `torch.utils.data`: 提供数据加载和处理的实用工具。 4. 定义了一个多层双向LSTM模型 `MultiLayerBiLSTM`: - 该模型继承自`nn.Module`,是PyTorch中构建神经网络模型的基类。 - 模型的初始化函数 `__init__` 接收输入特征大小、隐藏层大小、输出大小和LSTM层数作为参数,然后创建LSTM、自适应平均池化层和全连接层。 - 模型的前向传播函数 `forward` 接收输入张量 x,将其传入LSTM层并进行平均池化和全连接操作,最后返回输出。 5. `train_model(data_folder, model_save_path)`: 这是训练函数的定义。它接收数据文件夹路径 `data_folder` 和模型保存路径 `model_save_path` 作为输入。 6. 在函数内部,代码首先遍历数据文件夹中的所有CSV文件,将它们加载到一个列表 `train_data` 中。 7. 然后将所有CSV文件的数据合并为一个大的训练数据集 `train_data_combined`。 8. 接下来,从训练数据集中提取特征和标签,其中 X 是输入特征,y 是标签。 9. 将数据转换为PyTorch张量,即 `X_tensor` 和 `y_tensor`。 10. 将数据划分为批次,并创建数据加载器 `dataloader`。 11. 定义模型和优化器:模型使用上面定义的 `MultiLayerBiLSTM` 类来创建,优化器使用Adam算法进行参数优化。 12. 开始模型训练,将模型移动到合适的设备(GPU或CPU),然后在每个epoch中遍历数据加载器,计算损失并执行反向传播更新参数。 13. 打印每个epoch的平均损失。 14. 训练完成后,将模型保存到指定的 `model_save_path`。 15. 最后,指定数据文件夹路径 `train_data_folder` 和模型保存路径 `model_save_path`,运行训练函数 `train_model`。
预测程序:
#!/usr/bin/env python # -*- coding: utf-8 -*- import torch import torch.nn as nn import pandas as pd # 定义多层双向LSTM模型 class MultiLayerBiLSTM(nn.Module): def __init__(self, input_size, hidden_size, output_size, num_layers): super(MultiLayerBiLSTM, self).__init__() self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, bidirectional=True) self.avg_pool = nn.AdaptiveAvgPool1d(1) # 自适应平均池化层 self.fc = nn.Linear(hidden_size * 2, output_size) def forward(self, x): output, _ = self.lstm(x) output = output.permute(0, 2, 1) # 将时间步维度移到第2维 output = self.avg_pool(output) output = output.squeeze(2) # 去掉平均池化的维度1 output = self.fc(output) return output # 加载模型函数 def load_model(model_path, input_size, hidden_size, output_size, num_layers): model = MultiLayerBiLSTM(input_size, hidden_size, output_size, num_layers) model.load_state_dict(torch.load(model_path)) model.eval() return model # 预测函数 def predict_next_five_frames(model, data_path, num_history_frames=20, num_prediction_frames=5): # 读取预测数据集 df = pd.read_csv(data_path) X = df[['X', 'Y', 'Z']].values # 只选择空间坐标,不包含时间信息 # 将数据转换为PyTorch张量 X_tensor = torch.tensor(X, dtype=torch.float32) # 历史输入(最后20帧数据) history_input = X_tensor[-num_history_frames:] # 进行预测 with torch.no_grad(): predicted_coordinates = [] for i in range(num_prediction_frames): output = model(history_input.unsqueeze(0)) predicted_coordinates.append(output.squeeze().tolist()) # 将当前预测结果添加到历史输入中,作为下一次预测的输入 history_input = torch.cat((history_input[1:], output), dim=0) # 输出预测结果 print("Predicted coordinates (X, Y, Z) for the next 5 frames:") for coord in predicted_coordinates: print(coord) # 模型参数 input_size = 3 hidden_size = 64 output_size = 3 num_layers = 2 # 预测数据集路径 predict_data_path = 'data/data_input.csv' # 预训练的模型路径 pretrained_model_path = 'model/model.pt' # 加载模型 model = load_model(pretrained_model_path, input_size, hidden_size, output_size, num_layers) # 进行预测 predict_next_five_frames(model, predict_data_path)