ConvNeXt网络介绍,搭建以及训练

简介: ConvNeXt网络介绍,搭建以及训练

ConvNeXt网络介绍



今年(2022)一月份,Facebook AI Research和UC Berkeley一起发表了一篇文章A ConvNet for the 2020s,在文章中提出了ConvNeXt纯卷积神经网络,它对标的是2021年非常火的Swin Transformer,通过一系列实验比对,在相同的FLOPs下,ConvNeXt相比Swin Transformer拥有更快的推理速度以及更高的准确率,在ImageNet 22K上ConvNeXt-XL达到了87.8%的准确率,参看下图。

cf05ef4de8f9423fa76b988bcab39fe2.png



原论文链接:https://arxiv.org/pdf/2201.03545.pdf

ConvNeXt在ResNet50模型的基础上,仿照Swin Transformer的结构进行改进而得到的纯卷积模型下面就介绍关于ConvNeXt网络的改进之处。



1.Macro design


  1. 改变stage compute ratio


Swin-T的比例是1:1:3:1 Swin-L的比例是1:1:9:1作者将ResNet50中的堆叠次数由(3, 4, 6, 3)调整成(3, 3, 9, 3),作者将stem(降采样层)换成卷积核大小为4,步距为4的卷积层。对于更大的模型,也跟进了Swin所使用的1:1:9:1。下图可以看出ResNet网络和Swin-Transformer不同网络的堆叠次数。这里改变stage compute ratio后模型准确率从78.8提高到79.4


32117bdddce346e386f6451fc2d84284.png

9d587524571f45419ddceac126964732.png



  2. 使用Patchify的stem

从Vision Transformer开始,为了将图片转化为token,图片都会先被分割成一个一个的patch,而在传统ResNet中stem层是使用一个stride=2的7x7卷积加最大池化层。作者将stem换成卷积核大小为4,步距为4的卷积层,准确率从79.4提高到79.5。



2.ResNeXt-ify


使用ResNeXt中的深度卷积


ResNeXt相比普通的ResNet而言在FLOPs以及 accuracy之间做到了更好的平衡。这里作者采用的是更激进的depthwise convolution(即分组数等于输入通道数),准确率从79.5到80.5。

b340a06c738b433eb9c2af95a0e99a1e.png



3.使用Inverted bottleneck(反瓶颈结构


MLP模块非常像MobileNetV2中的Inverted Bottleneck模块,即两头细中间粗。然后修改反瓶颈中卷积层位置,Moving up depthwise conv layer,将depthwise conv模块上移,原来是 1x1 conv -> depthwise conv -> 1x1 conv现在变成depthwise conv -> 1x1 conv -> 1x1 conv,准确率从80.5提高到80.6。


b98e59bd439448569d3c6378b2a1db8a.png



4.使用更大的卷积核


由于Swin-T中使用了7x7卷积核,这一步主要是为了对齐比较。又因为inverted bottleneck放大了中间卷积层的缘故,直接替换会导致参数量增大,因而作者把dw conv的位置进行了调整,放到了反瓶颈的开头。最终结果相近,说明在7x7在相同参数量下效果是一致的。将depthwise conv的卷积核大小由3x3改成了7x7,准确率从80.6到80.6。


3e642ed1e93f4958b7e44c83ea7c13c0.png



5.Micro designs(其它小调整)


1.Replacing ReLU with GELU( 用GELU激活函数替换ReLU),准确率从80.6到80.6


2.Fewer activation functions,(减少激活层数量),由于Transformer中只使用了一个激活层,因此在设计上进行了效仿,结果发现只在block中的两个1x1卷积之间使用一层激活层,其他地方不适用,反而带来了0.7个点的提升。这说明太频繁地做非线性投影对于网络特征的信息传递实际上是有害的,准确率从80.6到81.3。


3.Fewer normalization layers( 减少归一化层数量),基于跟减少激活层相同的逻辑,由于Transformer中BN层很少,本文也只保留了1x1卷积之前的一层BN,而两个1x1卷积层之间甚至没有使用归一化层,准确率从81.3到81.4。


4.Substituting BN with LN(用LN替换BN),由于Transformer中使用了LN,且一些研究发现BN会对网络性能带来一些负面影响,本文将所有的BN替换为LN,准确率从81.4到81.5。


5.Separate downsampling layers( 单独的下采样层),标准ResNet的下采样层通常是stride=2的3x3卷积,对于有残差结构的block则在短路连接中使用stride=2的1x1卷积,这使得CNN的下采样层基本与其他层保持了相似的计算策略。而Swin-T中的下采样层是单独的,因此本文用stride=2的2x2卷积进行模拟。又因为这样会使训练不稳定,因此在每个下采样层前面增加了LN来稳定训练,准确率从81.5到82.0。


b49ad9c3950a4ed69fa2241bc09804e8.png

整体来说,ConvNeXt-Tiny模型表示在下图,训练使用AdamW优化器。



总结:

ConvNeXt是一个向transformer网络靠拢的cnn模型,从作者的实验看出,每一点精度的提升都是经过大量的实验。



模型以及训练代码


训练在5分类的花数据集上在ConvNeXt-Tiny模型上准确率为92.3,在ConvNeXt-Tiny模型上准确率为93.2(可以更高,这里只训练了20epochs)


1.模型代码(model.py)

"""
original code from facebook research:
https://github.com/facebookresearch/ConvNeXt
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
def drop_path(x, drop_prob: float = 0., training: bool = False):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
    This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
    the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
    See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
    changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
    'survival rate' as the argument.
    """
    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)  # work with diff dim tensors, not just 2D ConvNets
    random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_()  # binarize
    output = x.div(keep_prob) * random_tensor
    return output
