深度学习实战】行人检测追踪与双向流量计数系统【python源码+Pyqt5界面+数据集+训练代码】YOLOv8、ByteTrack、目标追踪、双向计数、行人检测追踪、过线计数(3)

简介: 深度学习实战】行人检测追踪与双向流量计数系统【python源码+Pyqt5界面+数据集+训练代码】YOLOv8、ByteTrack、目标追踪、双向计数、行人检测追踪、过线计数

深度学习实战】行人检测追踪与双向流量计数系统【python源码+Pyqt5界面+数据集+训练代码】YOLOv8、ByteTrack、目标追踪、双向计数、行人检测追踪、过线计数(2)https://developer.aliyun.com/article/1536938

ByteTrack的实现代码如下:

class ByteTrack:
    """
    Initialize the ByteTrack object.
    Parameters:
        track_thresh (float, optional): Detection confidence threshold
            for track activation.
        track_buffer (int, optional): Number of frames to buffer when a track is lost.
        match_thresh (float, optional): Threshold for matching tracks with detections.
        frame_rate (int, optional): The frame rate of the video.
    """
    def __init__(
        self,
        track_thresh: float = 0.25,
        track_buffer: int = 30,
        match_thresh: float = 0.8,
        frame_rate: int = 30,
    ):
        self.track_thresh = track_thresh
        self.match_thresh = match_thresh
        self.frame_id = 0
        self.det_thresh = self.track_thresh + 0.1
        self.max_time_lost = int(frame_rate / 30.0 * track_buffer)
        self.kalman_filter = KalmanFilter()
        self.tracked_tracks: List[STrack] = []
        self.lost_tracks: List[STrack] = []
        self.removed_tracks: List[STrack] = []
    def update_with_detections(self, detections: Detections) -> Detections:
        """
        Updates the tracker with the provided detections and
            returns the updated detection results.
        Parameters:
            detections: The new detections to update with.
        Returns:
            Detection: The updated detection results that now include tracking IDs.
        """
        tracks = self.update_with_tensors(
            tensors=detections2boxes(detections=detections)
        )
        detections = Detections.empty()
        if len(tracks) > 0:
            detections.xyxy = np.array(
                [track.tlbr for track in tracks], dtype=np.float32
            )
            detections.class_id = np.array(
                [int(t.class_ids) for t in tracks], dtype=int
            )
            detections.tracker_id = np.array(
                [int(t.track_id) for t in tracks], dtype=int
            )
            detections.confidence = np.array(
                [t.score for t in tracks], dtype=np.float32
            )
        else:
            detections.tracker_id = np.array([], dtype=int)
        return detections
    def update_with_tensors(self, tensors: np.ndarray) -> List[STrack]:
        """
        Updates the tracker with the provided tensors and returns the updated tracks.
        Parameters:
            tensors: The new tensors to update with.
        Returns:
            List[STrack]: Updated tracks.
        """
        self.frame_id += 1
        activated_starcks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []
        class_ids = tensors[:, 5]
        scores = tensors[:, 4]
        bboxes = tensors[:, :4]
        remain_inds = scores > self.track_thresh
        inds_low = scores > 0.1
        inds_high = scores < self.track_thresh
        inds_second = np.logical_and(inds_low, inds_high)
        dets_second = bboxes[inds_second]
        dets = bboxes[remain_inds]
        scores_keep = scores[remain_inds]
        scores_second = scores[inds_second]
        class_ids_keep = class_ids[remain_inds]
        class_ids_second = class_ids[inds_second]
        if len(dets) > 0:
            """Detections"""
            detections = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets, scores_keep, class_ids_keep)
            ]
        else:
            detections = []
        """ Add newly detected tracklets to tracked_stracks"""
        unconfirmed = []
        tracked_stracks = []  # type: list[STrack]
        for track in self.tracked_tracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)
        """ Step 2: First association, with high score detection boxes"""
        strack_pool = joint_tracks(tracked_stracks, self.lost_tracks)
        # Predict the current location with KF
        STrack.multi_predict(strack_pool)
        dists = matching.iou_distance(strack_pool, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_track, u_detection = matching.linear_assignment(
            dists, thresh=self.match_thresh
        )
        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(detections[idet], self.frame_id)
                activated_starcks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)
        """ Step 3: Second association, with low score detection boxes"""
        # association the untrack to the low score detections
        if len(dets_second) > 0:
            """Detections"""
            detections_second = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets_second, scores_second, class_ids_second)
            ]
        else:
            detections_second = []
        r_tracked_stracks = [
            strack_pool[i]
            for i in u_track
            if strack_pool[i].state == TrackState.Tracked
        ]
        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(
            dists, thresh=0.5
        )
        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_starcks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)
        for it in u_track:
            track = r_tracked_stracks[it]
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)
        """Deal with unconfirmed tracks, usually tracks with only one beginning frame"""
        detections = [detections[i] for i in u_detection]
        dists = matching.iou_distance(unconfirmed, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(
            dists, thresh=0.7
        )
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_starcks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)
        """ Step 4: Init new stracks"""
        for inew in u_detection:
            track = detections[inew]
            if track.score < self.det_thresh:
                continue
            track.activate(self.kalman_filter, self.frame_id)
            activated_starcks.append(track)
        """ Step 5: Update state"""
        for track in self.lost_tracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)
        self.tracked_tracks = [
            t for t in self.tracked_tracks if t.state == TrackState.Tracked
        ]
        self.tracked_tracks = joint_tracks(self.tracked_tracks, activated_starcks)
        self.tracked_tracks = joint_tracks(self.tracked_tracks, refind_stracks)
        self.lost_tracks = sub_tracks(self.lost_tracks, self.tracked_tracks)
        self.lost_tracks.extend(lost_stracks)
        self.lost_tracks = sub_tracks(self.lost_tracks, self.removed_tracks)
        self.removed_tracks.extend(removed_stracks)
        self.tracked_tracks, self.lost_tracks = remove_duplicate_tracks(
            self.tracked_tracks, self.lost_tracks
        )
        output_stracks = [track for track in self.tracked_tracks if track.is_activated]
        return output_stracks

