MMDetection系列 | 4. MMDetection模型代码训练及测试过程的详细解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: MMDetection系列 | 4. MMDetection模型代码训练及测试过程的详细解析

1. MMDetection训练和测试流程解析


首先查看 faster_rcnn_r50_fpn.py 配置文件,可以发现有六个字段组成。注册的结构名称为 ‘FasterRCNN’ ,这个注册的结构可以在 mmdet/model/detectors/faster_rcnn.py 中找到,其是继承至 TwoStageDetector 父类。初始化的配置组成就是 faster_rcnn_r50_fpn.py 的配置组成。


faster_rcnn.py

@DETECTORS.register_module()
class FasterRCNN(TwoStageDetector):
    """Implementation of `Faster R-CNN <https://arxiv.org/abs/1506.01497>`_"""
    def __init__(self,
                 backbone,
                 rpn_head,
                 roi_head,
                 train_cfg,
                 test_cfg,
                 neck=None,
                 pretrained=None,
                 init_cfg=None):
        super(FasterRCNN, self).__init__(
            backbone=backbone,
            neck=neck,
            rpn_head=rpn_head,
            roi_head=roi_head,
            train_cfg=train_cfg,
            test_cfg=test_cfg,
            pretrained=pretrained,
            init_cfg=init_cfg)


faster_rcnn_r50_fpn.py

# model settings(简化了详细信息)
model = dict(
    type='FasterRCNN',
    backbone=dict(
        type='ResNet',...),
    neck=dict(
        type='FPN',...),
    rpn_head=dict(
        type='RPNHead',...),
    roi_head=dict(
        type='StandardRoIHead',...),
    train_cfg=dict(
        rpn=dict(...),
        rpn_proposal=dict(...),
        rcnn=dict(...)),
    test_cfg=dict(
        rpn=dict(...),
        rcnn=dict(...))
)


可以注意到,在Faster-rcnn的组成中,除了基本的backbone,neck,rpn_head,roi_head四个部分之外,还存在着训练和测试的配置文件train_cfg 与 test_cfg。对于backbone,neck,rpn_head,roi_head这四个部分,可以发现在配置文件中是有具体的注册模型使用,比如在这里的faster-cnn的backbone中是使用的ResNet,neck部分使用了FPN结构,另外的rpn_head,roi_head同样使用了一个指定的模块结构并进行了参数初始化进行使用,而这些指定的模块都可以在mmdet/models路径下的py文件中找到其具体的实现代码。


但是后续的train_cfg 与 test_cfg是没有指定具体的文件的,只有一些字段的组成:rpn,rpn_proposal,rcnn,首先就是探究这些字段是如何在函数中被调用的,先来直接查看父类TwoStageDetector。


在TwoStageDetector中的__init__函数中可以发现,为什么会在train_cfg中使用rpn,rpn_proposal,rcnn这些字段,是因为这是初始化的时候默认调用的。同时,会在训练的过程中使用,相关代码如下所示:


class TwoStageDetector(BaseDetector):
    def __init__(self,
                 backbone,
                 neck=None,
                 rpn_head=None,
                 roi_head=None,
                 train_cfg=None,
                 test_cfg=None,
                 pretrained=None,
                 init_cfg=None):
        super(TwoStageDetector, self).__init__(init_cfg)
        if pretrained:
            warnings.warn('DeprecationWarning: pretrained is deprecated, '
                          'please use "init_cfg" instead')
            backbone.pretrained = pretrained
        self.backbone = build_backbone(backbone)
        if neck is not None:
            self.neck = build_neck(neck)
  # rpn_head会同时将test和train的rpn部分一同注册
        if rpn_head is not None:
            rpn_train_cfg = train_cfg.rpn if train_cfg is not None else None
            rpn_head_ = rpn_head.copy()
            rpn_head_.update(train_cfg=rpn_train_cfg, test_cfg=test_cfg.rpn) # 使用部分
            self.rpn_head = build_head(rpn_head_)
  # roi_head会同时将test和train的rcnn部分一同注册
        if roi_head is not None:
            rcnn_train_cfg = train_cfg.rcnn if train_cfg is not None else None
            roi_head.update(train_cfg=rcnn_train_cfg) # 使用部分
            roi_head.update(test_cfg=test_cfg.rcnn)     # 使用部分
            roi_head.pretrained = pretrained
            self.roi_head = build_head(roi_head)
        self.train_cfg = train_cfg
        self.test_cfg = test_cfg
  ...
    def forward_train(self,..., *kwargs):
        x = self.extract_feat(img)
        losses = dict()
        # RPN forward and loss
        if self.with_rpn:
            proposal_cfg = self.train_cfg.get('rpn_proposal',
                                              self.test_cfg.rpn)
            rpn_losses, proposal_list = self.rpn_head.forward_train(
                    proposal_cfg=proposal_cfg,
                    ...)
            losses.update(rpn_losses)
        else:
            proposal_list = proposals
        roi_losses = self.roi_head.forward_train(...)
        losses.update(roi_losses)
        return losses