class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample  (when applied in main path of residual blocks).
    """
    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob
    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training)
class LayerNorm(nn.Module):
    r""" LayerNorm that supports two data formats: channels_last (default) or channels_first.
    The ordering of the dimensions in the inputs. channels_last corresponds to inputs with
    shape (batch_size, height, width, channels) while channels_first corresponds to inputs
    with shape (batch_size, channels, height, width).
    """
    def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(normalized_shape), requires_grad=True)
        self.bias = nn.Parameter(torch.zeros(normalized_shape), requires_grad=True)
        self.eps = eps
        self.data_format = data_format
        if self.data_format not in ["channels_last", "channels_first"]:
            raise ValueError(f"not support data format '{self.data_format}'")
        self.normalized_shape = (normalized_shape,)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        if self.data_format == "channels_last":
            return F.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps)
        elif self.data_format == "channels_first":
            # [batch_size, channels, height, width]
            mean = x.mean(1, keepdim=True)
            var = (x - mean).pow(2).mean(1, keepdim=True)
            x = (x - mean) / torch.sqrt(var + self.eps)
            x = self.weight[:, None, None] * x + self.bias[:, None, None]
            return x
class Block(nn.Module):
    r""" ConvNeXt Block. There are two equivalent implementations:
    (1) DwConv -> LayerNorm (channels_first) -> 1x1 Conv -> GELU -> 1x1 Conv; all in (N, C, H, W)
    (2) DwConv -> Permute to (N, H, W, C); LayerNorm (channels_last) -> Linear -> GELU -> Linear; Permute back
    We use (2) as we find it slightly faster in PyTorch
    Args:
        dim (int): Number of input channels.
        drop_rate (float): Stochastic depth rate. Default: 0.0
        layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6.
    """
    def __init__(self, dim, drop_rate=0., layer_scale_init_value=1e-6):
        super().__init__()
        self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)  # depthwise conv
        self.norm = LayerNorm(dim, eps=1e-6, data_format="channels_last")
        self.pwconv1 = nn.Linear(dim, 4 * dim)  # pointwise/1x1 convs, implemented with linear layers
        self.act = nn.GELU()
        self.pwconv2 = nn.Linear(4 * dim, dim)
        self.gamma = nn.Parameter(layer_scale_init_value * torch.ones((dim,)),
                                  requires_grad=True) if layer_scale_init_value > 0 else None
        self.drop_path = DropPath(drop_rate) if drop_rate > 0. else nn.Identity()
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        shortcut = x
        x = self.dwconv(x)
        x = x.permute(0, 2, 3, 1)  # [N, C, H, W] -> [N, H, W, C]
        x = self.norm(x)
        x = self.pwconv1(x)
        x = self.act(x)
        x = self.pwconv2(x)
        if self.gamma is not None:
            x = self.gamma * x
        x = x.permute(0, 3, 1, 2)  # [N, H, W, C] -> [N, C, H, W]
        x = shortcut + self.drop_path(x)
        return x
class ConvNeXt(nn.Module):
    r""" ConvNeXt
        A PyTorch impl of : `A ConvNet for the 2020s`  -
          https://arxiv.org/pdf/2201.03545.pdf
    Args:
        in_chans (int): Number of input image channels. Default: 3
        num_classes (int): Number of classes for classification head. Default: 1000
        depths (tuple(int)): Number of blocks at each stage. Default: [3, 3, 9, 3]
        dims (int): Feature dimension at each stage. Default: [96, 192, 384, 768]
        drop_path_rate (float): Stochastic depth rate. Default: 0.
        layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6.
        head_init_scale (float): Init scaling value for classifier weights and biases. Default: 1.
    """
    def __init__(self, in_chans: int = 3, num_classes: int = 1000, depths: list = None,
                 dims: list = None, drop_path_rate: float = 0., layer_scale_init_value: float = 1e-6,
                 head_init_scale: float = 1.):
        super().__init__()
        self.downsample_layers = nn.ModuleList()  # stem and 3 intermediate downsampling conv layers
        stem = nn.Sequential(nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4),
                             LayerNorm(dims[0], eps=1e-6, data_format="channels_first"))
        self.downsample_layers.append(stem)
        # 对应stage2-stage4前的3个downsample
        for i in range(3):
            downsample_layer = nn.Sequential(LayerNorm(dims[i], eps=1e-6, data_format="channels_first"),
                                             nn.Conv2d(dims[i], dims[i+1], kernel_size=2, stride=2))
            self.downsample_layers.append(downsample_layer)
        self.stages = nn.ModuleList()  # 4 feature resolution stages, each consisting of multiple blocks
        dp_rates = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]
        cur = 0
        # 构建每个stage中堆叠的block
        for i in range(4):
            stage = nn.Sequential(
                *[Block(dim=dims[i], drop_rate=dp_rates[cur + j], layer_scale_init_value=layer_scale_init_value)
                  for j in range(depths[i])]
            )
            self.stages.append(stage)
            cur += depths[i]
        self.norm = nn.LayerNorm(dims[-1], eps=1e-6)  # final norm layer
        self.head = nn.Linear(dims[-1], num_classes)
        self.apply(self._init_weights)
        self.head.weight.data.mul_(head_init_scale)
        self.head.bias.data.mul_(head_init_scale)
    def _init_weights(self, m):
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            nn.init.trunc_normal_(m.weight, std=0.2)
            nn.init.constant_(m.bias, 0)
    def forward_features(self, x: torch.Tensor) -> torch.Tensor:
        for i in range(4):
            x = self.downsample_layers[i](x)
            x = self.stages[i](x)
        return self.norm(x.mean([-2, -1]))  # global average pooling, (N, C, H, W) -> (N, C)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.forward_features(x)
        x = self.head(x)
        return x
def convnext_tiny(num_classes: int):
    # https://dl.fbaipublicfiles.com/convnext/convnext_tiny_1k_224_ema.pth
    model = ConvNeXt(depths=[3, 3, 9, 3],
                     dims=[96, 192, 384, 768],
                     num_classes=num_classes)
    return model
def convnext_small(num_classes: int):
    # https://dl.fbaipublicfiles.com/convnext/convnext_small_1k_224_ema.pth
    model = ConvNeXt(depths=[3, 3, 27, 3],
                     dims=[96, 192, 384, 768],
                     num_classes=num_classes)
    return model
def convnext_base(num_classes: int):
    # https://dl.fbaipublicfiles.com/convnext/convnext_base_1k_224_ema.pth
    # https://dl.fbaipublicfiles.com/convnext/convnext_base_22k_224.pth
    model = ConvNeXt(depths=[3, 3, 27, 3],
                     dims=[128, 256, 512, 1024],
                     num_classes=num_classes)
    return model
def convnext_large(num_classes: int):
    # https://dl.fbaipublicfiles.com/convnext/convnext_large_1k_224_ema.pth
    # https://dl.fbaipublicfiles.com/convnext/convnext_large_22k_224.pth
    model = ConvNeXt(depths=[3, 3, 27, 3],
                     dims=[192, 384, 768, 1536],
                     num_classes=num_classes)
    return model
def convnext_xlarge(num_classes: int):
    # https://dl.fbaipublicfiles.com/convnext/convnext_xlarge_22k_224.pth
    model = ConvNeXt(depths=[3, 3, 27, 3],
                     dims=[256, 512, 1024, 2048],
                     num_classes=num_classes)
    return model


2.训练代码(包括train.py和utils.py)

train.py

import json
import os
import argparse
import time
import torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, datasets
from model import convnext_tiny as create_model
from utils import  create_lr_scheduler, get_params_groups, train_one_epoch, evaluate,plot_class_preds
def main(args):
    os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
    device = torch.device(args.device if torch.cuda.is_available() else "cpu")
    print(args)
    # 创建定义文件夹以及文件
    filename = 'record.txt'
    save_path = 'runs'
    path_num = 1
    while os.path.exists(save_path + f'{path_num}'):
        path_num += 1
    save_path = save_path + f'{path_num}'
    os.mkdir(save_path)
    f = open(save_path + "/" + filename, 'w')
    f.write("{}\n".format(args))
    # print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
    # 实例化SummaryWriter对象
    # #######################################
    tb_writer = SummaryWriter(log_dir=save_path + "/flower_experiment")
    if os.path.exists(save_path + "/weights") is False:
        os.makedirs(save_path + "/weights")
    img_size = 224
    data_transform = {
        "train": transforms.Compose([transforms.RandomResizedCrop(img_size),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
        "val": transforms.Compose([transforms.Resize(int(img_size * 1.143)),
                                   transforms.CenterCrop(img_size),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}
    # 实例化训练数据集
    train_data_set = datasets.ImageFolder(root=os.path.join(args.data_path, "train"),
                                          transform=data_transform["train"])
    # 实例化验证数据集
    val_data_set = datasets.ImageFolder(root=os.path.join(args.data_path, "val"),
                                        transform=data_transform["val"])
    # 生成class_indices.json文件,包括有模型对应的序列号
    # #######################################
    classes_list = train_data_set.class_to_idx
    cla_dict = dict((val, key) for key, val in classes_list.items())
    # write dict into json file
    json_str = json.dumps(cla_dict, indent=4)
    with open('class_indices.json', 'w') as json_file:
        json_file.write(json_str)
    batch_size = args.batch_size
    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
    print('Using {} dataloader workers every process'.format(nw))
    train_loader = torch.utils.data.DataLoader(train_data_set,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               pin_memory=True,
                                               num_workers=nw)
    val_loader = torch.utils.data.DataLoader(val_data_set,
                                             batch_size=batch_size,
                                             shuffle=False,
                                             pin_memory=True,
                                             num_workers=nw)
    model = create_model(num_classes=args.num_classes).to(device)
    # Write the model into tensorboard
    # #######################################
    init_img = torch.zeros((1, 3, 224, 224), device=device)
    tb_writer.add_graph(model, init_img)
    if args.weights != "":
        assert os.path.exists(args.weights), "weights file: '{}' not exist.".format(args.weights)
        # weights_dict = torch.load(args.weights, map_location=device)
        weights_dict = torch.load(args.weights, map_location=device)["model"]
        # 删除有关分类类别的权重
        for k in list(weights_dict.keys()):
            if "head" in k:
                del weights_dict[k]
        print(model.load_state_dict(weights_dict, strict=False))
    if args.freeze_layers:
        for name, para in model.named_parameters():
            # 除head外,其他权重全部冻结
            if "head" not in name:
                para.requires_grad_(False)
            else:
                print("training {}".format(name))
    # pg = [p for p in model.parameters() if p.requires_grad]
    pg = get_params_groups(model, weight_decay=args.wd)
    optimizer = optim.AdamW(pg, lr=args.lr, weight_decay=args.wd)
    lr_scheduler = create_lr_scheduler(optimizer, len(train_loader), args.epochs,
                                       warmup=True, warmup_epochs=1)
    best_acc = 0.0
    for epoch in range(args.epochs):
        # 计时器time_start
        time_start = time.time()
        # train
        train_loss, train_acc = train_one_epoch(model=model,
                                                optimizer=optimizer,
                                                data_loader=train_loader,
                                                device=device,
                                                epoch=epoch,
                                                lr_scheduler=lr_scheduler)
        # validate
        val_loss, val_acc = evaluate(model=model,
                                     data_loader=val_loader,
                                     device=device,
                                     epoch=epoch)
        time_end = time.time()
        f.write("[epoch {}] train_loss: {:.3f},train_acc:{:.3f},val_loss:{:.3f},val_acc:{:.3f},Spend_time:{:.3f}S"
                .format(epoch + 1, train_loss, train_acc, val_loss, val_acc, time_end - time_start))
        f.flush()
        # add Training results into tensorboard
        # #######################################
        tags = ["train_loss", "train_acc", "val_loss", "val_acc", "learning_rate"]
        tb_writer.add_scalar(tags[0], train_loss, epoch)
        tb_writer.add_scalar(tags[1], train_acc, epoch)
        tb_writer.add_scalar(tags[2], val_loss, epoch)
        tb_writer.add_scalar(tags[3], val_acc, epoch)
        tb_writer.add_scalar(tags[4], optimizer.param_groups[0]["lr"], epoch)
        # add figure into tensorboard
        # #######################################
        fig = plot_class_preds(net=model,
                               images_dir=r"D:/other/ClassicalModel/data/flower_datas/plot_img",
                               transform=data_transform["val"],
                               num_plot=6,
                               device=device)
        if fig is not None:
            tb_writer.add_figure("predictions vs. actuals",
                                 figure=fig,
                                 global_step=epoch)
        if val_acc > best_acc:
            best_acc = val_acc
            f.write(',save best model')
            torch.save(model.state_dict(), save_path + "/weights/bestmodel.pth")
        f.write('\n')
    f.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_classes', type=int, default=5)
    parser.add_argument('--epochs', type=int, default=1)
    parser.add_argument('--batch-size', type=int, default=4)
    parser.add_argument('--lr', type=float, default=5e-4)
    parser.add_argument('--wd', type=float, default=5e-2)
    parser.add_argument('--data-path', type=str,
                        default=r"D:/other/ClassicalModel/data/flower_datas")
    parser.add_argument('--weights', type=str,
                        # default=r"D:/other/ClassicalModel/ConvNext/weights_pre/convnext_tiny_1k_224_ema.pth",
                        help='initial weights path')
    # 是否冻结head以外所有权重
    parser.add_argument('--freeze-layers', type=bool, default=False)
    parser.add_argument('--device', default='cuda:0', help='device id (i.e. 0 or 0,1 or cpu)')
    opt = parser.parse_args()
    main(opt)


注意:其中带有很多#的行的代码,可以注释掉

utils.py

import os
import sys
import json
import math
import torch
from PIL import Image
from matplotlib import pyplot as plt
from tqdm import tqdm
def train_one_epoch(model, optimizer, data_loader, device, epoch, lr_scheduler):
    model.train()
    loss_function = torch.nn.CrossEntropyLoss()
    accu_loss = torch.zeros(1).to(device)  # 累计损失
    accu_num = torch.zeros(1).to(device)   # 累计预测正确的样本数
    optimizer.zero_grad()
    sample_num = 0
    data_loader = tqdm(data_loader, file=sys.stdout)
    for step, data in enumerate(data_loader):
        images, labels = data
        sample_num += images.shape[0]
        pred = model(images.to(device))
        pred_classes = torch.max(pred, dim=1)[1]
        accu_num += torch.eq(pred_classes, labels.to(device)).sum()
        loss = loss_function(pred, labels.to(device))
        loss.backward()
        accu_loss += loss.detach()
        data_loader.desc = "[train epoch {}] loss: {:.3f}, acc: {:.3f}, lr: {:.5f}".format(
            epoch+1,
            accu_loss.item() / (step + 1),
            accu_num.item() / sample_num,
            optimizer.param_groups[0]["lr"]
        )
        if not torch.isfinite(loss):
            print('WARNING: non-finite loss, ending training ', loss)
            sys.exit(1)
        optimizer.step()
        optimizer.zero_grad()
        # update lr
        lr_scheduler.step()
    return accu_loss.item() / (step + 1), accu_num.item() / sample_num
@torch.no_grad()
def evaluate(model, data_loader, device, epoch):
    loss_function = torch.nn.CrossEntropyLoss()
    model.eval()
    accu_num = torch.zeros(1).to(device)   # 累计预测正确的样本数
    accu_loss = torch.zeros(1).to(device)  # 累计损失
    sample_num = 0
    data_loader = tqdm(data_loader, file=sys.stdout)
    for step, data in enumerate(data_loader):
        images, labels = data
        sample_num += images.shape[0]
        pred = model(images.to(device))
        pred_classes = torch.max(pred, dim=1)[1]
        accu_num += torch.eq(pred_classes, labels.to(device)).sum()
        loss = loss_function(pred, labels.to(device))
        accu_loss += loss
        data_loader.desc = "[valid epoch {}] loss: {:.3f}, acc: {:.3f}".format(
            epoch+1,
            accu_loss.item() / (step + 1),
            accu_num.item() / sample_num
        )
    return accu_loss.item() / (step + 1), accu_num.item() / sample_num
def create_lr_scheduler(optimizer,
                        num_step: int,
                        epochs: int,
                        warmup=True,
                        warmup_epochs=1,
                        warmup_factor=1e-3,
                        end_factor=1e-6):
    assert num_step > 0 and epochs > 0
    if warmup is False:
        warmup_epochs = 0
    def f(x):
        """
        根据step数返回一个学习率倍率因子,
        注意在训练开始之前,pytorch会提前调用一次lr_scheduler.step()方法
        """
        if warmup is True and x <= (warmup_epochs * num_step):
            alpha = float(x) / (warmup_epochs * num_step)
            # warmup过程中lr倍率因子从warmup_factor -> 1
            return warmup_factor * (1 - alpha) + alpha
        else:
            current_step = (x - warmup_epochs * num_step)
            cosine_steps = (epochs - warmup_epochs) * num_step
            # warmup后lr倍率因子从1 -> end_factor
            return ((1 + math.cos(current_step * math.pi / cosine_steps)) / 2) * (1 - end_factor) + end_factor
    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=f)
def get_params_groups(model: torch.nn.Module, weight_decay: float = 1e-5):
    # 记录optimize要训练的权重参数
    parameter_group_vars = {"decay": {"params": [], "weight_decay": weight_decay},
                            "no_decay": {"params": [], "weight_decay": 0.}}
    # 记录对应的权重名称
    parameter_group_names = {"decay": {"params": [], "weight_decay": weight_decay},
                             "no_decay": {"params": [], "weight_decay": 0.}}
    for name, param in model.named_parameters():
        if not param.requires_grad:
            continue  # frozen weights
        if len(param.shape) == 1 or name.endswith(".bias"):
            group_name = "no_decay"
        else:
            group_name = "decay"
        parameter_group_vars[group_name]["params"].append(param)
        parameter_group_names[group_name]["params"].append(name)
    print("Param groups = %s" % json.dumps(parameter_group_names, indent=2))
    return list(parameter_group_vars.values())
def plot_class_preds(net,
                     images_dir: str,
                     transform,
                     num_plot: int = 5,
                     device="cpu"):
    if not os.path.exists(images_dir):
        print("not found {} path, ignore add figure.".format(images_dir))
        return None
    label_path = os.path.join(images_dir, "label.txt")
    if not os.path.exists(label_path):
        print("not found {} file, ignore add figure".format(label_path))
        return None
    # read class_indict
    json_label_path = './class_indices.json'
    assert os.path.exists(json_label_path), "not found {}".format(json_label_path)
    json_file = open(json_label_path, 'r')
    # {"0": "daisy"}
    flower_class = json.load(json_file)
    # {"daisy": "0"}
    class_indices = dict((v, k) for k, v in flower_class.items())
    # reading label.txt file
    label_info = []
    with open(label_path, "r") as rd:
        for line in rd.readlines():
            line = line.strip()
            if len(line) > 0:
                split_info = [i for i in line.split(" ") if len(i) > 0]
                assert len(split_info) == 2, "label format error, expect file_name and class_name"
                image_name, class_name = split_info
                image_path = os.path.join(images_dir, image_name)
                # 如果文件不存在,则跳过
                if not os.path.exists(image_path):
                    print("not found {}, skip.".format(image_path))
                    continue
                # 如果读取的类别不在给定的类别内,则跳过
                if class_name not in class_indices.keys():
                    print("unrecognized category {}, skip".format(class_name))
                    continue
                label_info.append([image_path, class_name])
    if len(label_info) == 0:
        return None
    # get first num_plot info
    if len(label_info) > num_plot:
        label_info = label_info[:num_plot]
    num_imgs = len(label_info)
    images = []
    labels = []
    for img_path, class_name in label_info:
        # read img
        img = Image.open(img_path).convert("RGB")
        label_index = int(class_indices[class_name])
        # preprocessing
        img = transform(img)
        images.append(img)
        labels.append(label_index)
    # batching images
    images = torch.stack(images, dim=0).to(device)
    # inference
    with torch.no_grad():
        output = net(images)
        probs, preds = torch.max(torch.softmax(output, dim=1), dim=1)
        probs = probs.cpu().numpy()
        preds = preds.cpu().numpy()
    # width, height
    fig = plt.figure(figsize=(num_imgs * 2.5, 3), dpi=100)
    for i in range(num_imgs):
        # 1:子图共1行,num_imgs:子图共num_imgs列,当前绘制第i+1个子图
        ax = fig.add_subplot(1, num_imgs, i+1, xticks=[], yticks=[])
        # CHW -> HWC
        npimg = images[i].cpu().numpy().transpose(1, 2, 0)
        # 将图像还原至标准化之前
        # mean:[0.485, 0.456, 0.406], std:[0.229, 0.224, 0.225]
        npimg = (npimg * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406]) * 255
        plt.imshow(npimg.astype('uint8'))
        title = "{}, {:.2f}%\n(label: {})".format(
            flower_class[str(preds[i])],  # predict class
            probs[i] * 100,  # predict probability
            flower_class[str(labels[i])]  # true class
        )
        ax.set_title(title, color=("green" if preds[i] == labels[i] else "red"))
    return fig



3.预测以及批量预测(predict.py和batch_predict.py)

predict.py

import os
import json
import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
from model import convnext_tiny as create_model
def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"using {device} device.")
    num_classes = 5
    img_size = 224
    data_transform = transforms.Compose(
        [transforms.Resize(int(img_size * 1.14)),
         transforms.CenterCrop(img_size),
         transforms.ToTensor(),
         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])
    # load image
    img_path = "../tulip.jpg"
    assert os.path.exists(img_path), "file: '{}' dose not exist.".format(img_path)
    img = Image.open(img_path)
    plt.imshow(img)
    # [N, C, H, W]
    img = data_transform(img)
    # expand batch dimension
    img = torch.unsqueeze(img, dim=0)
    # read class_indict
    json_path = './class_indices.json'
    assert os.path.exists(json_path), "file: '{}' dose not exist.".format(json_path)
    json_file = open(json_path, "r")
    class_indict = json.load(json_file)
    # create model
    model = create_model(num_classes=num_classes).to(device)
    # load model weights
    model_weight_path = "./weights/best_model.pth"
    model.load_state_dict(torch.load(model_weight_path, map_location=device))
    model.eval()
    with torch.no_grad():
        # predict class
        output = torch.squeeze(model(img.to(device))).cpu()
        predict = torch.softmax(output, dim=0)
        predict_cla = torch.argmax(predict).numpy()
    print_res = "class: {}   prob: {:.3}".format(class_indict[str(predict_cla)],
                                                 predict[predict_cla].numpy())
    plt.title(print_res)
    for i in range(len(predict)):
        print("class: {:10}   prob: {:.3}".format(class_indict[str(i)],
                                                  predict[i].numpy()))
    plt.show()
if __name__ == '__main__':
    main()


batch_predict.py

import os
import json
import torch
from PIL import Image
from torchvision import transforms
from model import convnext_tiny
def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    img_size = 224
    data_transform = transforms.Compose(
        [transforms.Resize(int(img_size * 1.14)),
         transforms.CenterCrop(img_size),
         transforms.ToTensor(),
         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])
    # load image
    # 指向需要遍历预测的图像文件夹
    imgs_root = r"D:/other/ClassicalModel/data/flower_datas/train/daisy"
    assert os.path.exists(imgs_root), f"file: '{imgs_root}' dose not exist."
    # 读取指定文件夹下所有jpg图像路径
    img_path_list = [os.path.join(imgs_root, i) for i in os.listdir(imgs_root) if i.endswith(".jpg")]
    # read class_indict
    json_path = r"D:/other/ClassicalModel/ResNet/class_indices.json"
    assert os.path.exists(json_path), f"file: '{json_path}' dose not exist."
    json_file = open(json_path, "r")
    class_indict = json.load(json_file)
    # create model
    model = convnext_tiny(num_classes=5).to(device)
    # load model weights
    weights_path = r"D:/other/ClassicalModel/ConvNeXt/runs1/weights/bestmodel.pth"
    assert os.path.exists(weights_path), f"file: '{weights_path}' dose not exist."
    model.load_state_dict(torch.load(weights_path, map_location=device))
    #save predicted img
    filename = 'record.txt'
    save_path = 'detect'
    path_num = 1
    while os.path.exists(save_path + f'{path_num}'):
        path_num += 1
    os.mkdir(save_path + f'{path_num}')
    f = open(save_path + f'{path_num}/' + filename, 'w')
    f.write("imgs_root:"+imgs_root+"\n")
    f.write("weights_path:"+weights_path+"\n")
    actual_classes="daisy"
    acc_num=0
    all_num=len(img_path_list)
    # prediction
    model.eval()
    batch_size = 8  # 每次预测时将多少张图片打包成一个batch
    with torch.no_grad():
        for ids in range(0, len(img_path_list) // batch_size):
            img_list = []
            for img_path in img_path_list[ids * batch_size: (ids + 1) * batch_size]:
                assert os.path.exists(img_path), f"file: '{img_path}' dose not exist."
                img = Image.open(img_path)
                img = data_transform(img)
                img_list.append(img)
            # batch img
            # 将img_list列表中的所有图像打包成一个batch
            batch_img = torch.stack(img_list, dim=0)
            # predict class
            output = model(batch_img.to(device)).cpu()
            predict = torch.softmax(output, dim=1)
            probs, classes = torch.max(predict, dim=1)
            for idx, (pro, cla) in enumerate(zip(probs, classes)):
                print("image: {}  class: {}  prob: {:.3}".format(img_path_list[ids * batch_size + idx],
                                                                 class_indict[str(cla.numpy())],
                                                                 pro.numpy()))
                f.write("image: {}  class: {}  prob: {:.3}\n".format(img_path_list[ids * batch_size + idx],
                                                                 class_indict[str(cla.numpy())],
                                                                 pro.numpy()))
                if class_indict[str(cla.numpy())]==actual_classes:
                    acc_num+=1
    print("classes:{},acc_num:{:d},all_num:{:d},accuracy: {:.3f}".format(actual_classes,acc_num,all_num,acc_num/all_num))
    f.write("classes:{},acc_num:{:d},all_num:{:d},accuracy: {:.3f}".format(actual_classes,acc_num,all_num,acc_num/all_num))
    f.close()
if __name__ == '__main__':
    main()
相关文章
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
ICLR 2024 Spotlight:训练一个图神经网络即可解决图领域所有分类问题!
【2月更文挑战第17天】ICLR 2024 Spotlight:训练一个图神经网络即可解决图领域所有分类问题!
107 2
ICLR 2024 Spotlight:训练一个图神经网络即可解决图领域所有分类问题!
|
2月前
|
机器学习/深度学习 算法 数据挖掘
神经网络训练失败的原因总结 !!
神经网络训练失败的原因总结 !!
60 0
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
【PyTorch实战演练】AlexNet网络模型构建并使用Cifar10数据集进行批量训练(附代码)
【PyTorch实战演练】AlexNet网络模型构建并使用Cifar10数据集进行批量训练(附代码)
275 0
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
【PyTorch实战演练】使用Cifar10数据集训练LeNet5网络并实现图像分类(附代码)
【PyTorch实战演练】使用Cifar10数据集训练LeNet5网络并实现图像分类(附代码)
258 0
|
10天前
|
机器学习/深度学习 算法
**反向传播算法**在多层神经网络训练中至关重要,它包括**前向传播**、**计算损失**、**反向传播误差**和**权重更新**。
【6月更文挑战第28天】**反向传播算法**在多层神经网络训练中至关重要,它包括**前向传播**、**计算损失**、**反向传播误差**和**权重更新**。数据从输入层流经隐藏层到输出层,计算预测值。接着,比较预测与真实值计算损失。然后,从输出层开始,利用链式法则反向计算误差和梯度,更新权重以减小损失。此过程迭代进行,直到损失收敛或达到训练次数,优化模型性能。反向传播实现了自动微分,使模型能适应训练数据并泛化到新数据。
21 2
|
25天前
|
机器学习/深度学习 PyTorch 算法框架/工具
【从零开始学习深度学习】36. 门控循环神经网络之长短期记忆网络(LSTM)介绍、Pytorch实现LSTM并进行训练预测
【从零开始学习深度学习】36. 门控循环神经网络之长短期记忆网络(LSTM)介绍、Pytorch实现LSTM并进行训练预测
|
2月前
|
机器学习/深度学习
简单通用:视觉基础网络最高3倍无损训练加速,清华EfficientTrain++入选TPAMI 2024
【5月更文挑战第30天】清华大学研究团队提出的EfficientTrain++是一种新型训练方法,旨在加速视觉基础网络(如ResNet、ConvNeXt、DeiT)的训练,最高可达3倍速度提升,同时保持模型准确性。该方法基于傅里叶谱裁剪和动态数据增强,实现了课程学习的创新应用。在ImageNet-1K/22K数据集上,EfficientTrain++能有效减少多种模型的训练时间,且在自监督学习任务中表现出色。尽管面临适应性与稳定性的挑战,EfficientTrain++为深度学习模型的高效训练开辟了新途径,对学术和工业界具有重要意义。
40 4
|
2月前
|
机器学习/深度学习 算法
ICLR 2024 Oral:用巧妙的传送技巧,让神经网络的训练更加高效
【5月更文挑战第21天】ICLR 2024 Oral 提出了一种名为“传送”的新方法,利用参数对称性提升神经网络训练效率。该方法通过参数变换加速收敛,改善泛化能力,减少了训练所需的计算资源和时间。研究显示,传送能将模型移到不同曲率的极小值点,可能有助于泛化。论文还探讨了将传送应用于元学习等优化算法的潜力,但对传送加速优化的确切机制理解尚不深入,且实际应用效果有待更多验证。[论文链接](https://openreview.net/forum?id=L0r0GphlIL)
36 2
|
25天前
|
机器学习/深度学习 PyTorch 算法框架/工具
【从零开始学习深度学习】35. 门控循环神经网络之门控循环单元(gated recurrent unit,GRU)介绍、Pytorch实现GRU并进行训练预测
【从零开始学习深度学习】35. 门控循环神经网络之门控循环单元(gated recurrent unit,GRU)介绍、Pytorch实现GRU并进行训练预测
|
2月前
|
机器学习/深度学习 数据采集 算法

热门文章

最新文章