使用方法

1.创建ByteTrack跟踪器

# 创建跟踪器
byte_tracker = sv.ByteTrack(track_thresh=0.25, track_buffer=30, match_thresh=0.8, frame_rate=30)

2.对YOLOv8的目标检测结果进行追踪

model = YOLO(path)
results = model(frame)[0]
detections = sv.Detections.from_ultralytics(results)
detections = byte_tracker.update_with_detections(detections)

3.显示追踪结果ID、检测框及标签信息

labels = [
            f"id{tracker_id} {model.model.names[class_id]}"
            for _, _, confidence, class_id, tracker_id
            in detections
        ]
annotated_frame = frame.copy()
annotated_frame = box_annotator.annotate(
            scene=annotated_frame,
            detections=detections,
            labels=labels)

最终检测效果如下:

四、过线计数判断方式

定义过线线段

定义用于统计过线的线段,代码如下:

cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
point_A = [10, int(height/5*4)]
point_B = [width-10, int(height/5)]
# 定义过线使用的线段点
LINE_START = sv.Point(point_A[0], point_A[1])
LINE_END = sv.Point(point_B[0], point_B[1])
line_zone = MyLineZone(start=LINE_START, end=LINE_END)

判断过线方法

使用目标中心点判断是否过线,核心代码如下:

for i, (xyxy, _, confidence, class_id, tracker_id) in enumerate(detections):
            if tracker_id is None:
                continue
            # 使用中心点判断是否过线
            x1, y1, x2, y2 = xyxy
            center_x = int((x1 + x2) / 2)
            center_y = int((y1 + y2) / 2)
            center_point = Point(x=center_x, y=center_y)
            triggers = [self.vector.is_in(point=center_point)]

上述通过目标框坐标计算出目标中心点坐标center_x ,center_y ,然后通过is_in函数判断过线状态,其中is_in函数定义如下:

def is_in(self, point: Point) -> bool:
        v1 = Vector(self.start, self.end)
        v2 = Vector(self.start, point)
        cross_product = (v1.end.x - v1.start.x) * (v2.end.y - v2.start.y) - (
            v1.end.y - v1.start.y
        ) * (v2.end.x - v2.start.x)
        return cross_product < 0

函数首先根据线段的起点和终点构造两个向量v1和v2,分别表示线段和待判断的点与线段起点的向量。然后计算两个向量的叉积,并判断叉积的正负来确定点的位置关系。若叉积小于0,则点在线段的左侧;若叉积大于0,则点在线段的右侧;若叉积等于0,则点在线段上。根据题设,函数返回的是点在线段不同侧的状态,即当叉积小于0时返回True,否则返回False

判断是否通过线段

上述判断方式只能用于判断目标是否通过线段所在直线,并不是在线段内通过。如果想判断在线段内通过,需要另外加上过线时的判断条件,核心代码如下:

def point_in_line(self, center_point):
        # 判断点是否在线段之间通过
        # 计算向量 AP 与向量 AB 的点积(也称为“标量积”)
        # 点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正)
        point_A, point_B = self.get_line_points(self.vector)
        xA, yA = point_A
        xB, yB = point_B
        xP, yP = center_point
        AB = (xB - xA, yB - yA)
        AP = (xP - xA, yP - yA)
        # 计算向量 AP 与向量 AB 的点积
        dot_product = AB[0] * AP[0] + AB[1] * AP[1]
        # 计算向量 AB 模长的平方
        AB_length_squared = AB[0] ** 2 + AB[1] ** 2
        # 判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正)
        if 0 <= dot_product <= AB_length_squared and dot_product >= 0:
            within_segment = True
        else:
            within_segment = False
        return within_segment

判断点是否在线段之间通过,通过计算向量 AP向量 AB点积(也称为“标量积”)来进行判断。其中P表示目标中心点,AB表示目标需要通过的线段。

判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正),则表示在线段内通过。

过线效果展示

过线效果展示如下:

