ConvNeXt网络介绍
今年(2022)一月份,Facebook AI Research和UC Berkeley一起发表了一篇文章A ConvNet for the 2020s,在文章中提出了ConvNeXt纯卷积神经网络,它对标的是2021年非常火的Swin Transformer,通过一系列实验比对,在相同的FLOPs下,ConvNeXt相比Swin Transformer拥有更快的推理速度以及更高的准确率,在ImageNet 22K上ConvNeXt-XL达到了87.8%的准确率,参看下图。
原论文链接:https://arxiv.org/pdf/2201.03545.pdf
ConvNeXt在ResNet50模型的基础上,仿照Swin Transformer的结构进行改进而得到的纯卷积模型下面就介绍关于ConvNeXt网络的改进之处。
1.Macro design
- 改变stage compute ratio
Swin-T的比例是1:1:3:1 Swin-L的比例是1:1:9:1作者将ResNet50中的堆叠次数由(3, 4, 6, 3)调整成(3, 3, 9, 3),作者将stem(降采样层)换成卷积核大小为4,步距为4的卷积层。对于更大的模型,也跟进了Swin所使用的1:1:9:1。下图可以看出ResNet网络和Swin-Transformer不同网络的堆叠次数。这里改变stage compute ratio后模型准确率从78.8提高到79.4
2. 使用Patchify的stem
从Vision Transformer开始,为了将图片转化为token,图片都会先被分割成一个一个的patch,而在传统ResNet中stem层是使用一个stride=2的7x7卷积加最大池化层。作者将stem换成卷积核大小为4,步距为4的卷积层,准确率从79.4提高到79.5。
2.ResNeXt-ify
使用ResNeXt中的深度卷积
ResNeXt相比普通的ResNet而言在FLOPs以及 accuracy之间做到了更好的平衡。这里作者采用的是更激进的depthwise convolution(即分组数等于输入通道数),准确率从79.5到80.5。
3.使用Inverted bottleneck(反瓶颈结构)
MLP模块非常像MobileNetV2中的Inverted Bottleneck模块,即两头细中间粗。然后修改反瓶颈中卷积层位置,Moving up depthwise conv layer,将depthwise conv模块上移,原来是 1x1 conv -> depthwise conv -> 1x1 conv现在变成depthwise conv -> 1x1 conv -> 1x1 conv,准确率从80.5提高到80.6。
4.使用更大的卷积核
由于Swin-T中使用了7x7卷积核,这一步主要是为了对齐比较。又因为inverted bottleneck放大了中间卷积层的缘故,直接替换会导致参数量增大,因而作者把dw conv的位置进行了调整,放到了反瓶颈的开头。最终结果相近,说明在7x7在相同参数量下效果是一致的。将depthwise conv的卷积核大小由3x3改成了7x7,准确率从80.6到80.6。
5.Micro designs(其它小调整)
1.Replacing ReLU with GELU( 用GELU激活函数替换ReLU),准确率从80.6到80.6。
2.Fewer activation functions,(减少激活层数量),由于Transformer中只使用了一个激活层,因此在设计上进行了效仿,结果发现只在block中的两个1x1卷积之间使用一层激活层,其他地方不适用,反而带来了0.7个点的提升。这说明太频繁地做非线性投影对于网络特征的信息传递实际上是有害的,准确率从80.6到81.3。
3.Fewer normalization layers( 减少归一化层数量),基于跟减少激活层相同的逻辑,由于Transformer中BN层很少,本文也只保留了1x1卷积之前的一层BN,而两个1x1卷积层之间甚至没有使用归一化层,准确率从81.3到81.4。
4.Substituting BN with LN(用LN替换BN),由于Transformer中使用了LN,且一些研究发现BN会对网络性能带来一些负面影响,本文将所有的BN替换为LN,准确率从81.4到81.5。
5.Separate downsampling layers( 单独的下采样层),标准ResNet的下采样层通常是stride=2的3x3卷积,对于有残差结构的block则在短路连接中使用stride=2的1x1卷积,这使得CNN的下采样层基本与其他层保持了相似的计算策略。而Swin-T中的下采样层是单独的,因此本文用stride=2的2x2卷积进行模拟。又因为这样会使训练不稳定,因此在每个下采样层前面增加了LN来稳定训练,准确率从81.5到82.0。
整体来说,ConvNeXt-Tiny模型表示在下图,训练使用AdamW优化器。
总结:
ConvNeXt是一个向transformer网络靠拢的cnn模型,从作者的实验看出,每一点精度的提升都是经过大量的实验。
模型以及训练代码
训练在5分类的花数据集上在ConvNeXt-Tiny模型上准确率为92.3,在ConvNeXt-Tiny模型上准确率为93.2(可以更高,这里只训练了20epochs)
1.模型代码(model.py)
""" original code from facebook research: https://github.com/facebookresearch/ConvNeXt """ import torch import torch.nn as nn import torch.nn.functional as F def drop_path(x, drop_prob: float = 0., training: bool = False): """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). This is the same as the DropConnect impl I created for EfficientNet, etc networks, however, the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper... See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use 'survival rate' as the argument. """ if drop_prob == 0. or not training: return x keep_prob = 1 - drop_prob shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2D ConvNets random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device) random_tensor.floor_() # binarize output = x.div(keep_prob) * random_tensor return output class DropPath(nn.Module): """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). """ def __init__(self, drop_prob=None): super(DropPath, self).__init__() self.drop_prob = drop_prob def forward(self, x): return drop_path(x, self.drop_prob, self.training) class LayerNorm(nn.Module): r""" LayerNorm that supports two data formats: channels_last (default) or channels_first. The ordering of the dimensions in the inputs. channels_last corresponds to inputs with shape (batch_size, height, width, channels) while channels_first corresponds to inputs with shape (batch_size, channels, height, width). """ def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"): super().__init__() self.weight = nn.Parameter(torch.ones(normalized_shape), requires_grad=True) self.bias = nn.Parameter(torch.zeros(normalized_shape), requires_grad=True) self.eps = eps self.data_format = data_format if self.data_format not in ["channels_last", "channels_first"]: raise ValueError(f"not support data format '{self.data_format}'") self.normalized_shape = (normalized_shape,) def forward(self, x: torch.Tensor) -> torch.Tensor: if self.data_format == "channels_last": return F.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps) elif self.data_format == "channels_first": # [batch_size, channels, height, width] mean = x.mean(1, keepdim=True) var = (x - mean).pow(2).mean(1, keepdim=True) x = (x - mean) / torch.sqrt(var + self.eps) x = self.weight[:, None, None] * x + self.bias[:, None, None] return x class Block(nn.Module): r""" ConvNeXt Block. There are two equivalent implementations: (1) DwConv -> LayerNorm (channels_first) -> 1x1 Conv -> GELU -> 1x1 Conv; all in (N, C, H, W) (2) DwConv -> Permute to (N, H, W, C); LayerNorm (channels_last) -> Linear -> GELU -> Linear; Permute back We use (2) as we find it slightly faster in PyTorch Args: dim (int): Number of input channels. drop_rate (float): Stochastic depth rate. Default: 0.0 layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6. """ def __init__(self, dim, drop_rate=0., layer_scale_init_value=1e-6): super().__init__() self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim) # depthwise conv self.norm = LayerNorm(dim, eps=1e-6, data_format="channels_last") self.pwconv1 = nn.Linear(dim, 4 * dim) # pointwise/1x1 convs, implemented with linear layers self.act = nn.GELU() self.pwconv2 = nn.Linear(4 * dim, dim) self.gamma = nn.Parameter(layer_scale_init_value * torch.ones((dim,)), requires_grad=True) if layer_scale_init_value > 0 else None self.drop_path = DropPath(drop_rate) if drop_rate > 0. else nn.Identity() def forward(self, x: torch.Tensor) -> torch.Tensor: shortcut = x x = self.dwconv(x) x = x.permute(0, 2, 3, 1) # [N, C, H, W] -> [N, H, W, C] x = self.norm(x) x = self.pwconv1(x) x = self.act(x) x = self.pwconv2(x) if self.gamma is not None: x = self.gamma * x x = x.permute(0, 3, 1, 2) # [N, H, W, C] -> [N, C, H, W] x = shortcut + self.drop_path(x) return x class ConvNeXt(nn.Module): r""" ConvNeXt A PyTorch impl of : `A ConvNet for the 2020s` - https://arxiv.org/pdf/2201.03545.pdf Args: in_chans (int): Number of input image channels. Default: 3 num_classes (int): Number of classes for classification head. Default: 1000 depths (tuple(int)): Number of blocks at each stage. Default: [3, 3, 9, 3] dims (int): Feature dimension at each stage. Default: [96, 192, 384, 768] drop_path_rate (float): Stochastic depth rate. Default: 0. layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6. head_init_scale (float): Init scaling value for classifier weights and biases. Default: 1. """ def __init__(self, in_chans: int = 3, num_classes: int = 1000, depths: list = None, dims: list = None, drop_path_rate: float = 0., layer_scale_init_value: float = 1e-6, head_init_scale: float = 1.): super().__init__() self.downsample_layers = nn.ModuleList() # stem and 3 intermediate downsampling conv layers stem = nn.Sequential(nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4), LayerNorm(dims[0], eps=1e-6, data_format="channels_first")) self.downsample_layers.append(stem) # 对应stage2-stage4前的3个downsample for i in range(3): downsample_layer = nn.Sequential(LayerNorm(dims[i], eps=1e-6, data_format="channels_first"), nn.Conv2d(dims[i], dims[i+1], kernel_size=2, stride=2)) self.downsample_layers.append(downsample_layer) self.stages = nn.ModuleList() # 4 feature resolution stages, each consisting of multiple blocks dp_rates = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))] cur = 0 # 构建每个stage中堆叠的block for i in range(4): stage = nn.Sequential( *[Block(dim=dims[i], drop_rate=dp_rates[cur + j], layer_scale_init_value=layer_scale_init_value) for j in range(depths[i])] ) self.stages.append(stage) cur += depths[i] self.norm = nn.LayerNorm(dims[-1], eps=1e-6) # final norm layer self.head = nn.Linear(dims[-1], num_classes) self.apply(self._init_weights) self.head.weight.data.mul_(head_init_scale) self.head.bias.data.mul_(head_init_scale) def _init_weights(self, m): if isinstance(m, (nn.Conv2d, nn.Linear)): nn.init.trunc_normal_(m.weight, std=0.2) nn.init.constant_(m.bias, 0) def forward_features(self, x: torch.Tensor) -> torch.Tensor: for i in range(4): x = self.downsample_layers[i](x) x = self.stages[i](x) return self.norm(x.mean([-2, -1])) # global average pooling, (N, C, H, W) -> (N, C) def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.forward_features(x) x = self.head(x) return x def convnext_tiny(num_classes: int): # https://dl.fbaipublicfiles.com/convnext/convnext_tiny_1k_224_ema.pth model = ConvNeXt(depths=[3, 3, 9, 3], dims=[96, 192, 384, 768], num_classes=num_classes) return model def convnext_small(num_classes: int): # https://dl.fbaipublicfiles.com/convnext/convnext_small_1k_224_ema.pth model = ConvNeXt(depths=[3, 3, 27, 3], dims=[96, 192, 384, 768], num_classes=num_classes) return model def convnext_base(num_classes: int): # https://dl.fbaipublicfiles.com/convnext/convnext_base_1k_224_ema.pth # https://dl.fbaipublicfiles.com/convnext/convnext_base_22k_224.pth model = ConvNeXt(depths=[3, 3, 27, 3], dims=[128, 256, 512, 1024], num_classes=num_classes) return model def convnext_large(num_classes: int): # https://dl.fbaipublicfiles.com/convnext/convnext_large_1k_224_ema.pth # https://dl.fbaipublicfiles.com/convnext/convnext_large_22k_224.pth model = ConvNeXt(depths=[3, 3, 27, 3], dims=[192, 384, 768, 1536], num_classes=num_classes) return model def convnext_xlarge(num_classes: int): # https://dl.fbaipublicfiles.com/convnext/convnext_xlarge_22k_224.pth model = ConvNeXt(depths=[3, 3, 27, 3], dims=[256, 512, 1024, 2048], num_classes=num_classes) return model
2.训练代码(包括train.py和utils.py)
train.py
import json import os import argparse import time import torch import torch.optim as optim from torch.utils.tensorboard import SummaryWriter from torchvision import transforms, datasets from model import convnext_tiny as create_model from utils import create_lr_scheduler, get_params_groups, train_one_epoch, evaluate,plot_class_preds def main(args): os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" device = torch.device(args.device if torch.cuda.is_available() else "cpu") print(args) # 创建定义文件夹以及文件 filename = 'record.txt' save_path = 'runs' path_num = 1 while os.path.exists(save_path + f'{path_num}'): path_num += 1 save_path = save_path + f'{path_num}' os.mkdir(save_path) f = open(save_path + "/" + filename, 'w') f.write("{}\n".format(args)) # print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/') # 实例化SummaryWriter对象 # ####################################### tb_writer = SummaryWriter(log_dir=save_path + "/flower_experiment") if os.path.exists(save_path + "/weights") is False: os.makedirs(save_path + "/weights") img_size = 224 data_transform = { "train": transforms.Compose([transforms.RandomResizedCrop(img_size), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]), "val": transforms.Compose([transforms.Resize(int(img_size * 1.143)), transforms.CenterCrop(img_size), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])} # 实例化训练数据集 train_data_set = datasets.ImageFolder(root=os.path.join(args.data_path, "train"), transform=data_transform["train"]) # 实例化验证数据集 val_data_set = datasets.ImageFolder(root=os.path.join(args.data_path, "val"), transform=data_transform["val"]) # 生成class_indices.json文件,包括有模型对应的序列号 # ####################################### classes_list = train_data_set.class_to_idx cla_dict = dict((val, key) for key, val in classes_list.items()) # write dict into json file json_str = json.dumps(cla_dict, indent=4) with open('class_indices.json', 'w') as json_file: json_file.write(json_str) batch_size = args.batch_size nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers print('Using {} dataloader workers every process'.format(nw)) train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=nw) val_loader = torch.utils.data.DataLoader(val_data_set, batch_size=batch_size, shuffle=False, pin_memory=True, num_workers=nw) model = create_model(num_classes=args.num_classes).to(device) # Write the model into tensorboard # ####################################### init_img = torch.zeros((1, 3, 224, 224), device=device) tb_writer.add_graph(model, init_img) if args.weights != "": assert os.path.exists(args.weights), "weights file: '{}' not exist.".format(args.weights) # weights_dict = torch.load(args.weights, map_location=device) weights_dict = torch.load(args.weights, map_location=device)["model"] # 删除有关分类类别的权重 for k in list(weights_dict.keys()): if "head" in k: del weights_dict[k] print(model.load_state_dict(weights_dict, strict=False)) if args.freeze_layers: for name, para in model.named_parameters(): # 除head外,其他权重全部冻结 if "head" not in name: para.requires_grad_(False) else: print("training {}".format(name)) # pg = [p for p in model.parameters() if p.requires_grad] pg = get_params_groups(model, weight_decay=args.wd) optimizer = optim.AdamW(pg, lr=args.lr, weight_decay=args.wd) lr_scheduler = create_lr_scheduler(optimizer, len(train_loader), args.epochs, warmup=True, warmup_epochs=1) best_acc = 0.0 for epoch in range(args.epochs): # 计时器time_start time_start = time.time() # train train_loss, train_acc = train_one_epoch(model=model, optimizer=optimizer, data_loader=train_loader, device=device, epoch=epoch, lr_scheduler=lr_scheduler) # validate val_loss, val_acc = evaluate(model=model, data_loader=val_loader, device=device, epoch=epoch) time_end = time.time() f.write("[epoch {}] train_loss: {:.3f},train_acc:{:.3f},val_loss:{:.3f},val_acc:{:.3f},Spend_time:{:.3f}S" .format(epoch + 1, train_loss, train_acc, val_loss, val_acc, time_end - time_start)) f.flush() # add Training results into tensorboard # ####################################### tags = ["train_loss", "train_acc", "val_loss", "val_acc", "learning_rate"] tb_writer.add_scalar(tags[0], train_loss, epoch) tb_writer.add_scalar(tags[1], train_acc, epoch) tb_writer.add_scalar(tags[2], val_loss, epoch) tb_writer.add_scalar(tags[3], val_acc, epoch) tb_writer.add_scalar(tags[4], optimizer.param_groups[0]["lr"], epoch) # add figure into tensorboard # ####################################### fig = plot_class_preds(net=model, images_dir=r"D:/other/ClassicalModel/data/flower_datas/plot_img", transform=data_transform["val"], num_plot=6, device=device) if fig is not None: tb_writer.add_figure("predictions vs. actuals", figure=fig, global_step=epoch) if val_acc > best_acc: best_acc = val_acc f.write(',save best model') torch.save(model.state_dict(), save_path + "/weights/bestmodel.pth") f.write('\n') f.close() if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--num_classes', type=int, default=5) parser.add_argument('--epochs', type=int, default=1) parser.add_argument('--batch-size', type=int, default=4) parser.add_argument('--lr', type=float, default=5e-4) parser.add_argument('--wd', type=float, default=5e-2) parser.add_argument('--data-path', type=str, default=r"D:/other/ClassicalModel/data/flower_datas") parser.add_argument('--weights', type=str, # default=r"D:/other/ClassicalModel/ConvNext/weights_pre/convnext_tiny_1k_224_ema.pth", help='initial weights path') # 是否冻结head以外所有权重 parser.add_argument('--freeze-layers', type=bool, default=False) parser.add_argument('--device', default='cuda:0', help='device id (i.e. 0 or 0,1 or cpu)') opt = parser.parse_args() main(opt)
注意:其中带有很多#的行的代码,可以注释掉
utils.py
import os import sys import json import math import torch from PIL import Image from matplotlib import pyplot as plt from tqdm import tqdm def train_one_epoch(model, optimizer, data_loader, device, epoch, lr_scheduler): model.train() loss_function = torch.nn.CrossEntropyLoss() accu_loss = torch.zeros(1).to(device) # 累计损失 accu_num = torch.zeros(1).to(device) # 累计预测正确的样本数 optimizer.zero_grad() sample_num = 0 data_loader = tqdm(data_loader, file=sys.stdout) for step, data in enumerate(data_loader): images, labels = data sample_num += images.shape[0] pred = model(images.to(device)) pred_classes = torch.max(pred, dim=1)[1] accu_num += torch.eq(pred_classes, labels.to(device)).sum() loss = loss_function(pred, labels.to(device)) loss.backward() accu_loss += loss.detach() data_loader.desc = "[train epoch {}] loss: {:.3f}, acc: {:.3f}, lr: {:.5f}".format( epoch+1, accu_loss.item() / (step + 1), accu_num.item() / sample_num, optimizer.param_groups[0]["lr"] ) if not torch.isfinite(loss): print('WARNING: non-finite loss, ending training ', loss) sys.exit(1) optimizer.step() optimizer.zero_grad() # update lr lr_scheduler.step() return accu_loss.item() / (step + 1), accu_num.item() / sample_num @torch.no_grad() def evaluate(model, data_loader, device, epoch): loss_function = torch.nn.CrossEntropyLoss() model.eval() accu_num = torch.zeros(1).to(device) # 累计预测正确的样本数 accu_loss = torch.zeros(1).to(device) # 累计损失 sample_num = 0 data_loader = tqdm(data_loader, file=sys.stdout) for step, data in enumerate(data_loader): images, labels = data sample_num += images.shape[0] pred = model(images.to(device)) pred_classes = torch.max(pred, dim=1)[1] accu_num += torch.eq(pred_classes, labels.to(device)).sum() loss = loss_function(pred, labels.to(device)) accu_loss += loss data_loader.desc = "[valid epoch {}] loss: {:.3f}, acc: {:.3f}".format( epoch+1, accu_loss.item() / (step + 1), accu_num.item() / sample_num ) return accu_loss.item() / (step + 1), accu_num.item() / sample_num def create_lr_scheduler(optimizer, num_step: int, epochs: int, warmup=True, warmup_epochs=1, warmup_factor=1e-3, end_factor=1e-6): assert num_step > 0 and epochs > 0 if warmup is False: warmup_epochs = 0 def f(x): """ 根据step数返回一个学习率倍率因子, 注意在训练开始之前,pytorch会提前调用一次lr_scheduler.step()方法 """ if warmup is True and x <= (warmup_epochs * num_step): alpha = float(x) / (warmup_epochs * num_step) # warmup过程中lr倍率因子从warmup_factor -> 1 return warmup_factor * (1 - alpha) + alpha else: current_step = (x - warmup_epochs * num_step) cosine_steps = (epochs - warmup_epochs) * num_step # warmup后lr倍率因子从1 -> end_factor return ((1 + math.cos(current_step * math.pi / cosine_steps)) / 2) * (1 - end_factor) + end_factor return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=f) def get_params_groups(model: torch.nn.Module, weight_decay: float = 1e-5): # 记录optimize要训练的权重参数 parameter_group_vars = {"decay": {"params": [], "weight_decay": weight_decay}, "no_decay": {"params": [], "weight_decay": 0.}} # 记录对应的权重名称 parameter_group_names = {"decay": {"params": [], "weight_decay": weight_decay}, "no_decay": {"params": [], "weight_decay": 0.}} for name, param in model.named_parameters(): if not param.requires_grad: continue # frozen weights if len(param.shape) == 1 or name.endswith(".bias"): group_name = "no_decay" else: group_name = "decay" parameter_group_vars[group_name]["params"].append(param) parameter_group_names[group_name]["params"].append(name) print("Param groups = %s" % json.dumps(parameter_group_names, indent=2)) return list(parameter_group_vars.values()) def plot_class_preds(net, images_dir: str, transform, num_plot: int = 5, device="cpu"): if not os.path.exists(images_dir): print("not found {} path, ignore add figure.".format(images_dir)) return None label_path = os.path.join(images_dir, "label.txt") if not os.path.exists(label_path): print("not found {} file, ignore add figure".format(label_path)) return None # read class_indict json_label_path = './class_indices.json' assert os.path.exists(json_label_path), "not found {}".format(json_label_path) json_file = open(json_label_path, 'r') # {"0": "daisy"} flower_class = json.load(json_file) # {"daisy": "0"} class_indices = dict((v, k) for k, v in flower_class.items()) # reading label.txt file label_info = [] with open(label_path, "r") as rd: for line in rd.readlines(): line = line.strip() if len(line) > 0: split_info = [i for i in line.split(" ") if len(i) > 0] assert len(split_info) == 2, "label format error, expect file_name and class_name" image_name, class_name = split_info image_path = os.path.join(images_dir, image_name) # 如果文件不存在,则跳过 if not os.path.exists(image_path): print("not found {}, skip.".format(image_path)) continue # 如果读取的类别不在给定的类别内,则跳过 if class_name not in class_indices.keys(): print("unrecognized category {}, skip".format(class_name)) continue label_info.append([image_path, class_name]) if len(label_info) == 0: return None # get first num_plot info if len(label_info) > num_plot: label_info = label_info[:num_plot] num_imgs = len(label_info) images = [] labels = [] for img_path, class_name in label_info: # read img img = Image.open(img_path).convert("RGB") label_index = int(class_indices[class_name]) # preprocessing img = transform(img) images.append(img) labels.append(label_index) # batching images images = torch.stack(images, dim=0).to(device) # inference with torch.no_grad(): output = net(images) probs, preds = torch.max(torch.softmax(output, dim=1), dim=1) probs = probs.cpu().numpy() preds = preds.cpu().numpy() # width, height fig = plt.figure(figsize=(num_imgs * 2.5, 3), dpi=100) for i in range(num_imgs): # 1:子图共1行,num_imgs:子图共num_imgs列,当前绘制第i+1个子图 ax = fig.add_subplot(1, num_imgs, i+1, xticks=[], yticks=[]) # CHW -> HWC npimg = images[i].cpu().numpy().transpose(1, 2, 0) # 将图像还原至标准化之前 # mean:[0.485, 0.456, 0.406], std:[0.229, 0.224, 0.225] npimg = (npimg * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406]) * 255 plt.imshow(npimg.astype('uint8')) title = "{}, {:.2f}%\n(label: {})".format( flower_class[str(preds[i])], # predict class probs[i] * 100, # predict probability flower_class[str(labels[i])] # true class ) ax.set_title(title, color=("green" if preds[i] == labels[i] else "red")) return fig
3.预测以及批量预测(predict.py和batch_predict.py)
predict.py
import os import json import torch from PIL import Image from torchvision import transforms import matplotlib.pyplot as plt from model import convnext_tiny as create_model def main(): device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") print(f"using {device} device.") num_classes = 5 img_size = 224 data_transform = transforms.Compose( [transforms.Resize(int(img_size * 1.14)), transforms.CenterCrop(img_size), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]) # load image img_path = "../tulip.jpg" assert os.path.exists(img_path), "file: '{}' dose not exist.".format(img_path) img = Image.open(img_path) plt.imshow(img) # [N, C, H, W] img = data_transform(img) # expand batch dimension img = torch.unsqueeze(img, dim=0) # read class_indict json_path = './class_indices.json' assert os.path.exists(json_path), "file: '{}' dose not exist.".format(json_path) json_file = open(json_path, "r") class_indict = json.load(json_file) # create model model = create_model(num_classes=num_classes).to(device) # load model weights model_weight_path = "./weights/best_model.pth" model.load_state_dict(torch.load(model_weight_path, map_location=device)) model.eval() with torch.no_grad(): # predict class output = torch.squeeze(model(img.to(device))).cpu() predict = torch.softmax(output, dim=0) predict_cla = torch.argmax(predict).numpy() print_res = "class: {} prob: {:.3}".format(class_indict[str(predict_cla)], predict[predict_cla].numpy()) plt.title(print_res) for i in range(len(predict)): print("class: {:10} prob: {:.3}".format(class_indict[str(i)], predict[i].numpy())) plt.show() if __name__ == '__main__': main()
batch_predict.py
import os import json import torch from PIL import Image from torchvision import transforms from model import convnext_tiny def main(): device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") img_size = 224 data_transform = transforms.Compose( [transforms.Resize(int(img_size * 1.14)), transforms.CenterCrop(img_size), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]) # load image # 指向需要遍历预测的图像文件夹 imgs_root = r"D:/other/ClassicalModel/data/flower_datas/train/daisy" assert os.path.exists(imgs_root), f"file: '{imgs_root}' dose not exist." # 读取指定文件夹下所有jpg图像路径 img_path_list = [os.path.join(imgs_root, i) for i in os.listdir(imgs_root) if i.endswith(".jpg")] # read class_indict json_path = r"D:/other/ClassicalModel/ResNet/class_indices.json" assert os.path.exists(json_path), f"file: '{json_path}' dose not exist." json_file = open(json_path, "r") class_indict = json.load(json_file) # create model model = convnext_tiny(num_classes=5).to(device) # load model weights weights_path = r"D:/other/ClassicalModel/ConvNeXt/runs1/weights/bestmodel.pth" assert os.path.exists(weights_path), f"file: '{weights_path}' dose not exist." model.load_state_dict(torch.load(weights_path, map_location=device)) #save predicted img filename = 'record.txt' save_path = 'detect' path_num = 1 while os.path.exists(save_path + f'{path_num}'): path_num += 1 os.mkdir(save_path + f'{path_num}') f = open(save_path + f'{path_num}/' + filename, 'w') f.write("imgs_root:"+imgs_root+"\n") f.write("weights_path:"+weights_path+"\n") actual_classes="daisy" acc_num=0 all_num=len(img_path_list) # prediction model.eval() batch_size = 8 # 每次预测时将多少张图片打包成一个batch with torch.no_grad(): for ids in range(0, len(img_path_list) // batch_size): img_list = [] for img_path in img_path_list[ids * batch_size: (ids + 1) * batch_size]: assert os.path.exists(img_path), f"file: '{img_path}' dose not exist." img = Image.open(img_path) img = data_transform(img) img_list.append(img) # batch img # 将img_list列表中的所有图像打包成一个batch batch_img = torch.stack(img_list, dim=0) # predict class output = model(batch_img.to(device)).cpu() predict = torch.softmax(output, dim=1) probs, classes = torch.max(predict, dim=1) for idx, (pro, cla) in enumerate(zip(probs, classes)): print("image: {} class: {} prob: {:.3}".format(img_path_list[ids * batch_size + idx], class_indict[str(cla.numpy())], pro.numpy())) f.write("image: {} class: {} prob: {:.3}\n".format(img_path_list[ids * batch_size + idx], class_indict[str(cla.numpy())], pro.numpy())) if class_indict[str(cla.numpy())]==actual_classes: acc_num+=1 print("classes:{},acc_num:{:d},all_num:{:d},accuracy: {:.3f}".format(actual_classes,acc_num,all_num,acc_num/all_num)) f.write("classes:{},acc_num:{:d},all_num:{:d},accuracy: {:.3f}".format(actual_classes,acc_num,all_num,acc_num/all_num)) f.close() if __name__ == '__main__': main()