I want to train a CNN model on a medical imaging dataset; the network structure is given below.
The dataset is: Labeled Optical Coherence Tomography (OCT) and Chest X-Ray Images for Classification.
Under Files, the dataset provides ChestXRay2017.zip and OCT2017.tar.gz. Taking the test set in ChestXRay2017.zip as an example, the data is divided into two classes, and each class folder contains many images, arranged like this:
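Below is a minimal sketch for confirming the extracted layout, assuming the archive was unpacked to ./data/chest_xray as in the training script further down; in the chest X-ray data the two class folders are NORMAL and PNEUMONIA, each holding the image files directly:

import os

data_root = './data/chest_xray/test'   # assumed extraction path; adjust to wherever you unpacked the archive
for class_name in sorted(os.listdir(data_root)):
    class_dir = os.path.join(data_root, class_name)
    if os.path.isdir(class_dir):
        # count the image files in each class folder
        print('{}: {} images'.format(class_name, len(os.listdir(class_dir))))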
Now let's build the network and train it on this dataset:
import torch.utils.data as Data
import numpy as np                           # numpy==1.19.2
import torch                                 # pytorch==1.8.1
import torch.nn as nn
import torchvision                           # torchvision==0.9.1
import torchvision.transforms as transforms
import matplotlib.pyplot as plt              # matplotlib==3.3.4

# Hyperparameters
num_epochs = 30     # number of training epochs
lr = 0.01           # learning rate
image_size = 32     # images are resized to image_size x image_size
num_classes = 2     # number of classes: chest_xray=2, oct=4
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # train on GPU if available, otherwise CPU

# Normalization parameters for the images
data_mean = (0.5, 0.5, 0.5)
data_std = (0.5, 0.5, 0.5)

# Convert the loaded images into a format torch can process
data_transform = transforms.Compose([
    transforms.ToTensor(),                        # convert the image to a tensor
    transforms.Resize((image_size, image_size)),  # resize to a fixed size
    transforms.Normalize(data_mean, data_std)])   # normalize pixel intensities

# To train on oct instead of chest_xray, simply point these paths at the other dataset
# Training and test sets
train_dataset = torchvision.datasets.ImageFolder(root='./data/chest_xray/train', transform=data_transform)
test_dataset = torchvision.datasets.ImageFolder(root='./data/chest_xray/test', transform=data_transform)

# Path where the model is saved
model_save_path = './chest_xray_model'

# Data loaders: iterate over the datasets in mini-batches
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=100, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=100, shuffle=False)


# CNN network
class ThdCNN(nn.Module):
    def __init__(self, image_size, num_classes):
        super(ThdCNN, self).__init__()
        # Convolution block 1
        # nn.Conv2d:    (3, 32, 32) --> (6, 32, 32), where 32 is the hyperparameter image_size
        # nn.MaxPool2d: (6, 32, 32) --> (6, 16, 16)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(num_features=6),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # Convolution block 2
        # nn.Conv2d:    (6, 16, 16)  --> (16, 16, 16)
        # nn.MaxPool2d: (16, 16, 16) --> (16, 8, 8)
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(num_features=16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # Three fully connected layers
        # (16, 8, 8) --> 120
        self.fc1 = nn.Linear(in_features=16 * 8 * 8, out_features=120)
        # 120 --> 84
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        # 84 --> 2, where 2 is the hyperparameter num_classes
        self.fc3 = nn.Linear(in_features=84, out_features=num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        # view(x.size(0), -1): reshape (N, C, H, W) --> (N, C*H*W)
        x = x.view(x.size(0), -1)
        output = self.fc1(x)
        output = self.fc2(output)
        output = self.fc3(output)
        return output

'''
Printed structure (fc3's output size follows num_classes; shown here for the OCT case, hence out_features=4):
ThdCNN(
  (conv1): Sequential(
    (0): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): BatchNorm2d(6, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Linear(in_features=1024, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=4, bias=True)
)
'''

def fit(model, num_epochs, optimizer, device):
    """
    Train and test the model; when training finishes, plot the loss and accuracy curves and save the model.

    model: the CNN network
    num_epochs: number of training epochs
    optimizer: optimizer for the loss function
    device: device used for training
    """
    loss_func = nn.CrossEntropyLoss()  # loss function
    model.to(device)
    loss_func.to(device)

    losses = []
    accs = []
    for epoch in range(num_epochs):
        print('Epoch {}/{}:'.format(epoch + 1, num_epochs))
        # training
        loss = train(model, train_loader, loss_func, optimizer, device)
        losses.append(loss)
        # testing
        accuracy = evaluate(model, test_loader, device)
        accs.append(accuracy)

    # loss curve and accuracy curve
    show_curve(losses, "train loss")
    show_curve(accs, "test accuracy")

    # save the model; two options
    save_model(model.state_dict(), model_save_path)
    # save_model(model, model_save_path)

def train(model, train_loader, loss_func, optimizer, device):
    """
    Training step.

    model: the CNN network
    train_loader: DataLoader for the training data
    loss_func: loss function
    device: device used for training
    """
    model.train()  # switch back to training mode, since evaluate() leaves the model in eval mode
    total_loss = 0
    # mini-batch training
    for i, (images, targets) in enumerate(train_loader):
        # print("train OK!")
        images = images.to(device)
        targets = targets.to(device)

        # forward pass
        outputs = model(images)
        loss = loss_func(outputs, targets)

        # backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        # report the loss every 100 iterations
        if (i + 1) % 100 == 0:
            print("Step [{}/{}] Train Loss: {:.4f}"
                  .format(i + 1, len(train_loader), loss.item()))
    return total_loss / len(train_loader)

def evaluate(model, val_loader, device):
    """
    Evaluation step; the test set is used here in place of a validation set.

    model: the CNN network
    val_loader: DataLoader for the validation data
    device: device used for evaluation
    return: classification accuracy
    """
    # evaluate the model
    model.eval()
    # context manager that disables gradient computation
    with torch.no_grad():
        correct = 0
        total = 0
        for i, (images, targets) in enumerate(val_loader):
            # print("evaluate OK!")
            images = images.to(device)
            targets = targets.to(device)

            outputs = model(images)
            _, predicted = torch.max(outputs.data, dim=1)
            # to check the predicted class of an individual image, print `predicted` here
            # print(predicted)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
        accuracy = correct / total
        print('Accuracy on Test Set: {:.4f} %'.format(100 * accuracy))
        return accuracy

def save_model(model, save_path):
    # save the model (a state_dict or the full model, depending on what is passed in)
    torch.save(model, save_path)

def show_curve(ys, title):
    '''
    Plot a curve.
    :param ys: loss or accuracy values
    :param title: plot title
    '''
    print("curve OK!")
    x = np.array(range(len(ys)))
    y = np.array(ys)
    plt.plot(x, y, c='b')
    plt.axis()
    plt.title('{} curve'.format(title))
    plt.xlabel('epoch')
    plt.ylabel('{}'.format(title))
    plt.show()

# instantiate the network
thdcnn = ThdCNN(image_size, num_classes)
# define the optimizer
optimizer = torch.optim.Adam(thdcnn.parameters(), lr=lr)
# start training
fit(thdcnn, num_epochs, optimizer, device)
After training and testing finish, show_curve outputs the train-loss and test-accuracy curves.
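Once training is done, the saved weights can be loaded back to classify a single image. The following is a minimal sketch, assuming the state_dict form of save_model was used (as in the script above), that ThdCNN, data_transform, train_dataset, model_save_path, image_size, num_classes and device from that script are still in scope, and that the image path is a hypothetical example:

from PIL import Image

# rebuild the network and load the saved weights (state_dict variant)
model = ThdCNN(image_size, num_classes)
model.load_state_dict(torch.load(model_save_path, map_location=device))
model.to(device)
model.eval()

# hypothetical image path; replace with a real file from the extracted dataset
image = Image.open('./data/chest_xray/test/NORMAL/sample.jpeg').convert('RGB')
x = data_transform(image).unsqueeze(0).to(device)   # apply the training transforms and add a batch dimension

with torch.no_grad():
    output = model(x)
    _, predicted = torch.max(output, dim=1)
# map the predicted index back to the class folder name
print(train_dataset.classes[predicted.item()])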