以上便是关于此款行人检测追踪与双向流量计数系统的原理与代码介绍。基于以上内容,博主用pythonPyqt5开发了一个带界面的软件系统,即文中第二部分的演示内容,能够很好的通过视频及摄像头进行检测追踪,以及自定义过线计数

相关文章
|
5月前
|
机器学习/深度学习 人工智能 文字识别
中药材图像识别数据集(100类,9200张)|适用于YOLO系列深度学习分类检测任务
本数据集包含9200张中药材图像,覆盖100种常见品类,已标注并划分为训练集与验证集,支持YOLO等深度学习模型。适用于中药分类、目标检测、AI辅助识别及教学应用,助力中医药智能化发展。
|
5月前
|
机器学习/深度学习 数据采集 人工智能
深度学习实战指南:从神经网络基础到模型优化的完整攻略
🌟 蒋星熠Jaxonic,AI探索者。深耕深度学习,从神经网络到Transformer,用代码践行智能革命。分享实战经验,助你构建CV、NLP模型,共赴二进制星辰大海。
|
7月前
|
机器学习/深度学习 人工智能 监控
河道塑料瓶识别标准数据集 | 科研与项目必备(图片已划分、已标注)| 适用于YOLO系列深度学习分类检测任务【数据集分享】
随着城市化进程加快和塑料制品使用量增加,河道中的塑料垃圾问题日益严重。塑料瓶作为河道漂浮垃圾的主要类型,不仅破坏水体景观,还威胁水生生态系统的健康。传统的人工巡查方式效率低、成本高,难以满足实时监控与治理的需求。
|
7月前
|
机器学习/深度学习 传感器 人工智能
火灾火焰识别数据集(2200张图片已划分、已标注)|适用于YOLO系列深度学习分类检测任务【数据集分享】
在人工智能和计算机视觉的快速发展中,火灾检测与火焰识别逐渐成为智慧城市、公共安全和智能监控的重要研究方向。一个高质量的数据集往往是推动相关研究的核心基础。本文将详细介绍一个火灾火焰识别数据集,该数据集共包含 2200 张图片,并已按照 训练集(train)、验证集(val)、测试集(test) 划分,同时配有对应的标注文件,方便研究者快速上手模型训练与评估。
2197 10
火灾火焰识别数据集(2200张图片已划分、已标注)|适用于YOLO系列深度学习分类检测任务【数据集分享】
|
7月前
|
机器学习/深度学习 人工智能 监控
坐姿标准好坏姿态数据集(图片已划分、已标注)|适用于YOLO系列深度学习分类检测任务【数据集分享】
坐姿标准好坏姿态数据集的发布,填补了计算机视觉领域在“细分健康行为识别”上的空白。它不仅具有研究价值,更在实际应用层面具备广阔前景。从青少年的健康教育,到办公室的智能提醒,再到驾驶员的安全监控和康复训练,本数据集都能发挥巨大的作用。
坐姿标准好坏姿态数据集(图片已划分、已标注)|适用于YOLO系列深度学习分类检测任务【数据集分享】
|
7月前
|
机器学习/深度学习 数据采集 算法
PCB电路板缺陷检测数据集(近千张图片已划分、已标注)| 适用于YOLO系列深度学习检测任务【数据集分享】
在现代电子制造中,印刷电路板(PCB)是几乎所有电子设备的核心组成部分。随着PCB设计复杂度不断增加,人工检测PCB缺陷不仅效率低,而且容易漏检或误判。因此,利用计算机视觉和深度学习技术对PCB缺陷进行自动检测成为行业发展的必然趋势。
PCB电路板缺陷检测数据集(近千张图片已划分、已标注)| 适用于YOLO系列深度学习检测任务【数据集分享】
|
7月前
|
机器学习/深度学习 编解码 人工智能
102类农业害虫数据集(20000张图片已划分、已标注)|适用于YOLO系列深度学习分类检测任务【数据集分享】
在现代农业发展中,病虫害监测与防治 始终是保障粮食安全和提高农作物产量的关键环节。传统的害虫识别主要依赖人工观察与统计,不仅效率低下,而且容易受到主观经验、环境条件等因素的影响,导致识别准确率不足。
|
机器学习/深度学习 人工智能 监控
单车、共享单车已标注数据集(图片已划分、已标注)|适用于深度学习检测任务【数据集分享】
数据是人工智能的“燃料”。一个高质量、标注精准的单车与共享单车数据集,不仅能够推动学术研究的进步,还能为智慧交通、智慧城市的建设提供有力支撑。 在计算机视觉领域,研究者们常常会遇到“数据鸿沟”问题:公开数据集与真实业务需求之间存在不匹配。本次分享的数据集正是为了弥补这一不足,使得研究人员与工程师能够快速切入单车检测领域,加速模型从实验室走向真实应用场景。
|
6月前
|
数据采集 机器学习/深度学习 人工智能
Python:现代编程的首选语言
Python:现代编程的首选语言
782 102
|
6月前
|
数据采集 机器学习/深度学习 算法框架/工具
Python:现代编程的瑞士军刀
Python:现代编程的瑞士军刀
413 104

推荐镜像

更多