再重新观察faster_rcnn_r50_fpn.py文件,发现无论是rpn_head还是roi_head,都需要在train_cfg中为其分配一个assigner和sampler。


这里根据官方资料简单介绍一下:


  • BBox Assigner

正负样本属性分配模块作用是进行正负样本定义或者正负样本分配(可能也包括忽略样本定义),正样本就是常说的前景样本(可以是任何类别),负样本就是背景样本。因为目标检测是一个同时进行分类和回归的问题,对于分类场景必然需要确定正负样本,否则无法训练。该模块至关重要,不同的正负样本分配策略会带来显著的性能差异,目前大部分目标检测算法都会对这个部分进行改进,至关重要。对应的代码在mmdet/core/bbox/assigners中


  • BBox Sampler

在确定每个样本的正负属性后,可能还需要进行样本平衡操作。本模块作用是对前面定义的正负样本不平衡进行采样,力争克服该问题。一般在目标检测中 gt bbox 都是非常少的,所以正负样本比是远远小于 1 的。而基于机器学习观点:在数据极度不平衡情况下进行分类会出现预测倾向于样本多的类别,出现过拟合,为了克服该问题,适当的正负样本采样策略是非常必要的。对应的代码在mmdet/core/bbox/samplers中。


  • BBox Encoder / Decoder

为了更好的收敛和平衡多个 loss,具体解决办法非常多,而 bbox 编解码策略也算其中一个,bbox 编码阶段对应的是对正样本的 gt bbox 采用某种编码变换(反操作就是 bbox 解码),最简单的编码是对 gt bbox 除以图片宽高进行归一化以平衡分类和回归分支,对应的代码在mmdet/core/bbox/coder中。

训练时候进行了编码,那么对应的测试环节需要进行解码。根据编码的不同,解码也是不同的。举个简单例子:假设训练时候对宽高是直接除以图片宽高进行归一化的,那么解码过程也仅仅需要乘以图片宽高即可。其代码和 bbox encoder 放在一起,在mmdet/core/bbox/coder中。


参考 Faster R-CNN 配置文件,这三者是作用与在同一个结构上的。


