1模型架构
PP-YOLOE由以下方法组成:
- 可扩展的backbone和neck
- Task Alignment Learning
- Efficient Task-aligned head with DFL和VFL
- SiLU激活函数
1.1、Backbone
PP-YOLOE的Backbone主要是使用RepVGG模块以及CSP的模型思想对ResNet及逆行的改进,同时也使用了SiLU激活函数、Effitive SE Attention等模块,下面我们一一道来。
1、RepVGG
RepVGG,这个网络就是在VGG的基础上面进行改进,主要的思路包括:
- 在VGG网络的Block块中加入了Identity和残差分支,相当于把ResNet网络中的精华应用 到VGG网络中;
- 模型推理阶段,通过Op融合策略将所有的网络层都转换为3×3卷积,便于网络的部署和加速。
上图展示了模型推理阶段的重参数化过程,其实就是一个OP融合和OP替换的过程。图A从结构化的角度展示了整个重参数化流程, 图B从模型参数的角度展示了整个重参数化流程。整个重参数化步骤如下所示:
步骤1:首先通过式3将残差块中的卷积层和BN层进行融合,该操作在很多深度学习框架的推理阶段都会执行。图中的蓝色框中执行3×3卷积+BN层的融合,图中的黑色矩形框中执行1×1卷积+BN层的融合,图中的黄色矩形框中执行3×3卷积(卷积核设置为全1)+BN层的融合。其中表示转换前的卷积层参数,表示BN层的均值,表示BN层的方差,和分别表示BN层的尺度因子和偏移因子,W’和b’分别表示融合之后的卷积的权重和偏置。
步骤2:将融合后的卷积层转换为3×3卷积,即将具体不同卷积核的卷积均转换为具有3×3大小的卷积核的卷积。由于整个残差块中可能包含1×1卷积分支和Identity两种分支,如图中的黑框和黄框所示。对于1×1卷积分支而言,整个转换过程就是利用3×3卷积核替换1×1卷积核,具体的细节如图中的紫框所示,即将1×1卷积核中的数值移动到3×3卷积核的中心点即可;对于Identity分支而言,该分支并没有改变输入的特征映射的数值,那么可以设置一个3×3的卷积核,将所有的9个位置处的权重值都设置为1,那么它与输入的特征映射相乘之后,保持了原来的数值,具体的细节如图中的褐色框所示。
步骤3:合并残差分支中的3×3卷积。即将所有分支的权重W和偏置B叠加起来,从而获得一个融合之后的3×3卷积层。
为什么要用VGG式模型?
除了相信简单就是美以外,VGG式极简模型至少还有5大现实的优势:
- 3×3卷积非常快。在GPU上,3×3卷积的计算密度(理论运算量除以所用时间)可达1×1和5×5卷积的4倍。
- 单路架构非常快,因为并行度高。同样的计算量,“大而整”的运算效率远超“小而碎”的运算。
- 单路架构省内存。例如,ResNet的shortcut虽然不占计算量,却增加了一倍的显存占用。
- 单路架构灵活性更好,容易改变各层的宽度(如剪枝)。
- RepVGG主体部分只有一种算子:3×3卷积接ReLU。在设计专用芯片时,给定芯片尺寸或造价可以集成海量的3×3卷积+ReLU计算单元来达到很高的效率。
下图表示RepVGG推理融合后的ONNX输出,可以看出简化了很多。
class RepVggBlock(nn.Layer): def __init__(self, ch_in, ch_out, act='relu'): super(RepVggBlock, self).__init__() self.ch_in = ch_in self.ch_out = ch_out self.conv1 = ConvBNLayer(ch_in, ch_out, 3, stride=1, padding=1, act=None) self.conv2 = ConvBNLayer(ch_in, ch_out, 1, stride=1, padding=0, act=None) self.act = get_act_fn(act) if act is None or isinstance(act, (str, dict)) else act def forward(self, x): if hasattr(self, 'conv'): y = self.conv(x) else: y = self.conv1(x) + self.conv2(x) y = self.act(y) return y def convert_to_deploy(self): if not hasattr(self, 'conv'): self.conv = nn.Conv2D(in_channels=self.ch_in, out_channels=self.ch_out, kernel_size=3, stride=1, padding=1, groups=1) kernel, bias = self.get_equivalent_kernel_bias() self.conv.weight.set_value(kernel) self.conv.bias.set_value(bias) def get_equivalent_kernel_bias(self): # 融合推理 kernel3x3, bias3x3 = self._fuse_bn_tensor(self.conv1) kernel1x1, bias1x1 = self._fuse_bn_tensor(self.conv2) return kernel3x3 + self._pad_1x1_to_3x3_tensor(kernel1x1), bias3x3 + bias1x1 def _pad_1x1_to_3x3_tensor(self, kernel1x1): if kernel1x1 is None: return 0 else: return nn.functional.pad(kernel1x1, [1, 1, 1, 1]) def _fuse_bn_tensor(self, branch): if branch is None: return 0, 0 kernel = branch.conv.weight running_mean = branch.bn._mean running_var = branch.bn._variance gamma = branch.bn.weight beta = branch.bn.bias eps = branch.bn._epsilon std = (running_var + eps).sqrt() t = (gamma / std).reshape((-1, 1, 1, 1)) return kernel * t, beta - running_mean * gamma / std
2、Swish激活函数
从代码和公式来看,Swish包含了SiLU,换句话说SiLU是Swish的一种特例。
所以画图基本上都使用了SiLU代替Swish,因为YOLOE中的Swish的,也就是SiLU激活函数。
β是个常数或可训练的参数。Swish 具备无上界有下界、平滑、非单调的特性。Swish 在深层模型上的效果优于 ReLU。
例如,仅仅使用 Swish 单元替换 ReLU 就能把 Mobile NASNetA 在 ImageNet 上的 top-1 分类准确率提高 0.9%,Inception-ResNet-v 的分类准确率提高 0.6%。
class ConvBNLayer(nn.Layer): def __init__(self, ch_in, ch_out, filter_size=3, stride=1, groups=1, padding=0, act=None): super(ConvBNLayer, self).__init__() self.conv = nn.Conv2D(in_channels=ch_in, out_channels=ch_out, kernel_size=filter_size, stride=stride, padding=padding, groups=groups, bias_attr=False) self.bn = nn.BatchNorm2D(ch_out, weight_attr=ParamAttr(regularizer=L2Decay(0.0)), bias_attr=ParamAttr(regularizer=L2Decay(0.0))) self.act = get_act_fn(act) if act is None or isinstance(act, (str, dict)) else act def forward(self, x): x = self.conv(x) x = self.bn(x) x = self.act(x) return x
3、Effective SE Attention
该模块主要是来自于《CenterMask:Real-Time Anchor-Free Instance Segmentation》中的eSE模块;
在输出的内部添加了一个channel上的attention模块eSE。原始的SE模块中使用2个FC去进行channel权重映射,但是为了减少计算量通常会将FC中的channel给剪裁一些(小于输入的channel),这就引入了一些信息的损失,为此文章直接将2个FC替换为了1个FC。
class EffectiveSELayer(nn.Layer): def __init__(self, channels, act='hardsigmoid'): super(EffectiveSELayer, self).__init__() self.fc = nn.Conv2D(channels, channels, kernel_size=1, padding=0) self.act = get_act_fn(act) if act is None or isinstance(act, (str, dict)) else act def forward(self, x): x_se = x.mean((2, 3), keepdim=True) x_se = self.fc(x_se) return x * self.act(x_se)
4、CSPNet结构
CSPNet的主要思想还是Partial Dense Block,设计Partial Dense Block的目的是:
- 增加梯度路径:通过分裂合并策略,可以使梯度路径的数目翻倍。由于采用了跨阶段的策略,可以减轻使用显式特征映射复制进行连接的缺点;
- 平衡各层的计算:通常情况下,DenseNet底层的信道数远远大于增长率。由于部分dense block中的dense layer操作所涉及的底层信道只占原始信道的一半,因此可以有效地解决近一半的计算瓶颈;
- 减少内存流量:假设dense block在DenseNet中的基本特征映射大小为,增长率为d,并且共m层。然后,该dense block的CIO为,部分dense block的CIO为。虽然m和d通常比c小得多,但部分dense block最多可以节省网络内存流量的一半。
class CSPResStage(nn.Layer): def __init__(self, block_fn, ch_in, ch_out, n, stride, act='relu', attn='eca'): super(CSPResStage, self).__init__() ch_mid = (ch_in + ch_out) // 2 if stride == 2: self.conv_down = ConvBNLayer(ch_in, ch_mid, 3, stride=2, padding=1, act=act) else: self.conv_down = None self.conv1 = ConvBNLayer(ch_mid, ch_mid // 2, 1, act=act) self.conv2 = ConvBNLayer(ch_mid, ch_mid // 2, 1, act=act) self.blocks = nn.Sequential(* [block_fn(ch_mid // 2, ch_mid // 2, act=act, shortcut=True) for i in range(n)]) if attn: self.attn = EffectiveSELayer(ch_mid, act='hardsigmoid') else: self.attn = None self.conv3 = ConvBNLayer(ch_mid, ch_out, 1, act=act) def forward(self, x): if self.conv_down is not None: x = self.conv_down(x) y1 = self.conv1(x) y2 = self.blocks(self.conv2(x)) y = paddle.concat([y1, y2], axis=1) if self.attn is not None: y = self.attn(y) y = self.conv3(y) return y
5、SPP结构
SPP-Net全名为Spatial Pyramid Pooling(空间金字塔池化结构),2015年由微软研究院的何恺明提出,主要解决2个问题:
- 有效避免了R-CNN算法对图像区域剪裁、缩放操作导致的图像物体剪裁不全以及形状扭曲等问题。
- 解决了卷积神经网络对图像重复特征提取的问题,大大提高了产生候选框的速度,且节省了计算成本。
SPP 显著特点
- 不管输入尺寸是怎样,SPP 可以产生固定大小的输出
- 使用多个窗口(pooling window)
- SPP 可以使用同一图像不同尺寸(scale)作为输入, 得到同样长度的池化特征。
其它特点
- 由于对输入图像的不同纵横比和不同尺寸,SPP同样可以处理,所以提高了图像的尺度不变(scale-invariance)和降低了过拟合(over-fitting)
- 实验表明训练图像尺寸的多样性比单一尺寸的训练图像更容易使得网络收敛(convergence)
- SPP 对于特定的CNN网络设计和结构是独立的。(也就是说,只要把SPP放在最后一层卷积层后面,对网络的结构是没有影响的, 它只是替换了原来的pooling层)
- 不仅可以用于图像分类而且可以用来目标检测
通过spp模块实现局部特征和全局特征(所以空间金字塔池化结构的最大的池化核要尽可能的接近等于需要池化的featherMap的大小)的featherMap级别的融合,丰富最终特征图的表达能力,从而提高MAP。
class SPP(nn.Layer): def __init__(self, ch_in, ch_out, k, pool_size, act='swish', data_format='NCHW'): super(SPP, self).__init__() self.pool = [] self.data_format = data_format for i, size in enumerate(pool_size): pool = self.add_sublayer('pool{}'.format(i), nn.MaxPool2D(kernel_size=size, stride=1, padding=size // 2, data_format=data_format, ceil_mode=False)) self.pool.append(pool) self.conv = ConvBNLayer(ch_in, ch_out, k, padding=k // 2, act=act) def forward(self, x): outs = [x] for pool in self.pool: outs.append(pool(x)) if self.data_format == 'NCHW': y = paddle.concat(outs, axis=1) else: y = paddle.concat(outs, axis=-1) y = self.conv(y) return y class CSPStage(nn.Layer): def __init__(self, block_fn, ch_in, ch_out, n, act='swish', spp=False): super(CSPStage, self).__init__() ch_mid = int(ch_out // 2) self.conv1 = ConvBNLayer(ch_in, ch_mid, 1, act=act) self.conv2 = ConvBNLayer(ch_in, ch_mid, 1, act=act) self.convs = nn.Sequential() next_ch_in = ch_mid for i in range(n): self.convs.add_sublayer(str(i), eval(block_fn)(next_ch_in, ch_mid, act=act, shortcut=False)) if i == (n - 1) // 2 and spp: self.convs.add_sublayer('spp', SPP(ch_mid * 4, ch_mid, 1, [5, 9, 13], act=act)) next_ch_in = ch_mid self.conv3 = ConvBNLayer(ch_mid * 2, ch_out, 1, act=act) def forward(self, x): y1 = self.conv1(x) y2 = self.conv2(x) y2 = self.convs(y2) y = paddle.concat([y1, y2], axis=1) y = self.conv3(y) return y
1.2、Neck
yoloe的neck结构采用的依旧是FPN+PAN结构模式,将Neck部分用立体图画出来,更直观的看下两部分之间是如何通过FPN结构融合的。
如图所示,FPN是自顶向下的,将高层特征通过上采样和低层特征做融合得到进行预测的特征图。
和FPN层不同,yoloe在FPN层的后面还添加了一个自底向上的特征金字塔。FPN是自顶向下,将高层的强语义特征传递下来,对整个金字塔进行增强,不过只增强了语义信息,对定位信息没有传递,而本文就是针对这一点,在FPN的后面添加一个自底向上的金字塔。这样的操作是对FPN的补充,将低层的强定位特征传递上去。
class CustomCSPPAN(nn.Layer): __shared__ = ['norm_type', 'data_format', 'width_mult', 'depth_mult', 'trt'] def __init__(self, in_channels=[256, 512, 1024], out_channels=[1024, 512, 256], norm_type='bn', act='leaky', stage_fn='CSPStage', block_fn='BasicBlock', stage_num=1, block_num=3, drop_block=False, block_size=3, keep_prob=0.9, spp=False, data_format='NCHW', width_mult=1.0, depth_mult=1.0, trt=False): super(CustomCSPPAN, self).__init__() out_channels = [max(round(c * width_mult), 1) for c in out_channels] block_num = max(round(block_num * depth_mult), 1) act = get_act_fn(act, trt=trt) if act is None or isinstance(act, (str, dict)) else act self.num_blocks = len(in_channels) self.data_format = data_format self._out_channels = out_channels in_channels = in_channels[::-1] fpn_stages = [] fpn_routes = [] for i, (ch_in, ch_out) in enumerate(zip(in_channels, out_channels)): if i > 0: ch_in += ch_pre // 2 stage = nn.Sequential() for j in range(stage_num): stage.add_sublayer(str(j), eval(stage_fn)(block_fn, ch_in if j == 0 else ch_out, ch_out, block_num, act=act, spp=(spp and i == 0))) if drop_block: stage.add_sublayer('drop', DropBlock(block_size, keep_prob)) fpn_stages.append(stage) if i < self.num_blocks - 1: fpn_routes.append(ConvBNLayer(ch_in=ch_out, ch_out=ch_out // 2, filter_size=1, stride=1, padding=0, act=act)) ch_pre = ch_out self.fpn_stages = nn.LayerList(fpn_stages) self.fpn_routes = nn.LayerList(fpn_routes) pan_stages = [] pan_routes = [] for i in reversed(range(self.num_blocks - 1)): pan_routes.append(ConvBNLayer(ch_in=out_channels[i + 1], ch_out=out_channels[i + 1], filter_size=3, stride=2, padding=1, act=act)) ch_in = out_channels[i] + out_channels[i + 1] ch_out = out_channels[i] stage = nn.Sequential() for j in range(stage_num): stage.add_sublayer(str(j), eval(stage_fn)(block_fn, ch_in if j == 0 else ch_out, ch_out, block_num, act=act, spp=False)) if drop_block: stage.add_sublayer('drop', DropBlock(block_size, keep_prob)) pan_stages.append(stage) self.pan_stages = nn.LayerList(pan_stages[::-1]) self.pan_routes = nn.LayerList(pan_routes[::-1]) def forward(self, blocks, for_mot=False): blocks = blocks[::-1] fpn_feats = [] for i, block in enumerate(blocks): if i > 0: block = paddle.concat([route, block], axis=1) route = self.fpn_stages[i](block) fpn_feats.append(route) if i < self.num_blocks - 1: route = self.fpn_routes[i](route) route = F.interpolate(route, scale_factor=2., data_format=self.data_format) pan_feats = [fpn_feats[-1], ] route = fpn_feats[-1] for i in reversed(range(self.num_blocks - 1)): block = fpn_feats[i] route = self.pan_routes[i](route) block = paddle.concat([route, block], axis=1) route = self.pan_stages[i](block) pan_feats.append(route) return pan_feats[::-1]
1.3、Head
对于PP-YOLOE的head部分,其依旧是TOOD的head,也就是T-Head,主要是包括了Cls Head和Loc Head。具体来说,T-head首先在FPN特征基础上进行分类与定位预测;然后TAL基于所提任务对齐测度计算任务对齐信息;最后T-head根据从TAL传回的信息自动调整分类概率与定位预测。
由于2个任务的预测都是基于这个交互特征来完成的,但是2个任务对于特征的需求肯定是不一样的,因为作者设计了一个layer attention来为每个任务单独的调整一下特征,这个部分的结构也很简单,可以理解为是一个channel-wise的注意力机制。这样的话就得到了对于每个任务单独的特征,然后再利用这些特征生成所需要的类别或者定位的特征图。
class PPYOLOEHead(nn.Layer): __shared__ = ['num_classes', 'trt', 'exclude_nms'] __inject__ = ['static_assigner', 'assigner', 'nms'] def __init__(self, in_channels=[1024, 512, 256], num_classes=80, act='swish', fpn_strides=(32, 16, 8), grid_cell_scale=5.0, grid_cell_offset=0.5, reg_max=16, static_assigner_epoch=4, use_varifocal_loss=True, static_assigner='ATSSAssigner', assigner='TaskAlignedAssigner', nms='MultiClassNMS', eval_input_size=[], loss_weight={'class': 1.0, 'iou': 2.5, 'dfl': 0.5,}, trt=False, exclude_nms=False): super(PPYOLOEHead, self).__init__() assert len(in_channels) > 0, "len(in_channels) should > 0" self.in_channels = in_channels self.num_classes = num_classes self.fpn_strides = fpn_strides self.grid_cell_scale = grid_cell_scale self.grid_cell_offset = grid_cell_offset self.reg_max = reg_max self.iou_loss = GIoULoss() self.loss_weight = loss_weight self.use_varifocal_loss = use_varifocal_loss self.eval_input_size = eval_input_size self.static_assigner_epoch = static_assigner_epoch self.static_assigner = static_assigner self.assigner = assigner self.nms = nms self.exclude_nms = exclude_nms # stem self.stem_cls = nn.LayerList() self.stem_reg = nn.LayerList() act = get_act_fn(act, trt=trt) if act is None or isinstance(act, (str, dict)) else act for in_c in self.in_channels: self.stem_cls.append(ESEAttn(in_c, act=act)) self.stem_reg.append(ESEAttn(in_c, act=act)) # pred head self.pred_cls = nn.LayerList() self.pred_reg = nn.LayerList() for in_c in self.in_channels: self.pred_cls.append(nn.Conv2D(in_c, self.num_classes, 3, padding=1)) self.pred_reg.append(nn.Conv2D(in_c, 4 * (self.reg_max + 1), 3, padding=1)) # projection conv self.proj_conv = nn.Conv2D(self.reg_max + 1, 1, 1, bias_attr=False) self._init_weights() @classmethod def from_config(cls, cfg, input_shape): return {'in_channels': [i.channels for i in input_shape], } def forward_train(self, feats, targets): anchors, anchor_points, num_anchors_list, stride_tensor = generate_anchors_for_grid_cell(feats, self.fpn_strides, self.grid_cell_scale, self.grid_cell_offset) cls_score_list, reg_distri_list = [], [] for i, feat in enumerate(feats): avg_feat = F.adaptive_avg_pool2d(feat, (1, 1)) cls_logit = self.pred_cls[i](self.stem_cls[i](feat, avg_feat) + feat) reg_distri = self.pred_reg[i](self.stem_reg[i](feat, avg_feat)) # cls and reg cls_score = F.sigmoid(cls_logit) cls_score_list.append(cls_score.flatten(2).transpose([0, 2, 1])) reg_distri_list.append(reg_distri.flatten(2).transpose([0, 2, 1])) cls_score_list = paddle.concat(cls_score_list, axis=1) reg_distri_list = paddle.concat(reg_distri_list, axis=1) return self.get_loss([cls_score_list, reg_distri_list, anchors, anchor_points, num_anchors_list, stride_tensor], targets) def forward_eval(self, feats): if self.eval_input_size: anchor_points, stride_tensor = self.anchor_points, self.stride_tensor else: anchor_points, stride_tensor = self._generate_anchors(feats) cls_score_list, reg_dist_list = [], [] for i, feat in enumerate(feats): b, _, h, w = feat.shape l = h * w avg_feat = F.adaptive_avg_pool2d(feat, (1, 1)) cls_logit = self.pred_cls[i](self.stem_cls[i](feat, avg_feat) + feat) reg_dist = self.pred_reg[i](self.stem_reg[i](feat, avg_feat)) reg_dist = reg_dist.reshape([-1, 4, self.reg_max + 1, l]).transpose([0, 2, 1, 3]) reg_dist = self.proj_conv(F.softmax(reg_dist, axis=1)) # cls and reg cls_score = F.sigmoid(cls_logit) cls_score_list.append(cls_score.reshape([b, self.num_classes, l])) reg_dist_list.append(reg_dist.reshape([b, 4, l])) cls_score_list = paddle.concat(cls_score_list, axis=-1) reg_dist_list = paddle.concat(reg_dist_list, axis=-1) return cls_score_list, reg_dist_list, anchor_points, stride_tensor def forward(self, feats, targets=None): assert len(feats) == len(self.fpn_strides), "The size of feats is not equal to size of fpn_strides" if self.training: return self.forward_train(feats, targets) else: return self.forward_eval(feats)