1.3 ShuffleNet
ShuffleNet是由2017年07月发布的轻量级网络,设计用于移动端设备,在MobileNet之后的网络架构。主要的创新点在于使用了分组卷积(group convolution)和通道打乱(channel shuffle)。
分组卷积(group convolution)
分组卷积最早由AlexNet中使用。由于当时的硬件资源有限,训练AlexNet时卷积操作不能全部放在同一个GPU处理,因此作者把特征图分给多个GPU分别进行处理,最后把特征图结果进行连接。
对于输入特征图,将其按照channel-level分为组,每组的通道数为,因此卷积核的深度也要变为,此时可认为有个卷积核去对应处理一个特征图生成个通道的特征图。若预期输出的特征图为通道数为,则总卷积核数也是。
通道打乱(channel shuffle)
通道打乱并不是随机打乱,而是有规律地打乱。
(a):表示的是使用分组卷积处理未加打乱的特征图。每一个输出通道都对应组内的输入通道。
(b):对组卷积GConv1处理后的特征图channel进行如上方式的打乱,此时大部分输出通道会对应不同组的输入通道;
(c)是与(b)的等价实现,将通道调整到合适的位置。
ShuffleNet单元
- (a):是ResNet提出的bottleneck层,只是将3x3的卷积替换为depthwise(DW)卷积。
- (b):是1x1卷积变为pointwise分组卷积(GConv),并且加入了通道打乱。按照DW卷积的原论文,在3x3的DW后面不添加非线性因素。最后的点卷积用来匹配相加操作。
- (c):加入了stride=2去减少特征图尺寸,旁路加入平均池化层来维持相同的尺寸,最终在channel水平上连接特征图。
以下是基于PyTorch实现的ShuffleNet:
import torch import torch.nn as nn import torch.nn.functional as F class ShuffleBlock(nn.Module): def __init__(self, groups): super(ShuffleBlock, self).__init__() self.groups = groups def forward(self, x): '''Channel shuffle: [N,C,H,W] -> [N,g,C/g,H,W] -> [N,C/g,g,H,w] -> [N,C,H,W]''' N,C,H,W = x.size() g = self.groups # 维度变换之后必须要使用.contiguous()使得张量在内存连续之后才能调用view函数 return x.view(N,g,int(C/g),H,W).permute(0,2,1,3,4).contiguous().view(N,C,H,W) class Bottleneck(nn.Module): def __init__(self, in_planes, out_planes, stride, groups): super(Bottleneck, self).__init__() self.stride = stride # bottleneck层中间层的channel数变为输出channel数的1/4 mid_planes = int(out_planes/4) g = 1 if in_planes==24 else groups # 作者提到不在stage2的第一个pointwise层使用组卷积,因为输入channel数量太少,只有24 self.conv1 = nn.Conv2d(in_planes, mid_planes, kernel_size=1, groups=g, bias=False) self.bn1 = nn.BatchNorm2d(mid_planes) self.shuffle1 = ShuffleBlock(groups=g) self.conv2 = nn.Conv2d(mid_planes, mid_planes, kernel_size=3, stride=stride, padding=1, groups=mid_planes, bias=False) self.bn2 = nn.BatchNorm2d(mid_planes) self.conv3 = nn.Conv2d(mid_planes, out_planes, kernel_size=1, groups=groups, bias=False) self.bn3 = nn.BatchNorm2d(out_planes) self.shortcut = nn.Sequential() if stride == 2: self.shortcut = nn.Sequential(nn.AvgPool2d(3, stride=2, padding=1)) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.shuffle1(out) out = F.relu(self.bn2(self.conv2(out))) out = self.bn3(self.conv3(out)) res = self.shortcut(x) out = F.relu(torch.cat([out,res], 1)) if self.stride==2 else F.relu(out+res) return out class ShuffleNet(nn.Module): def __init__(self, cfg): super(ShuffleNet, self).__init__() out_planes = cfg['out_planes'] num_blocks = cfg['num_blocks'] groups = cfg['groups'] self.conv1 = nn.Conv2d(3, 24, kernel_size=1, bias=False) self.bn1 = nn.BatchNorm2d(24) self.in_planes = 24 self.layer1 = self._make_layer(out_planes[0], num_blocks[0], groups) self.layer2 = self._make_layer(out_planes[1], num_blocks[1], groups) self.layer3 = self._make_layer(out_planes[2], num_blocks[2], groups) self.linear = nn.Linear(out_planes[2], 10) def _make_layer(self, out_planes, num_blocks, groups): layers = [] for i in range(num_blocks): if i == 0: layers.append(Bottleneck(self.in_planes, out_planes-self.in_planes, stride=2, groups=groups)) else: layers.append(Bottleneck(self.in_planes, out_planes, stride=1, groups=groups)) self.in_planes = out_planes return nn.Sequential(*layers) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.layer1(out) out = self.layer2(out) out = self.layer3(out) out = F.avg_pool2d(out, 4) out = out.view(out.size(0), -1) out = self.linear(out) return out def ShuffleNetG2(): cfg = { 'out_planes': [200,400,800], 'num_blocks': [4,8,4], 'groups': 2 } return ShuffleNet(cfg) def ShuffleNetG3(): cfg = { 'out_planes': [240,480,960], 'num_blocks': [4,8,4], 'groups': 3 } return ShuffleNet(cfg) def test(): net = ShuffleNetG2() x = torch.randn(1,3,32,32) y = net(x) print(y) test()
1.4 Xception
Xception并不是真正意义上的轻量化模型,借鉴depth-wise convolution,而depth-wise convolution又是上述几个轻量化模型的关键点,其思想非常值得借鉴。
Xception是基于Inception-V3,并结合了depth-wise convolution,这样做的好处是提高网络效率,以及在同等参数量的情况下,在大规模数据集上,效果要优于Inception-V3。这也提供了另外一种“轻量化”的思路:在硬件资源给定的情况下,尽可能的增加网络效率和性能,也可以理解为充分利用硬件资源。
以下为基于PyTorch实现的Xception代码:
import math import torch import torch.nn as nn import torch.nn.functional as F import torch.utils.model_zoo as model_zoo from modeling.sync_batchnorm.batchnorm import SynchronizedBatchNorm2d def fixed_padding(inputs, kernel_size, dilation): kernel_size_effective = kernel_size + (kernel_size - 1) * (dilation - 1) pad_total = kernel_size_effective - 1 pad_beg = pad_total // 2 pad_end = pad_total - pad_beg padded_inputs = F.pad(inputs, (pad_beg, pad_end, pad_beg, pad_end)) return padded_inputs class SeparableConv2d(nn.Module): def __init__(self, inplanes, planes, kernel_size=3, stride=1, dilation=1, bias=False, BatchNorm=None): super(SeparableConv2d, self).__init__() self.conv1 = nn.Conv2d(inplanes, inplanes, kernel_size, stride, 0, dilation, groups=inplanes, bias=bias) self.bn = BatchNorm(inplanes) self.pointwise = nn.Conv2d(inplanes, planes, 1, 1, 0, 1, 1, bias=bias) def forward(self, x): x = fixed_padding(x, self.conv1.kernel_size[0], dilation=self.conv1.dilation[0]) x = self.conv1(x) x = self.bn(x) x = self.pointwise(x) return x class Block(nn.Module): def __init__(self, inplanes, planes, reps, stride=1, dilation=1, BatchNorm=None, start_with_relu=True, grow_first=True, is_last=False): super(Block, self).__init__() if planes != inplanes or stride != 1: self.skip = nn.Conv2d(inplanes, planes, 1, stride=stride, bias=False) self.skipbn = BatchNorm(planes) else: self.skip = None self.relu = nn.ReLU(inplace=True) rep = [] filters = inplanes if grow_first: rep.append(self.relu) rep.append(SeparableConv2d(inplanes, planes, 3, 1, dilation, BatchNorm=BatchNorm)) rep.append(BatchNorm(planes)) filters = planes for i in range(reps - 1): rep.append(self.relu) rep.append(SeparableConv2d(filters, filters, 3, 1, dilation, BatchNorm=BatchNorm)) rep.append(BatchNorm(filters)) if not grow_first: rep.append(self.relu) rep.append(SeparableConv2d(inplanes, planes, 3, 1, dilation, BatchNorm=BatchNorm)) rep.append(BatchNorm(planes)) if stride != 1: rep.append(self.relu) rep.append(SeparableConv2d(planes, planes, 3, 2, BatchNorm=BatchNorm)) rep.append(BatchNorm(planes)) if stride == 1 and is_last: rep.append(self.relu) rep.append(SeparableConv2d(planes, planes, 3, 1, BatchNorm=BatchNorm)) rep.append(BatchNorm(planes)) if not start_with_relu: rep = rep[1:] self.rep = nn.Sequential(*rep) def forward(self, inp): x = self.rep(inp) if self.skip is not None: skip = self.skip(inp) skip = self.skipbn(skip) else: skip = inp x = x + skip return x class AlignedXception(nn.Module): def __init__(self, output_stride, BatchNorm, pretrained=True): super(AlignedXception, self).__init__() if output_stride == 16: entry_block3_stride = 2 middle_block_dilation = 1 exit_block_dilations = (1, 2) elif output_stride == 8: entry_block3_stride = 1 middle_block_dilation = 2 exit_block_dilations = (2, 4) else: raise NotImplementedError # Entry flow self.conv1 = nn.Conv2d(3, 32, 3, stride=2, padding=1, bias=False) self.bn1 = BatchNorm(32) self.relu = nn.ReLU(inplace=True) self.conv2 = nn.Conv2d(32, 64, 3, stride=1, padding=1, bias=False) self.bn2 = BatchNorm(64) self.block1 = Block(64, 128, reps=2, stride=2, BatchNorm=BatchNorm, start_with_relu=False) self.block2 = Block(128, 256, reps=2, stride=2, BatchNorm=BatchNorm, start_with_relu=False, grow_first=True) self.block3 = Block(256, 728, reps=2, stride=entry_block3_stride, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True, is_last=True) # Middle flow self.block4 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block5 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block6 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block7 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block8 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block9 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block10 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block11 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block12 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block13 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block14 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block15 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block16 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block17 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block18 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) self.block19 = Block(728, 728, reps=3, stride=1, dilation=middle_block_dilation, BatchNorm=BatchNorm, start_with_relu=True, grow_first=True) # Exit flow self.block20 = Block(728, 1024, reps=2, stride=1, dilation=exit_block_dilations[0], BatchNorm=BatchNorm, start_with_relu=True, grow_first=False, is_last=True) self.conv3 = SeparableConv2d(1024, 1536, 3, stride=1, dilation=exit_block_dilations[1], BatchNorm=BatchNorm) self.bn3 = BatchNorm(1536) self.conv4 = SeparableConv2d(1536, 1536, 3, stride=1, dilation=exit_block_dilations[1], BatchNorm=BatchNorm) self.bn4 = BatchNorm(1536) self.conv5 = SeparableConv2d(1536, 2048, 3, stride=1, dilation=exit_block_dilations[1], BatchNorm=BatchNorm) self.bn5 = BatchNorm(2048) # Init weights self._init_weight() # Load pretrained model if pretrained: self._load_pretrained_model() def forward(self, x): # Entry flow x = self.conv1(x) x = self.bn1(x) x = self.relu(x) x = self.conv2(x) x = self.bn2(x) x = self.relu(x) x = self.block1(x) # add relu here x = self.relu(x) low_level_feat = x x = self.block2(x) x = self.block3(x) # Middle flow x = self.block4(x) x = self.block5(x) x = self.block6(x) x = self.block7(x) x = self.block8(x) x = self.block9(x) x = self.block10(x) x = self.block11(x) x = self.block12(x) x = self.block13(x) x = self.block14(x) x = self.block15(x) x = self.block16(x) x = self.block17(x) x = self.block18(x) x = self.block19(x) # Exit flow x = self.block20(x) x = self.relu(x) x = self.conv3(x) x = self.bn3(x) x = self.relu(x) x = self.conv4(x) x = self.bn4(x) x = self.relu(x) x = self.conv5(x) x = self.bn5(x) x = self.relu(x) return x, low_level_feat def _init_weight(self): for m in self.modules(): if isinstance(m, nn.Conv2d): n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels m.weight.data.normal_(0, math.sqrt(2. / n)) elif isinstance(m, SynchronizedBatchNorm2d): m.weight.data.fill_(1) m.bias.data.zero_() elif isinstance(m, nn.BatchNorm2d): m.weight.data.fill_(1) m.bias.data.zero_() def _load_pretrained_model(self): pretrain_dict = model_zoo.load_url('http://data.lip6.fr/cadene/pretrainedmodels/xception-b5690688.pth') model_dict = {} state_dict = self.state_dict() for k, v in pretrain_dict.items(): if k in state_dict: if 'pointwise' in k: v = v.unsqueeze(-1).unsqueeze(-1) if k.startswith('block11'): model_dict[k] = v model_dict[k.replace('block11', 'block12')] = v model_dict[k.replace('block11', 'block13')] = v model_dict[k.replace('block11', 'block14')] = v model_dict[k.replace('block11', 'block15')] = v model_dict[k.replace('block11', 'block16')] = v model_dict[k.replace('block11', 'block17')] = v model_dict[k.replace('block11', 'block18')] = v model_dict[k.replace('block11', 'block19')] = v elif k.startswith('block12'): model_dict[k.replace('block12', 'block20')] = v elif k.startswith('bn3'): model_dict[k] = v model_dict[k.replace('bn3', 'bn4')] = v elif k.startswith('conv4'): model_dict[k.replace('conv4', 'conv5')] = v elif k.startswith('bn4'): model_dict[k.replace('bn4', 'bn5')] = v else: model_dict[k] = v state_dict.update(model_dict) self.load_state_dict(state_dict) if __name__ == "__main__": import torch model = AlignedXception(BatchNorm=nn.BatchNorm2d, pretrained=True, output_stride=16) input = torch.rand(1, 3, 512, 512) output, low_level_feat = model(input) print(output.size()) print(low_level_feat.size())
轻量化模型小结
网络 | 实现轻量化技巧 |
SqueezeNet | 1x1卷积核压缩Feature map数量 |
MobileNet | Depth-wise Convolution |
ShuffleNet | Depth-wise Convolution & Channel Shuffle |
Xception | 修改的Depth-wise Convolution |
轻量化主要得益于depth-wise convolution,因此大家可以考虑采用depth-wise convolution来设计自己的轻量化网络,但是要注意信息流通不畅问题。
解决“信息流通不畅”的问题:
- MobileNet采用了point-wise convolution
- ShuffleNet采用的是channel shuffle
MobileNet相较于ShuffleNet使用了更多的卷积,计算量和参数量上是劣势,但是增加了非线性层数,理论上特征更抽象,更高级了;ShuffleNet则省去point-wise convolution,采用channel shuffle,简单明了,省去卷积步骤,减少了参数量。
下图是对全文以及上一篇笔记的经典卷积神经网络的总结,可以说是很全面的了!!!