rpn_head=dict(
        type='RPNHead',
        ...
        bbox_coder=dict(
            type='DeltaXYWHBBoxCoder',
            target_means=[.0, .0, .0, .0],
            target_stds=[1.0, 1.0, 1.0, 1.0]),
        ...
train_cfg=dict(
        rpn=dict(
            assigner=dict(
                type='MaxIoUAssigner',
                pos_iou_thr=0.7,
                neg_iou_thr=0.3,
                min_pos_iou=0.3,
                match_low_quality=True,
                ignore_iof_thr=-1),
            sampler=dict(
                type='RandomSampler',
                num=256,
                pos_fraction=0.5,
                neg_pos_ub=-1,
                add_gt_as_proposals=False),
        ...


1.1 训练流程

重新回到TwoStageDetector的代码中,对于two-stage的训练流程主要是调用forward_train这个函数,而在这个函数中,模块核心是调用 self.rpn_head.forward_train 和 self.roi_head.forward_train 函数,输出 losses 和其他相关数据。


#============= mmdet/models/detectors/two_stage.py/TwoStageDetector ============
def forward_train(...):
    # 先进行 backbone+neck 的特征提取
    x = self.extract_feat(img)
    losses = dict()
    # RPN forward and loss
    if self.with_rpn:
        # 训练 RPN
        proposal_cfg = self.train_cfg.get('rpn_proposal',
                                        self.test_cfg.rpn)
        # 主要是调用 rpn_head 内部的 forward_train 方法
        rpn_losses, proposal_list = self.rpn_head.forward_train(x,...)
        losses.update(rpn_losses)
    else:
        proposal_list = proposals
    # 第二阶段,主要是调用 roi_head 内部的 forward_train 方法
    roi_losses = self.roi_head.forward_train(x, ...)
    losses.update(roi_losses)
    return losses


对于 one-stage 而言,具体如下所示:


#============= mmdet/models/detectors/single_stage.py/SingleStageDetector ============
def forward_train(...):
    super(SingleStageDetector, self).forward_train(img, img_metas)
    # 先进行 backbone+neck 的特征提取
    x = self.extract_feat(img)
    # 主要是调用 bbox_head 内部的 forward_train 方法
    losses = self.bbox_head.forward_train(x, ...)
    return losses


这个比 two-stage head 模块简单,因为其只有第一个 stage,对应的函数是 self.bbox_head.forward_train


1.2 测试流程

对于测试流程,如果是单尺度测试则会调用 TwoStageDetector中的 simple_test 方法,如果是多尺度测试,则调用 aug_test 方法,如下所示:


 

# 单尺度测试(est without augmentation)
    def simple_test(self, img, img_metas, proposals=None, rescale=False):
        assert self.with_bbox, 'Bbox head must be implemented.'
        x = self.extract_feat(img)
        if proposals is None:
            proposal_list = self.rpn_head.simple_test_rpn(x, img_metas)
        else:
            proposal_list = proposals
        return self.roi_head.simple_test(
            x, proposal_list, img_metas, rescale=rescale)
  # 多尺度测试(Test with augmentations)
    def aug_test(self, imgs, img_metas, rescale=False):
        x = self.extract_feats(imgs)
        proposal_list = self.rpn_head.aug_test_rpn(x, img_metas)
        return self.roi_head.aug_test(
            x, proposal_list, img_metas, rescale=rescale)


但是,可以看出在测试阶段,主要是调用了 Head 模块自身的 simple_test 或 aug_test 方法。


2. RPN_Head解析


这里以faster_rcnn_r50_fpn.py的配置文件为例,在这里配置文件的rpn使用了RPNHead这个类,所以现在来查看下这个类。


RPNHead继承自AnchorHead,同时AnchorHead又继承自BaseDenseHead与BBoxTestMixin。


@HEADS.register_module()
class RPNHead(AnchorHead):
...
@HEADS.register_module()
class AnchorHead(BaseDenseHead, BBoxTestMixin):
...
class BaseDenseHead(BaseModule, metaclass=ABCMeta):


2.1 训练流程

由于不断的继承,我们最终会发现,对于RPNHead来说,其训练函数入口forward_train最后是在其父类的父类BaseDenseHead中,其实现是在 mmdet/models/dense_heads/base_dense_head.py/BaseDenseHead 中,如下所示:


class BaseDenseHead(BaseModule, metaclass=ABCMeta):
  ...
  def forward_train(self,
                   x,
                   img_metas,
                   gt_bboxes,
                   gt_labels=None,
                   gt_bboxes_ignore=None,
                   proposal_cfg=None,
                   **kwargs):
     # 调用各个子类实现的 forward 方法
     outs = self(x)
     if gt_labels is None:
         loss_inputs = outs + (gt_bboxes, img_metas)
     else:
         loss_inputs = outs + (gt_bboxes, gt_labels, img_metas)
     # 调用各个子类实现的 loss 计算方法
     losses = self.loss(*loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
     if proposal_cfg is None:
         return losses
     else:
         # two-stage 算法还需要返回 proposal
         proposal_list = self.get_bboxes(*outs, img_metas, cfg=proposal_cfg)
         return losses, proposal_list


这里面有两个关键函数:self.loss 与 self(x)。每个算法的 Head 子类一般不会重写上述方法,但是每个 Head 子类都会重写 forward 和 loss 方法,其中 forward 方法用于运行 head 网络部分输出分类回归分支的特征图,而 loss 方法接收 forward 输出,并且结合 label 计算 loss。


而这里查看RPNHead这个类,就可以发现其重写了loss这个函数,不过调用的也是其父类AnchorHead的loss函数。


BaseDenseHead

BaseDenseHead 基类过于简单,对于 anchor-based 和 anchor-free 算法又进一步进行了继承,得到 AnchorHead 或者 AnchorFreeHead 类。在目前的各类算法实现中,绝大部分子类都是继承自 AnchorHead 或者 AnchorFreeHead,其提供了一些相关的默认操作,如果直接继承 BaseDenseHead 则子类需要重写大部分算法逻辑。


AnchorHead

刚刚分析代码到,在BaseDenseHead的forward_train训练入口函数中,都会实现子类的 forward 和 loss 方法。下面就对BaseDenseHead的子类AnchorHead的forward 和 loss 函数进行分析。其实也就说,在BaseDenseHead的forward_train中,self(x)就是调用AnchorHead的forward函数,self.loss就是调用AnchorHead的loss函数。


先来查看AnchorHead的forward函数:


# BBoxTestMixin 是多尺度测试时候调用
class AnchorHead(BaseDenseHead, BBoxTestMixin):
    # feats 是 backbone+neck 输出的多个尺度图
    def forward(self, feats):
        # 对每张特征图单独计算预测输出
        return multi_apply(self.forward_single, feats)
    # head 模块分类回归分支输出
    def forward_single(self, x):
        cls_score = self.conv_cls(x)
        bbox_pred = self.conv_reg(x)
        return cls_score, bbox_pred


forward 函数比较简单,就是对多尺度特征图中每个特征图分别计算分类和回归输出即可,主要复杂度在 loss 函数中,其运行流程图如下所示:

image.png


代码如下:


class AnchorHead(BaseDenseHead, BBoxTestMixin):
  @force_fp32(apply_to=('cls_scores', 'bbox_preds'))
    def loss(self,
             cls_scores,
             bbox_preds,
             gt_bboxes,
             gt_labels,
             img_metas,
             gt_bboxes_ignore=None):
        """Compute losses of the head.
        Args:
            cls_scores (list[Tensor]): Box scores for each scale level
                Has shape (N, num_anchors * num_classes, H, W)
            bbox_preds (list[Tensor]): Box energies / deltas for each scale
                level with shape (N, num_anchors * 4, H, W)
            gt_bboxes (list[Tensor]): Ground truth bboxes for each image with
                shape (num_gts, 4) in [tl_x, tl_y, br_x, br_y] format.
            gt_labels (list[Tensor]): class indices corresponding to each box
            img_metas (list[dict]): Meta information of each image, e.g.,
                image size, scaling factor, etc.
            gt_bboxes_ignore (None | list[Tensor]): specify which bounding
                boxes can be ignored when computing the loss. Default: None
        Returns:
            dict[str, Tensor]: A dictionary of loss components.
        """
        featmap_sizes = [featmap.size()[-2:] for featmap in cls_scores]
        assert len(featmap_sizes) == self.prior_generator.num_levels
        device = cls_scores[0].device
        anchor_list, valid_flag_list = self.get_anchors(
            featmap_sizes, img_metas, device=device)
        label_channels = self.cls_out_channels if self.use_sigmoid_cls else 1
        cls_reg_targets = self.get_targets(
            anchor_list,
            valid_flag_list,
            gt_bboxes,
            img_metas,
            gt_bboxes_ignore_list=gt_bboxes_ignore,
            gt_labels_list=gt_labels,
            label_channels=label_channels)
        if cls_reg_targets is None:
            return None
        (labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
         num_total_pos, num_total_neg) = cls_reg_targets
        num_total_samples = (
            num_total_pos + num_total_neg if self.sampling else num_total_pos)
        # anchor number of multi levels
        num_level_anchors = [anchors.size(0) for anchors in anchor_list[0]]
        # concat all level anchors and flags to a single tensor
        concat_anchor_list = []
        for i in range(len(anchor_list)):
            concat_anchor_list.append(torch.cat(anchor_list[i]))
        all_anchor_list = images_to_levels(concat_anchor_list,
                                           num_level_anchors)
        losses_cls, losses_bbox = multi_apply(
            self.loss_single,
            cls_scores,
            bbox_preds,
            all_anchor_list,
            labels_list,
            label_weights_list,
            bbox_targets_list,
            bbox_weights_list,
            num_total_samples=num_total_samples)
        return dict(loss_cls=losses_cls, loss_bbox=losses_bbox)


分析:


  1. 在 loss 函数中首先会调用 get_anchors 函数得到默认 anchor 列表。而 get_anchors 函数内部会先计算多尺度特征图上每个特征点位置的 anchor,然后再计算有效 anchor 标志(因为在组织 batch 时候有些图片会进行左上角 padding,这部分像素人为加的,不需要考虑 anchor)
  2. 然后基于 anchor、gt bbox 以及其他必备信息调用 get_targets 函数计算每个预测分支对应的 target。get_targets 函数内部会调用 multi_apply(_get_targets_single) 函数对每张图片单独计算 target,而 _get_targets_single 函数实现的功能比较多,包括:bbox assigner、bbox sampler 和 bbox encoder 三个关键环节
  3. 在得到 targets 后,调用 loss_single 函数计算每个输出尺度的 loss 值,最终返回各个分支的 loss


AnchorFreeHead

AnchorFreeHead 逻辑比 AnchorHead 简单很多,主要是因为 anchor-free 类算法比 anchor-based 算法更加灵活多变,而且少了复杂的 anchor 生成过程,其 forward 方法实现和 AnchorHead 完全相同,而 loss 方法没有实现,其子类必须实现。

@HEADS.register_module()

class AnchorFreeHead(BaseDenseHead, BBoxTestMixin):


def forward(self, feats):
  return multi_apply(self.forward_single, feats)[:2]
  def forward_single(self, x):
  cls_feat = x
        reg_feat = x
        for cls_layer in self.cls_convs:
            cls_feat = cls_layer(cls_feat)
        cls_score = self.conv_cls(cls_feat)
        for reg_layer in self.reg_convs:
            reg_feat = reg_layer(reg_feat)
        bbox_pred = self.conv_reg(reg_feat)
        return cls_score, bbox_pred, cls_feat, reg_feat
  @abstractmethod
    @force_fp32(apply_to=('cls_scores', 'bbox_preds'))
    def loss(self,
             cls_scores,
             bbox_preds,
             gt_bboxes,
             gt_labels,
             img_metas,
             gt_bboxes_ignore=None):
        # loss 方法没有实现,其子类必须实现
  raise NotImplementedError


上述内容介绍了在rpn_head中,具体的forward_train执行的内容。就是执行子类的forward与loss函数,下面就查看测试的流程。


2.2 测试过程

前面说过在测试流程中,最终会调用 Head 模块的 simple_test 或 aug_test 方法分别进行单尺度和多尺度测试,涉及到具体代码层面,one-stage 和 two-stage 调用函数有区别。具体上说,是调用head的simple_test_rpn 或 aug_test_rpn方法。但是最终调用的依然是 Head 模块的 get_bboxes 方法。


class TwoStageDetector(BaseDetector):


# 单尺度测试(est without augmentation)
    def simple_test(self, img, img_metas, proposals=None, rescale=False):
        if proposals is None:
            proposal_list = self.rpn_head.simple_test_rpn(x, img_metas)
            ...
        return self.roi_head.simple_test(
            x, proposal_list, img_metas, rescale=rescale)
  # 多尺度测试(Test with augmentations)
    def aug_test(self, imgs, img_metas, rescale=False):
        x = self.extract_feats(imgs)
        proposal_list = self.rpn_head.aug_test_rpn(x, img_metas)
        return self.roi_head.aug_test(
            x, proposal_list, img_metas, rescale=rescale)


AnchorHead

AnchorHead不仅集成了BaseDenseHead,同时还集成了BBoxTestMixin,而且其子类RPNHead的self.rpn_head.aug_test_rpn调用就是来自于BBoxTestMixin。但是该方法内部最终也是调用了 BaseDenseHead 中的get_bboxes 方法。同时,get_bboxes 方法又调用了子函数_get_bboxes_single方法。

class BBoxTestMixin(object):

...
    def simple_test_rpn(self, x, img_metas):
        """Test without augmentation, only for ``RPNHead`` and its variants,
        e.g., ``GARPNHead``, etc.
        Args:
            x (tuple[Tensor]): Features from the upstream network, each is
                a 4D-tensor.
            img_metas (list[dict]): Meta info of each image.
        Returns:
            list[Tensor]: Proposals of each image, each item has shape (n, 5),
                where 5 represent (tl_x, tl_y, br_x, br_y, score).
        """
        rpn_outs = self(x)
        proposal_list = self.get_bboxes(*rpn_outs, img_metas=img_metas)  # 归根到底还是调用了self.get_bboxes方法
        return proposal_list
    def aug_test_rpn(self, feats, img_metas):
        """Test with augmentation for only for ``RPNHead`` and its variants,
        e.g., ``GARPNHead``, etc.
        Args:
            feats (tuple[Tensor]): Features from the upstream network, each is
                        a 4D-tensor.
            img_metas (list[dict]): Meta info of each image.
        Returns:
            list[Tensor]: Proposals of each image, each item has shape (n, 5),
                where 5 represent (tl_x, tl_y, br_x, br_y, score).
        """
        samples_per_gpu = len(img_metas[0])
        aug_proposals = [[] for _ in range(samples_per_gpu)]
        for x, img_meta in zip(feats, img_metas):
            proposal_list = self.simple_test_rpn(x, img_meta)
            for i, proposals in enumerate(proposal_list):
                aug_proposals[i].append(proposals)
        # reorganize the order of 'img_metas' to match the dimensions
        # of 'aug_proposals'
        aug_img_metas = []
        for i in range(samples_per_gpu):
            aug_img_meta = []
            for j in range(len(img_metas)):
                aug_img_meta.append(img_metas[j][i])
            aug_img_metas.append(aug_img_meta)
        # after merging, proposals will be rescaled to the original image size
        merged_proposals = [
            merge_aug_proposals(proposals, aug_img_meta, self.test_cfg)
            for proposals, aug_img_meta in zip(aug_proposals, aug_img_metas)
        ]
        return merged_proposals


ps:这里再补充多一句,AnchorHead同时继承了BaseDenseHead, BBoxTestMixin两个父类,而get_bboxes方法是在BaseDenseHead父类中。


get_bboxes函数的主要流程:


  1. 遍历每个特征尺度输出分支,利用 nms_pre 配置参数对该层预测结果按照 scores 值进行从大到小进行 topk 截取,保留 scores 最高的前 nms_pre 的预测结果
  2. 对保留的预测结果进行 bbox 解码还原操作
  3. 还原到最原始图片尺度
  4. 如果需要进行 nms,则对所有分支预测保留结果进行统一 nms 即可,否则直接属于多尺度预测结果


ps:流程的1,2两步是get_bboxes调用_get_bboxes_single函数实现的,而流程的3,4两步是_get_bboxes_single函数再调用_bbox_post_process实现的。部分重要代码如下:


def _get_bboxes_single(self,...)
  # 遍历每个特征尺度输出分支
  for level_idx, (cls_score, bbox_pred, score_factor, priors) in \
                 enumerate(zip(cls_score_list, bbox_pred_list,
                               score_factor_list, mlvl_priors)):
     ...
     # 步骤1
  results = filter_scores_and_topk(
                 scores, cfg.score_thr, nms_pre,
                 dict(bbox_pred=bbox_pred, priors=priors))
  ...
  # 步骤2
     bboxes = self.bbox_coder.decode(
         priors, bbox_pred, max_shape=img_shape)
  return self._bbox_post_process(mlvl_scores, mlvl_labels, mlvl_bboxes,...)
def _bbox_post_process(self,...):
  # 步骤3
  if rescale:
      mlvl_bboxes /= mlvl_bboxes.new_tensor(scale_factor)
  ...
  # 步骤4
    if with_nms:
        det_bboxes, keep_idxs = batched_nms(mlvl_bboxes, mlvl_scores, mlvl_labels, cfg.nms)


AnchorFreeHead

AnchorFreeHead同样继承自BaseDenseHead, BBoxTestMixin,和AnchorHead的父类是完全一样的。所以其get_bboxes的所在位置与AnchorHead一样,也是在BaseDenseHead中。

除了 RPN 算法的多尺度测试是在mmdet/models/dense_heads/rpn_test_mixin.py,其余 Head 多尺度测试都是在 mmdet/models/dense_heads/dense_test_mixins.py/BBoxTestMixin 中实现,其思路是对多尺度图片中每张图片单独运行 get_bboxes,然后还原到原图尺度,最后把多尺度图片预测结果合并进行统一 nms。

以上,结束了Two-Stage中rpn_head部分的讲解,接下来就是rpn_head的下一步——roi_head。


3. ROI_Head解析


这里以faster_rcnn_r50_fpn.py的配置文件为例,在这里配置文件的rpn使用了StandardRoIHead这个类,所以现在来查看下这个类。


StandardRoIHead继承了3个父类,分别是BaseRoIHead, BBoxTestMixin, MaskTestMixin


3.1 训练流程

在two-stage网络的训练过程forward_train中,在实现了self.rpn_head.forward_train之后,随即而来的就是self.roi_head.forward_train。也就是训练完rpn_head,再进行roi_head的训练操作。而在faster-rcnn中作为roi_head的StandardRoIHead,其继承的父类BaseRoIHead本是只是定义了forward_train这个函数,但是没有任何操作,所以需要StandardRoIHead子类对其进行复写。其核心代码如下:


@HEADS.register_module()
class StandardRoIHead(BaseRoIHead, BBoxTestMixin, MaskTestMixin):
  def forward_train(self,
                   x,
                   img_metas,
                   proposal_list,
                   gt_bboxes,
                   gt_labels,
                   ...):
     if self.with_bbox or self.with_mask:
         num_imgs = len(img_metas)
         sampling_results = []
         for i in range(num_imgs):
             # 对每张图片进行 bbox 正负样本属性分配
             assign_result = self.bbox_assigner.assign(
                 proposal_list[i], ...)
             # 然后进行正负样本采样
             sampling_result = self.bbox_sampler.sample(
                 assign_result,
                 proposal_list[i],
                 ...)
             sampling_results.append(sampling_result)
     losses = dict()
     if self.with_bbox:
         # bbox 分支 forward,返回 loss
         bbox_results = self._bbox_forward_train(...)
         losses.update(bbox_results['loss_bbox'])
     if self.with_mask:
         # mask 分支 forward,返回 loss
     return losses
  def _bbox_forward_train(self, x, sampling_results, gt_bboxes, gt_labels,
                         img_metas):
     rois = bbox2roi([res.bboxes for res in sampling_results])
     # forward
     bbox_results = self._bbox_forward(x, rois)
     # 计算 target
     bbox_targets = self.bbox_head.get_targets(...)  
     # 计算 loss                                          
     loss_bbox = self.bbox_head.loss(...)
     return ...    
  def _bbox_forward(self, x, rois):
     # roi 提取
     bbox_feats = self.bbox_roi_extractor(
         x[:self.bbox_roi_extractor.num_inputs], rois)
     # bbox head 网络前向
     cls_score, bbox_pred = self.bbox_head(bbox_feats)
     return ...


从上述逻辑可以看出,StandardRoIHead 中 forward_train 函数仅仅是对内部的 bbox_head 相关函数进行调用,例如 get_targets 和 loss,本身 StandardRoIHead 类不做具体算法逻辑计算。也就是和rpn_head的算法逻辑不同,这里的roi_head的forward_train操作中是不涉及get_targets 和 loss函数。


可以参考 Faster R-CNN 配置文件理解 StandardRoIHead 和 bbox_head 的关系:


roi_head=dict(
    type='StandardRoIHead',
    bbox_roi_extractor=dict(
        type='SingleRoIExtractor',
        roi_layer=dict(type='RoIAlign', output_size=7, sampling_ratio=0),
        out_channels=256,
        featmap_strides=[4, 8, 16, 32]),
    bbox_head=dict(
        type='Shared2FCBBoxHead',
        in_channels=256,
        fc_out_channels=1024,
        roi_feat_size=7,
        num_classes=80,
        bbox_coder=dict(
            type='DeltaXYWHBBoxCoder',
            target_means=[0., 0., 0., 0.],
            target_stds=[0.1, 0.1, 0.2, 0.2]),
        reg_class_agnostic=False,
        loss_cls=dict(
            type='CrossEntropyLoss', use_sigmoid=False, loss_weight=1.0),
        loss_bbox=dict(type='L1Loss', loss_weight=1.0))))


StandardRoIHead 类包装了 bbox_roi_extractor 和 bbox_head 的实例,前者用于 RoI 特征提取,后者才是真正计算分类和回归的逻辑。在 bbox_head 中除了网络模型有些变换外,loss计算过程是非常类似的,其 get_targets 和 loss 计算过程都是封装在基类 mmdet/models/roi_heads/bbox_heads/bbox_head.py 中。


class StandardRoIHead(BaseRoIHead, BBoxTestMixin, MaskTestMixin):
  def _bbox_forward(self, x, rois):
     bbox_feats = self.bbox_roi_extractor(...)
     cls_score, bbox_pred = self.bbox_head(...) # 调用bbox_head本身的forward函数
     ......
     return bbox_results
  def _bbox_forward_train(self, x, ...):
     bbox_results = self._bbox_forward(x, rois)
  ...
     bbox_targets = self.bbox_head.get_targets(...)
     loss_bbox = self.bbox_head.loss(...) # 调用bbox_head本身的loss函数


在这里,StandardRoIHead使用的是Shared2FCBBoxHead这个bbox_head,现在直接来看这个类的代码,了解其forward函数与loss函数的实现。其中,Shared2FCBBoxHead继承自ConvFCBBoxHead,而ConvFCBBoxHead又继承自BBoxHead。ConvFCBBoxHead复写了BBoxHead中的forward函数,而其余的loss函数,get_targets,get_bboxes均是BBoxHead父类实现的。(在StandardRoIHead这里的get_bboxes好像没有被使用上)


3.2 测试流程

测试流程是调用 Head 模块的 simple_test 和 aug_test 函数。单尺度测试 bbox 相关实现代码在 mmdet/models/roi_heads/test_mixins.py/BBoxTestMixin 的 simple_test_bboxes 函数中。而多尺度测试的bbox 相关实现代码在 mmdet/models/roi_heads/test_mixins.py/BBoxTestMixin 的 aug_test_bboxes 函数中。这两个函数是直接在StandardRoIHead中的simple_test和aug_test中直接调用的,相关代码如下所示:


# 父类
class BBoxTestMixin:
  def simple_test_bboxes(self, ...):
     rois = bbox2roi(proposals)
     # roi 提取+ forward,输出预测结果
     bbox_results = self._bbox_forward(x, rois)
     cls_score = bbox_results['cls_score']
     bbox_pred = bbox_results['bbox_pred']
     det_bboxes = []
     det_labels = []
     for i in range(len(proposals)):
         # 对预测结果进行解码输出 bbox 和对应 label
         det_bbox, det_label = self.bbox_head.get_bboxes(...)
         det_bboxes.append(det_bbox)
         det_labels.append(det_label)
     return det_bboxes, det_labels
  def aug_test_bboxes(self, ...):
  aug_bboxes = []
        aug_scores = []
        for x, img_meta in zip(feats, img_metas):
            ...
            # TODO more flexible
            proposals = bbox_mapping(proposal_list[0][:, :4], img_shape,
                                     scale_factor, flip, flip_direction)
            rois = bbox2roi([proposals])
            bbox_results = self._bbox_forward(x, rois)
            bboxes, scores = self.bbox_head.get_bboxes(...)
    det_bboxes, det_labels = multiclass_nms(...)
        return det_bboxes, det_labels
# 子类继承
@HEADS.register_module()
class StandardRoIHead(BaseRoIHead, BBoxTestMixin, MaskTestMixin):
  def simple_test(self,...):
  det_bboxes, det_labels = self.simple_test_bboxes(...)
  ...
  if self.with_mask:
            segm_results = self.simple_test_mask(...)
  def aug_test(self, x, proposal_list, img_metas, rescale=False):
  det_bboxes, det_labels = self.aug_test_bboxes(...)
  ...
  if self.with_mask:
            segm_results = self.aug_test_mask(...)


实际上依然是调用了 Head 模块内部的 get_bboxes 函数,处理逻辑和 dense_head 差不多( 解码+还原尺度+ nms)。


参考资料:


1. 轻松掌握 MMDetection 中 Head 流程


2. 轻松掌握 MMDetection 整体构建流程(一)


3. 轻松掌握 MMDetection 整体构建流程(二)


目录
相关文章
|
1月前
|
开发框架 供应链 监控
并行开发模型详解:类型、步骤及其应用解析
在现代研发环境中,企业需要在有限时间内推出高质量的产品,以满足客户不断变化的需求。传统的线性开发模式往往拖慢进度,导致资源浪费和延迟交付。并行开发模型通过允许多个开发阶段同时进行,极大提高了产品开发的效率和响应能力。本文将深入解析并行开发模型,涵盖其类型、步骤及如何通过辅助工具优化团队协作和管理工作流。
61 3
|
11天前
|
存储 网络协议 安全
30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场
本文精选了 30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场。
35 2
|
11天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
14天前
|
域名解析 网络协议 测试技术
IP、掩码、网关、DNS1、DNS2到底是什么东西,ping telnet测试
理解IP地址、子网掩码、默认网关和DNS服务器的概念是有效管理和配置网络的基础。通过使用ping和telnet命令,可以测试网络连通性和服务状态,快速诊断和解决网络问题。这些工具和概念是网络管理员和IT专业人员日常工作中不可或缺的部分。希望本文提供的详细解释和示例能够帮助您更好地理解和应用这些网络配置和测试工具。
42 2
|
27天前
|
存储 安全 Java
系统安全架构的深度解析与实践:Java代码实现
【11月更文挑战第1天】系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。
78 10
|
26天前
|
编解码 人工智能 自然语言处理
迈向多语言医疗大模型:大规模预训练语料、开源模型与全面基准测试
【10月更文挑战第23天】Oryx 是一种新型多模态架构,能够灵活处理各种分辨率的图像和视频数据,无需标准化。其核心创新包括任意分辨率编码和动态压缩器模块,适用于从微小图标到长时间视频的多种应用场景。Oryx 在长上下文检索和空间感知数据方面表现出色,并且已开源,为多模态研究提供了强大工具。然而,选择合适的分辨率和压缩率仍需谨慎,以平衡处理效率和识别精度。论文地址:https://www.nature.com/articles/s41467-024-52417-z
43 2
|
27天前
|
前端开发 JavaScript 开发者
揭秘前端高手的秘密武器:深度解析递归组件与动态组件的奥妙,让你代码效率翻倍!
【10月更文挑战第23天】在Web开发中,组件化已成为主流。本文深入探讨了递归组件与动态组件的概念、应用及实现方式。递归组件通过在组件内部调用自身,适用于处理层级结构数据,如菜单和树形控件。动态组件则根据数据变化动态切换组件显示,适用于不同业务逻辑下的组件展示。通过示例,展示了这两种组件的实现方法及其在实际开发中的应用价值。
33 1
|
30天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
18 1
|
1月前
|
测试技术 API 开发者
精通.NET单元测试:MSTest、xUnit、NUnit全面解析
【10月更文挑战第15天】本文介绍了.NET生态系统中最流行的三种单元测试框架:MSTest、xUnit和NUnit。通过示例代码展示了每种框架的基本用法和特点,帮助开发者根据项目需求和个人偏好选择合适的测试工具。
39 3
|
1月前
|
机器学习/深度学习 算法 Python
深度解析机器学习中过拟合与欠拟合现象:理解模型偏差背后的原因及其解决方案,附带Python示例代码助你轻松掌握平衡技巧
【10月更文挑战第10天】机器学习模型旨在从数据中学习规律并预测新数据。训练过程中常遇过拟合和欠拟合问题。过拟合指模型在训练集上表现优异但泛化能力差,欠拟合则指模型未能充分学习数据规律,两者均影响模型效果。解决方法包括正则化、增加训练数据和特征选择等。示例代码展示了如何使用Python和Scikit-learn进行线性回归建模,并观察不同情况下的表现。
310 3
下一篇
无影云桌面