2、Deep SORT 目标跟踪算法
DeepSort中最大的特点是加入外观信息,借用了ReID领域模型来提取特征,减少了ID switch的次数。整
体流程图如下:
可以看出,Deep SORT算法在SORT算法的基础上增加了级联匹配(Matching Cascade)+新轨迹的确认(confirmed)。总体流程就是:
- 卡尔曼滤波器预测轨迹 Tracks;
- 使用匈牙利算法将预测得到的轨迹 Tracks 和当前帧中的 detections 进行匹配(级联匹配和 IOU 匹配);
- 卡尔曼滤波更新。
2.1、Deep SORT相对于SORT的改进
整体框架没有大改,还是延续了卡尔曼滤波加匈牙利算法的思路,在这个基础上增加了Deep Association Metric。Deep Association Metric其实就是在大型行人重识别网络上学习的一个行人鉴别网络。目的是区分出不同的行人。个人感觉很类似于典型的行人重识别网络。输出行人图片,输出一组向量,通过比对两个向量之间的距离,来判断两副输入图片是否是同一个行人。
2.2、级联匹配
级联匹配的目的:当一个目标长时间被遮挡之后,kalman滤波预测的不确定性就会大大增加,状态空间内的可观察性就会大大降低。假如此时两个追踪器竞争同一个检测结果的匹配权,往往遮挡时间较长的那条轨迹的马氏距离更小,使得检测结果更可能和遮挡时间较长的那条轨迹相关联,这种不理想的效果往往会破坏追踪的持续性。这么理解吧,假设本来协方差矩阵是一个正态分布,那么连续的预测不更新就会导致这个正态分布的方差越来越大,那么离均值欧氏距离远的点可能和之前分布中离得较近的点获得同样的马氏距离值。所以,作者使用了级联匹配来对更加频繁出现的目标赋予优先权。当然同样也有弊端:可能导致一些新产生的轨迹被连接到了一些旧的轨迹上。但这种情况较少。
上图非常清晰地解释了如何进行级联匹配,首先使用外观模型(ReID)和运动模型(马氏距离)来计算相似度矩阵,得到cost矩阵以及门控矩阵,用于限制代价矩阵中过大的值。然后则是级联匹配的数据关联步骤,匹配过程是一个循环(max age个迭代,默认为70),也就是从missing age=0到missing age=70的轨迹和Detections进行匹配,没有丢失过的轨迹优先匹配,丢失较为久远的就靠后匹配。通过这部分处理,可以重新将被遮挡目标找回,降低被遮挡然后再出现的目标发生的ID Switch 次数。
将Detection和Track进行匹配,所以出现几种情况:
- Detection和Track匹配,也就是 Matched Tracks。普通连续跟踪的目标都属于这种情况,前后两帧都有目标,能够匹配上。
- Detection 没有找到匹配的Track,也就是Unmatched Detections。图像中突然出现新的目标的时候,Detection无法在之前的Track找到匹配的目标。
- Track 没有找到匹配的Detection,也就是Unmatched Tracks。连续追踪的目标超出图像区域,Track无法与当前任意一个 Detection 匹配。
- 以上没有涉及一种特殊的情况,就是两个目标遮挡的情况。刚刚被遮挡的目标的Track也无法匹配Detection,目标暂时从图像中消失。之后被遮挡目标再次出现的时候,应该尽量让被遮挡目标分配的ID不发生变动,减少IDSwitch 出现的次数,这就需要用到级联匹配了。
2.3、IOU Assignment
这个方法是在SORT中被提出的。又是比较陌生的名词。实际上匈牙利算法可以理解成“尽量多”的一种思路,比如说A检测器可以和a,c跟踪器完成匹配(与a匹配置信度更高),但是B检测器只能和a跟踪器完成匹配。那在算法中,就会让A与c完成匹配,B 与a完成匹配,而降低对于置信度的考虑。所以算法的根本目的并不是在于匹配的准不准,而是在于尽量多的匹配上,这也就是在deepsort中作者添加级联匹配与马氏距离与余弦距离的根本目的,因为仅仅使用匈牙利算法进行匹配特别容易造成 ID switch,就是一个检测框id不停地进行更换,缺乏准确性与鲁棒性。那什么是匹配的置信度高呢,其实在这里,作者使用的是 IOU 进行衡量,计算检测器与跟踪器的IOU,将这个作为置信度的高低(比较粗糙)。
2.4、矩阵更新后续处理
2.5、代码流程图
3、 Deep SORT代码解析
DeepSort 是核心类,调用其他模块,大体上可以分为三个模块:
- ReID 模块,用于提取表观特征,原论文中是生成了128维的embedding。
- Track 模块,轨迹类,用于保存一个Track的状态信息,是一个基本单位。
- Tracker 模块,Tracker模块掌握最核心的算法,卡尔曼滤波和匈牙利算法都是通过调用这个模块来完成的。
3.1、核心模块
3.1.1、Detection类
import numpy as np class Detection(object): #This class represents a bounding box detection in a single image. def __init__(self, tlwh, confidence, feature): self.tlwh = np.asarray(tlwh, dtype=np.float) self.confidence = float(confidence) self.feature = np.asarray(feature, dtype=np.float32) def to_tlbr(self): ret = self.tlwh.copy() ret[2:] += ret[:2] return ret def to_xyah(self): ret = self.tlwh.copy() ret[:2] += ret[2:] / 2 ret[2] /= ret[3] return ret
Detection 类用于保存通过目标检测器得到的一个检测框, 包含 top left 坐标+框的宽和高, 以及该 bbox 的置信 度还有通过 reid 获取得到的对应的 embedding。除此以外提供了不同 bbox 位置格式的转换方法:
- tlwh: 代表左上角坐标+宽高
- tlbr: 代表左上角坐标+右下角坐标
- xyah: 代表中心坐标+宽高比+高
3.1.2、Track类
class Track: # 一个轨迹的信息,包含(x,y,a,h) & v """ A single target track with state space `(x, y, a, h)` and associated velocities, where `(x, y)` is the center of the bounding box, `a` is the aspect ratio and `h` is the height. """ def __init__(self, mean, covariance, track_id, n_init, max_age, feature=None): # max age是一个存活期限,默认为70帧,在 self.mean = mean self.covariance = covariance self.track_id = track_id self.hits = 1 # hits和n_init进行比较 # hits每次update的时候进行一次更新(只有match的时候才进行update) # hits代表匹配上了多少次,匹配次数超过n_init就会设置为confirmed状态 self.age = 1 # 没有用到,和time_since_update功能重复 self.time_since_update = 0 # 每次调用predict函数的时候就会+1 # 每次调用update函数的时候就会设置为0 self.state = TrackState.Tentative self.features = [] # 每个track对应多个features, 每次更新都将最新的feature添加到列表中 if feature is not None: self.features.append(feature) self._n_init = n_init # 如果连续n_init帧都没有出现失配,设置为deleted状态 self._max_age = max_age # 上限
Track类主要存储的是轨迹信息,mean和covariance是保存的框的位置和速度信息,track_id代表分配给这个轨迹的ID。state代表框的状态,有三种:
- Tentative:不确定态,这种状态会在初始化一个Track的时候分配,并且只有在连续匹配上n_init帧才会转变为确定态。如果在处于不确定态的情况下没有匹配上任何detection,那将转变为删除态;
- Confirmed:确定态,代表该Track确实处于匹配状态。如果当前Track属于确定态,但是失配连续达到max age次数的时候,就会被转变为删除态;
- Deleted:删除态,说明该Track已经失效。
- max_age:代表一个Track存活期限,他需要和time_since_update变量进行比对。time_since_update是每次轨迹调用predict函数的时候就会+1,每次调用predict的时候就会重置为0,也就是说如果一个轨迹长时间没有update(没有匹配上)的时候,就会不断增加,直到time_since_update超过max age(默认70),将这个Track从Tracker中的列表删除。
- Hits:代表连续确认多少次,用在从不确定态转为确定态的时候。每次Track进行update的时候,hits就会+1, 如果hits>n_init(默认为3),也就是连续三帧的该轨迹都得到了匹配,这时候才将不确定态转为确定态。
3.1.3、ReID特征提取部分
ReID网络是独立于目标检测和跟踪器的模块,功能是提取对应bounding box中的feature,得到一个固定维度的embedding作为该bbox的代表,供计算相似度时使用。
class Extractor(object): def __init__(self, model_name, model_path, use_cuda=True): self.net = build_model(name=model_name, num_classes=96) self.device = "cuda" if torch.cuda.is_available( ) and use_cuda else "cpu" state_dict = torch.load(model_path)['net_dict'] self.net.load_state_dict(state_dict) print("Loading weights from {}... Done!".format(model_path)) self.net.to(self.device) self.size = (128,128) self.norm = transforms.Compose([ transforms.ToTensor(), transforms.Normalize([0.3568, 0.3141, 0.2781], [0.1752, 0.1857, 0.1879]) ]) def _preprocess(self, im_crops): def _resize(im, size): return cv2.resize(im.astype(np.float32) / 255., size) im_batch = torch.cat([ self.norm(_resize(im, self.size)).unsqueeze(0) for im in im_crops ],dim=0).float() return im_batch def __call__(self, im_crops): im_batch = self._preprocess(im_crops) with torch.no_grad(): im_batch = im_batch.to(self.device) features = self.net(im_batch) return features.cpu().numpy()
3.1.4、NearestNeighborDistanceMetric类
- 计算欧氏距离
def _pdist(a, b): # 用于计算成对的平方距离 # a NxM 代表N个对象,每个对象有M个数值作为embedding进行比较 # b LxM 代表L个对象,每个对象有M个数值作为embedding进行比较 # 返回的是NxL的矩阵,比如dist[i][j]代表a[i]和b[j]之间的平方和距离 # 实现见:https://blog.csdn.net/frankzd/article/details/80251042 a, b = np.asarray(a), np.asarray(b) # 拷贝一份数据 if len(a) == 0 or len(b) == 0: return np.zeros((len(a), len(b))) a2, b2 = np.square(a).sum(axis=1), np.square( b).sum(axis=1) # 求每个embedding的平方和 # sum(N) + sum(L) -2 x [NxM]x[MxL] = [NxL] r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :] r2 = np.clip(r2, 0., float(np.inf)) return r2
- 计算余弦距离
def _cosine_distance(a, b, data_is_normalized=False): # a和b之间的余弦距离 # a : [NxM] b : [LxM] # 余弦距离 = 1 - 余弦相似度 # https://blog.csdn.net/u013749540/article/details/51813922 if not data_is_normalized: # 需要将余弦相似度转化成类似欧氏距离的余弦距离。 a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True) # np.linalg.norm 操作是求向量的范式,默认是L2范式,等同于求向量的欧式距离。 b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True) return 1. - np.dot(a, b.T)
3.1.6、Tracker类
Tracker类是最核心的类,Tracker中保存了所有的轨迹信息,负责初始化第一帧的轨迹、卡尔曼滤波的预测和更新、负责级联匹配、IOU匹配等等核心工作。
class Tracker: def __init__(self, metric, max_iou_distance=0.7, max_age=30, n_init=3): self.metric = metric self.max_iou_distance = max_iou_distance self.max_age = max_age self.n_init = n_init self.kf = kalman_filter.KalmanFilter() self.tracks = [] self._next_id = 1 def predict(self): for track in self.tracks: track.predict(self.kf) def update(self, detections): # Run matching cascade. matches, unmatched_tracks, unmatched_detections = \ self._match(detections) # Update track set. for track_idx, detection_idx in matches: self.tracks[track_idx].update( self.kf, detections[detection_idx]) for track_idx in unmatched_tracks: self.tracks[track_idx].mark_missed() for detection_idx in unmatched_detections: self._initiate_track(detections[detection_idx]) self.tracks = [t for t in self.tracks if not t.is_deleted()] # Update distance metric. active_targets = [t.track_id for t in self.tracks if t.is_confirmed()] features, targets = [], [] for track in self.tracks: if not track.is_confirmed(): continue features += track.features targets += [track.track_id for _ in track.features] track.features = [] self.metric.partial_fit( np.asarray(features), np.asarray(targets), active_targets) def _match(self, detections): # 主要功能是进行匹配,找到匹配的,未匹配的部分 def gated_metric(tracks, dets, track_indices, detection_indices): # 功能: 用于计算track和detection之间的距离,代价函数,需要使用在KM算法之前 features = np.array([dets[i].feature for i in detection_indices]) targets = np.array([tracks[i].track_id for i in track_indices]) # 1. 通过最近邻计算出代价矩阵 cosine distance cost_matrix = self.metric.distance(features, targets) # 2. 计算马氏距离,得到新的状态矩阵 cost_matrix = linear_assignment.gate_cost_matrix(self.kf, cost_matrix, tracks, dets, track_indices, detection_indices) return cost_matrix # 划分不同轨迹的状态 confirmed_tracks = [i for i, t in enumerate(self.tracks) if t.is_confirmed()] unconfirmed_tracks = [i for i, t in enumerate(self.tracks) if not t.is_confirmed()] # 进行级联匹配,得到匹配的track、不匹配的track、不匹配的detection ''' !!!!!!!!!!! 级联匹配 !!!!!!!!!!! ''' # gated_metric->cosine distance # 仅仅对确定态的轨迹进行级联匹配 matches_a, unmatched_tracks_a, unmatched_detections = \ linear_assignment.matching_cascade( gated_metric, self.metric.matching_threshold, self.max_age, self.tracks, detections, confirmed_tracks) # 将所有状态为未确定态的轨迹和刚刚没有匹配上的轨迹组合为iou_track_candidates, # 进行IoU的匹配 iou_track_candidates = unconfirmed_tracks + [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update == 1] # 未匹配 unmatched_tracks_a = [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update != 1] ''' !!!!!!!!!!! IOU 匹配 对级联匹配中还没有匹配成功的目标再进行IoU匹配 !!!!!!!!!!! ''' # 虽然和级联匹配中使用的都是min_cost_matching作为核心, # 这里使用的metric是iou cost和以上不同 matches_b, unmatched_tracks_b, unmatched_detections = \ linear_assignment.min_cost_matching( iou_matching.iou_cost, self.max_iou_distance, self.tracks, detections, iou_track_candidates, unmatched_detections) matches = matches_a + matches_b unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b)) return matches, unmatched_tracks, unmatched_detections def _initiate_track(self, detection): mean, covariance = self.kf.initiate(detection.to_xyah()) self.tracks.append(Track( mean, covariance, self._next_id, self.n_init, self.max_age, detection.feature)) self._next_id += 1
4、项目结果展示
当前的目标检车算法是yolo v3,后期会更新yolo v4、yolo v5以及Center Net的方法;这里的测试视频是小编在马路边用手机录制的,有点抖动,真么辛苦看完给个转发呗!!!!