Building on the previous post's walkthrough of the BEVFusion theory, this post analyzes and documents the code details.
The camera-only detection network is a variant of BEVDet, differing in its view transformer and hyperparameters. Because it adopts the newly proposed efficient BEV pooling, it runs faster and reaches higher accuracy than BEVDet.
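The gist of efficient BEV pooling: every point of the camera frustum maps to a fixed BEV cell, so the association can be precomputed and cached, and the per-cell reduction can be done with a dedicated GPU kernel instead of the prefix-sum trick used by the original LSS. The snippet below is a minimal, purely illustrative PyTorch sketch of that grouped sum (bev_pool_sketch and its arguments are my own names, not the repo's kernel):

import torch

def bev_pool_sketch(feats: torch.Tensor, cell_idx: torch.Tensor, num_cells: int) -> torch.Tensor:
    # feats:    (N, C) frustum features
    # cell_idx: (N,)   precomputed flattened BEV-cell index of each frustum point
    out = feats.new_zeros(num_cells, feats.shape[1])
    out.index_add_(0, cell_idx, feats)  # sum all features landing in the same cell
    return out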
Definition of fusion_models
The overall pipeline is defined in bevfusion.py:
class BEVFusion(Base3DFusionModel):
    def __init__(self, encoders, fuser, decoder, heads, **kwargs):
        super().__init__()

        self.encoders = nn.ModuleDict()
        # build the camera encoder if configured
        if encoders.get("camera") is not None:
            self.encoders["camera"] = nn.ModuleDict(
                {
                    "backbone": build_backbone(encoders["camera"]["backbone"]),
                    "neck": build_neck(encoders["camera"]["neck"]),
                    "vtransform": build_vtransform(encoders["camera"]["vtransform"]),
                }
            )
        # build the lidar encoder if configured
        if encoders.get("lidar") is not None:
            if encoders["lidar"]["voxelize"].get("max_num_points", -1) > 0:
                voxelize_module = Voxelization(**encoders["lidar"]["voxelize"])
            else:
                voxelize_module = DynamicScatter(**encoders["lidar"]["voxelize"])
            self.encoders["lidar"] = nn.ModuleDict(
                {
                    "voxelize": voxelize_module,
                    "backbone": build_backbone(encoders["lidar"]["backbone"]),
                }
            )
            self.voxelize_reduce = encoders["lidar"].get("voxelize_reduce", True)

        # build the fuser only when multi-modal features need to be fused
        if fuser is not None:
            self.fuser = build_fuser(fuser)
        else:
            self.fuser = None

        self.decoder = nn.ModuleDict(
            {
                "backbone": build_backbone(decoder["backbone"]),
                "neck": build_neck(decoder["neck"]),
            }
        )
        self.heads = nn.ModuleDict()
        for name in heads:
            if heads[name] is not None:
                self.heads[name] = build_head(heads[name])
        ...

    def extract_camera_features(
        self,
        x,
        points,
        camera2ego,
        lidar2ego,
        lidar2camera,
        lidar2image,
        camera_intrinsics,
        camera2lidar,
        img_aug_matrix,
        lidar_aug_matrix,
        img_metas,
    ) -> torch.Tensor:
        ...

    def extract_lidar_features(self, x) -> torch.Tensor:
        feats, coords, sizes = self.voxelize(x)
        batch_size = coords[-1, 0] + 1
        x = self.encoders["lidar"]["backbone"](feats, coords, batch_size, sizes=sizes)
        return x

    @force_fp32()
    def voxelize(self, points):
        ...

    def forward(
        self,
        img,
        points,
        camera2ego,
        lidar2ego,
        lidar2camera,
        lidar2image,
        camera_intrinsics,
        camera2lidar,
        img_aug_matrix,
        lidar_aug_matrix,
        metas,
        gt_masks_bev=None,
        gt_bboxes_3d=None,
        gt_labels_3d=None,
        **kwargs,
    ):
        ...

    @auto_fp16(apply_to=("img", "points"))
    def forward_single(
        self,
        img,
        points,
        camera2ego,
        lidar2ego,
        lidar2camera,
        lidar2image,
        camera_intrinsics,
        camera2lidar,
        img_aug_matrix,
        lidar_aug_matrix,
        metas,
        gt_masks_bev=None,
        gt_bboxes_3d=None,
        gt_labels_3d=None,
        **kwargs,
    ):
        ...
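Roughly, the elided forward_single wires these modules together as sketched below: each configured encoder produces a BEV feature map, the fuser merges them, and the shared decoder plus task heads consume the fused map. This is a hedged sketch of the control flow only (augmentation arguments, loss computation, and the per-head call signatures are omitted or simplified):

def forward_single_sketch(self, img, points, metas, **kwargs):
    features = []
    # run every configured modality encoder; each returns a BEV feature map
    for sensor in self.encoders:
        if sensor == "camera":
            feature = self.extract_camera_features(img, points, **kwargs)
        else:
            feature = self.extract_lidar_features(points)
        features.append(feature)

    # fuse multi-modal BEV features, or pass through for a single modality
    x = self.fuser(features) if self.fuser is not None else features[0]

    # shared BEV decoder, then task-specific heads (detection / map segmentation)
    x = self.decoder["backbone"](x)
    x = self.decoder["neck"](x)
    outputs = {name: head(x, metas) for name, head in self.heads.items()}
    return outputs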
Application and implementation of LSS
The LSS view transform is defined in ./mmdet3d/models/vtransforms/base.py.
Note the function gen_dx_bx(): from the configured xbound, ybound, and zbound it computes, for the voxel grid, the voxel size dx, the center of the first voxel bx, and the number of voxels nx along each axis.
def gen_dx_bx(xbound, ybound, zbound):
    # voxel size along each axis
    dx = torch.Tensor([row[2] for row in [xbound, ybound, zbound]])
    # center of the first voxel along each axis
    bx = torch.Tensor([row[0] + row[2] / 2.0 for row in [xbound, ybound, zbound]])
    # number of voxels along each axis
    nx = torch.LongTensor(
        [(row[1] - row[0]) / row[2] for row in [xbound, ybound, zbound]]
    )
    return dx, bx, nx
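As a concrete check (the bound values here are only illustrative, not necessarily the repo's defaults):

dx, bx, nx = gen_dx_bx([-54.0, 54.0, 0.3], [-54.0, 54.0, 0.3], [-10.0, 10.0, 20.0])
# dx = [0.3, 0.3, 20.0]        voxel size along x / y / z
# bx = [-53.85, -53.85, 0.0]   center of the first voxel along each axis
# nx = [360, 360, 1]           number of voxels along each axis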
BaseTransform implements the pipeline that lifts image features into BEV space using the camera and LiDAR geometry; it is defined as follows:
class BaseTransform(nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        image_size: Tuple[int, int],
        feature_size: Tuple[int, int],
        xbound: Tuple[float, float, float],
        ybound: Tuple[float, float, float],
        zbound: Tuple[float, float, float],
        dbound: Tuple[float, float, float],
    ) -> None:
        super().__init__()
        self.in_channels = in_channels
        self.image_size = image_size
        self.feature_size = feature_size
        self.xbound = xbound
        self.ybound = ybound
        self.zbound = zbound
        self.dbound = dbound

        # precompute the mapping between image pixels and 3D voxels
        dx, bx, nx = gen_dx_bx(self.xbound, self.ybound, self.zbound)
        self.dx = nn.Parameter(dx, requires_grad=False)
        self.bx = nn.Parameter(bx, requires_grad=False)
        self.nx = nn.Parameter(nx, requires_grad=False)

        self.C = out_channels
        self.frustum = self.create_frustum()
        self.D = self.frustum.shape[0]
        self.fp16_enabled = False

    # build the frustum of candidate (x, y, depth) points in the camera frame
    @force_fp32()
    def create_frustum(self):
        iH, iW = self.image_size
        fH, fW = self.feature_size

        ds = (
            torch.arange(*self.dbound, dtype=torch.float)
            .view(-1, 1, 1)
            .expand(-1, fH, fW)
        )
        D, _, _ = ds.shape

        xs = (
            torch.linspace(0, iW - 1, fW, dtype=torch.float)
            .view(1, 1, fW)
            .expand(D, fH, fW)
        )
        ys = (
            torch.linspace(0, iH - 1, fH, dtype=torch.float)
            .view(1, fH, 1)
            .expand(D, fH, fW)
        )

        frustum = torch.stack((xs, ys, ds), -1)
        return nn.Parameter(frustum, requires_grad=False)

    @force_fp32()
    def get_geometry(
        self,
        camera2lidar_rots,
        camera2lidar_trans,
        intrins,
        post_rots,
        post_trans,
        **kwargs,
    ):
        B, N, _ = camera2lidar_trans.shape

        # undo post-transformation (image augmentation): translation, then rotation
        # B x N x D x H x W x 3
        points = self.frustum - post_trans.view(B, N, 1, 1, 1, 3)
        points = (
            torch.inverse(post_rots)
            .view(B, N, 1, 1, 1, 3, 3)
            .matmul(points.unsqueeze(-1))
        )
        # cam_to_lidar
        points = torch.cat(
            (
                points[:, :, :, :, :, :2] * points[:, :, :, :, :, 2:3],
                points[:, :, :, :, :, 2:3],
            ),
            5,
        )
        # coordinate transform: camera frame -> lidar frame
        combine = camera2lidar_rots.matmul(torch.inverse(intrins))
        points = combine.view(B, N, 1, 1, 1, 3, 3).matmul(points).squeeze(-1)
        points += camera2lidar_trans.view(B, N, 1, 1, 1, 3)

        # apply the lidar (BEV) augmentation, if any
        if "extra_rots" in kwargs:
            extra_rots = kwargs["extra_rots"]
            points = (
                extra_rots.view(B, 1, 1, 1, 1, 3, 3)
                .repeat(1, N, 1, 1, 1, 1, 1)
                .matmul(points.unsqueeze(-1))
                .squeeze(-1)
            )
        if "extra_trans" in kwargs:
            extra_trans = kwargs["extra_trans"]
            points += extra_trans.view(B, 1, 1, 1, 1, 3).repeat(1, N, 1, 1, 1, 1)

        return points

    def get_cam_feats(self, x):
        raise NotImplementedError

    @force_fp32()
    def bev_pool(self, geom_feats, x):
        B, N, D, H, W, C = x.shape
        Nprime = B * N * D * H * W

        # flatten x
        x = x.reshape(Nprime, C)

        # flatten indices
        geom_feats = ((geom_feats - (self.bx - self.dx / 2.0)) / self.dx).long()
        geom_feats = geom_feats.view(Nprime, 3)
        batch_ix = torch.cat(
            [
                torch.full([Nprime // B, 1], ix, device=x.device, dtype=torch.long)
                for ix in range(B)
            ]
        )
        geom_feats = torch.cat((geom_feats, batch_ix), 1)

        # self.nx gives the number of voxels per axis;
        # filter out points that fall outside the BEV box
        kept = (
            (geom_feats[:, 0] >= 0)
            & (geom_feats[:, 0] < self.nx[0])
            & (geom_feats[:, 1] >= 0)
            & (geom_feats[:, 1] < self.nx[1])
            & (geom_feats[:, 2] >= 0)
            & (geom_feats[:, 2] < self.nx[2])
        )
        x = x[kept]
        geom_feats = geom_feats[kept]

        x = bev_pool(x, geom_feats, B, self.nx[2], self.nx[0], self.nx[1])

        # collapse Z
        final = torch.cat(x.unbind(dim=2), 1)

        return final

    @force_fp32()
    def forward(
        self,
        img,
        points,
        camera2ego,
        lidar2ego,
        lidar2camera,
        lidar2image,
        camera_intrinsics,
        camera2lidar,
        img_aug_matrix,
        lidar_aug_matrix,
        **kwargs,
    ):
        rots = camera2ego[..., :3, :3]
        trans = camera2ego[..., :3, 3]
        intrins = camera_intrinsics[..., :3, :3]
        post_rots = img_aug_matrix[..., :3, :3]
        post_trans = img_aug_matrix[..., :3, 3]
        lidar2ego_rots = lidar2ego[..., :3, :3]
        lidar2ego_trans = lidar2ego[..., :3, 3]
        camera2lidar_rots = camera2lidar[..., :3, :3]
        camera2lidar_trans = camera2lidar[..., :3, 3]
        extra_rots = lidar_aug_matrix[..., :3, :3]
        extra_trans = lidar_aug_matrix[..., :3, 3]

        geom = self.get_geometry(
            camera2lidar_rots,
            camera2lidar_trans,
            intrins,
            post_rots,
            post_trans,
            extra_rots=extra_rots,
            extra_trans=extra_trans,
        )

        x = self.get_cam_feats(img)
        x = self.bev_pool(geom, x)
        return x
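To keep the tensor shapes straight, the forward pass roughly moves through the stages below (B batches, N cameras, D depth bins, fH x fW feature map, C channels; the shapes are read off the code above rather than printed from a run):

# frustum (create_frustum):           (D, fH, fW, 3)        pixel (u, v) plus depth candidates
# geom (get_geometry):                (B, N, D, fH, fW, 3)  frustum points in lidar coordinates
# x (get_cam_feats, in the subclass): (B, N, D, fH, fW, C)  depth-weighted image features
# bev_pool output (after collapse Z): (B, C * nx[2], nx[0], nx[1])  dense BEV feature map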
Implementation of LSSFPN
# registered as a NECK in the mmdet registry
class LSSFPN(nn.Module):
    def __init__(
        self,
        in_indices: Tuple[int, int],
        in_channels: Tuple[int, int],
        out_channels: int,
        scale_factor: int = 1,
    ) -> None:
        super().__init__()
        self.in_indices = in_indices
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.scale_factor = scale_factor

        self.fuse = nn.Sequential(
            nn.Conv2d(in_channels[0] + in_channels[1], out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(True),
            nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(True),
        )
        if scale_factor > 1:
            self.upsample = nn.Sequential(
                nn.Upsample(
                    scale_factor=scale_factor,
                    mode="bilinear",
                    align_corners=True,
                ),
                nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(True),
            )

    def forward(self, x: List[torch.Tensor]) -> torch.Tensor:
        # pick the two feature levels to fuse
        x1 = x[self.in_indices[0]]
        assert x1.shape[1] == self.in_channels[0]
        x2 = x[self.in_indices[1]]
        assert x2.shape[1] == self.in_channels[1]

        # resize the first level to the second level's resolution, then fuse
        x1 = F.interpolate(
            x1,
            size=x2.shape[-2:],
            mode="bilinear",
            align_corners=True,
        )
        x = torch.cat([x1, x2], dim=1)
        x = self.fuse(x)
        if self.scale_factor > 1:
            x = self.upsample(x)
        return x
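A quick usage sketch of the neck's in/out contract (the channel counts and feature-map sizes here are made up for illustration): the first selected level is resized to the second one's resolution, the two are concatenated and fused, and the result is optionally upsampled.

import torch

# hypothetical channel/resolution numbers, only to show the in/out contract
neck = LSSFPN(in_indices=(0, 1), in_channels=(192, 384), out_channels=256, scale_factor=2)
feats = [torch.randn(1, 192, 32, 88), torch.randn(1, 384, 16, 44)]
out = neck(feats)  # fused at 16 x 44, then upsampled 2x -> (1, 256, 32, 88)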
Lidar-Encoder
Feature encoder for the LiDAR point cloud:
class PointPillarsEncoder(nn.Module):
    def __init__(
        self,
        pts_voxel_encoder: Dict[str, Any],
        pts_middle_encoder: Dict[str, Any],
        **kwargs,
    ):
        super().__init__()
        self.pts_voxel_encoder = build_backbone(pts_voxel_encoder)
        self.pts_middle_encoder = build_backbone(pts_middle_encoder)

    def forward(self, feats, coords, batch_size, sizes):
        x = self.pts_voxel_encoder(feats, sizes, coords)
        x = self.pts_middle_encoder(x, coords, batch_size)
        return x
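The two sub-modules split the pillar pipeline: pts_voxel_encoder (the PillarFeatureNet below) turns each padded pillar of points into one feature vector, and pts_middle_encoder scatters those vectors back onto a dense BEV canvas. Below is a minimal sketch of that scatter step, assuming coords holds (batch_idx, z, y, x) pillar indices (the exact layout and the function name are assumptions, not the repo's implementation):

import torch

def scatter_to_bev_sketch(pillar_feats, coords, batch_size, nx, ny):
    # pillar_feats: (M, C) one feature vector per non-empty pillar
    # coords:       (M, 4) assumed to be (batch_idx, z, y, x) integer pillar indices
    C = pillar_feats.shape[1]
    canvas = pillar_feats.new_zeros(batch_size, C, ny, nx)
    canvas[coords[:, 0], :, coords[:, 2], coords[:, 3]] = pillar_feats
    return canvas  # (batch_size, C, ny, nx) pseudo-image for the 2D BEV backbone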
Definition of the PillarFeatureNet class:
class PillarFeatureNet(nn.Module):
    """Similar role to second.pytorch.voxelnet.VoxelFeatureExtractor."""

    def forward(self, features, num_voxels, coors):
        ...
        dtype = features.dtype

        # Find distance of x, y, and z from the cluster center:
        # the offset to the mean of the points in each pillar is used as f_cluster
        points_mean = features[:, :, :3].sum(dim=1, keepdim=True) / num_voxels.type_as(
            features
        ).view(-1, 1, 1)
        f_cluster = features[:, :, :3] - points_mean

        # Find distance of x and y from the pillar center
        # (modified according to the xyz coordinate convention)
        f_center = torch.zeros_like(features[:, :, :2])
        f_center[:, :, 0] = features[:, :, 0] - (
            coors[:, 1].to(dtype).unsqueeze(1) * self.vx + self.x_offset
        )
        f_center[:, :, 1] = features[:, :, 1] - (
            coors[:, 2].to(dtype).unsqueeze(1) * self.vy + self.y_offset
        )

        # Combine together feature decorations
        features_ls = [features, f_cluster, f_center]
        if self._with_distance:
            points_dist = torch.norm(features[:, :, :3], 2, 2, keepdim=True)
            features_ls.append(points_dist)
        features = torch.cat(features_ls, dim=-1)

        # The feature decorations were calculated without regard to whether a
        # pillar was empty; make sure the padded entries of empty slots stay zero.
        voxel_count = features.shape[1]
        mask = get_paddings_indicator(num_voxels, voxel_count, axis=0)
        mask = torch.unsqueeze(mask, -1).type_as(features)
        features *= mask

        # Forward pass through the PFN layers
        for pfn in self.pfn_layers:
            features = pfn(features)

        return features.squeeze()
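The decoration follows the standard PointPillars recipe: each point's raw channels are augmented with its offset to the pillar's point mean (f_cluster, 3 dims) and its x/y offset to the pillar's geometric center (f_center, 2 dims), so a 4-dim input point becomes 9-dim before entering the PFN layers. The mask comes from get_paddings_indicator; below is a sketch of what that helper computes (my reading of its behavior, not the exact mmdet3d implementation):

import torch

def paddings_indicator_sketch(actual_num: torch.Tensor, max_num: int) -> torch.Tensor:
    # actual_num: (M,) number of real points in each pillar
    # returns an (M, max_num) bool mask, True for real points and False for the
    # zero-padded slots, so padded entries can be zeroed before the PFN layers
    slot_idx = torch.arange(max_num, device=actual_num.device)
    return slot_idx.unsqueeze(0) < actual_num.unsqueeze(1)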