深度学习实战】行人检测追踪与双向流量计数系统【python源码+Pyqt5界面+数据集+训练代码】YOLOv8、ByteTrack、目标追踪、双向计数、行人检测追踪、过线计数(3)

简介: 深度学习实战】行人检测追踪与双向流量计数系统【python源码+Pyqt5界面+数据集+训练代码】YOLOv8、ByteTrack、目标追踪、双向计数、行人检测追踪、过线计数

深度学习实战】行人检测追踪与双向流量计数系统【python源码+Pyqt5界面+数据集+训练代码】YOLOv8、ByteTrack、目标追踪、双向计数、行人检测追踪、过线计数(2)https://developer.aliyun.com/article/1536938

ByteTrack的实现代码如下:

class ByteTrack:
    """
    Initialize the ByteTrack object.
    Parameters:
        track_thresh (float, optional): Detection confidence threshold
            for track activation.
        track_buffer (int, optional): Number of frames to buffer when a track is lost.
        match_thresh (float, optional): Threshold for matching tracks with detections.
        frame_rate (int, optional): The frame rate of the video.
    """
    def __init__(
        self,
        track_thresh: float = 0.25,
        track_buffer: int = 30,
        match_thresh: float = 0.8,
        frame_rate: int = 30,
    ):
        self.track_thresh = track_thresh
        self.match_thresh = match_thresh
        self.frame_id = 0
        self.det_thresh = self.track_thresh + 0.1
        self.max_time_lost = int(frame_rate / 30.0 * track_buffer)
        self.kalman_filter = KalmanFilter()
        self.tracked_tracks: List[STrack] = []
        self.lost_tracks: List[STrack] = []
        self.removed_tracks: List[STrack] = []
    def update_with_detections(self, detections: Detections) -> Detections:
        """
        Updates the tracker with the provided detections and
            returns the updated detection results.
        Parameters:
            detections: The new detections to update with.
        Returns:
            Detection: The updated detection results that now include tracking IDs.
        """
        tracks = self.update_with_tensors(
            tensors=detections2boxes(detections=detections)
        )
        detections = Detections.empty()
        if len(tracks) > 0:
            detections.xyxy = np.array(
                [track.tlbr for track in tracks], dtype=np.float32
            )
            detections.class_id = np.array(
                [int(t.class_ids) for t in tracks], dtype=int
            )
            detections.tracker_id = np.array(
                [int(t.track_id) for t in tracks], dtype=int
            )
            detections.confidence = np.array(
                [t.score for t in tracks], dtype=np.float32
            )
        else:
            detections.tracker_id = np.array([], dtype=int)
        return detections
    def update_with_tensors(self, tensors: np.ndarray) -> List[STrack]:
        """
        Updates the tracker with the provided tensors and returns the updated tracks.
        Parameters:
            tensors: The new tensors to update with.
        Returns:
            List[STrack]: Updated tracks.
        """
        self.frame_id += 1
        activated_starcks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []
        class_ids = tensors[:, 5]
        scores = tensors[:, 4]
        bboxes = tensors[:, :4]
        remain_inds = scores > self.track_thresh
        inds_low = scores > 0.1
        inds_high = scores < self.track_thresh
        inds_second = np.logical_and(inds_low, inds_high)
        dets_second = bboxes[inds_second]
        dets = bboxes[remain_inds]
        scores_keep = scores[remain_inds]
        scores_second = scores[inds_second]
        class_ids_keep = class_ids[remain_inds]
        class_ids_second = class_ids[inds_second]
        if len(dets) > 0:
            """Detections"""
            detections = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets, scores_keep, class_ids_keep)
            ]
        else:
            detections = []
        """ Add newly detected tracklets to tracked_stracks"""
        unconfirmed = []
        tracked_stracks = []  # type: list[STrack]
        for track in self.tracked_tracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)
        """ Step 2: First association, with high score detection boxes"""
        strack_pool = joint_tracks(tracked_stracks, self.lost_tracks)
        # Predict the current location with KF
        STrack.multi_predict(strack_pool)
        dists = matching.iou_distance(strack_pool, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_track, u_detection = matching.linear_assignment(
            dists, thresh=self.match_thresh
        )
        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(detections[idet], self.frame_id)
                activated_starcks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)
        """ Step 3: Second association, with low score detection boxes"""
        # association the untrack to the low score detections
        if len(dets_second) > 0:
            """Detections"""
            detections_second = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets_second, scores_second, class_ids_second)
            ]
        else:
            detections_second = []
        r_tracked_stracks = [
            strack_pool[i]
            for i in u_track
            if strack_pool[i].state == TrackState.Tracked
        ]
        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(
            dists, thresh=0.5
        )
        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_starcks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)
        for it in u_track:
            track = r_tracked_stracks[it]
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)
        """Deal with unconfirmed tracks, usually tracks with only one beginning frame"""
        detections = [detections[i] for i in u_detection]
        dists = matching.iou_distance(unconfirmed, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(
            dists, thresh=0.7
        )
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_starcks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)
        """ Step 4: Init new stracks"""
        for inew in u_detection:
            track = detections[inew]
            if track.score < self.det_thresh:
                continue
            track.activate(self.kalman_filter, self.frame_id)
            activated_starcks.append(track)
        """ Step 5: Update state"""
        for track in self.lost_tracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)
        self.tracked_tracks = [
            t for t in self.tracked_tracks if t.state == TrackState.Tracked
        ]
        self.tracked_tracks = joint_tracks(self.tracked_tracks, activated_starcks)
        self.tracked_tracks = joint_tracks(self.tracked_tracks, refind_stracks)
        self.lost_tracks = sub_tracks(self.lost_tracks, self.tracked_tracks)
        self.lost_tracks.extend(lost_stracks)
        self.lost_tracks = sub_tracks(self.lost_tracks, self.removed_tracks)
        self.removed_tracks.extend(removed_stracks)
        self.tracked_tracks, self.lost_tracks = remove_duplicate_tracks(
            self.tracked_tracks, self.lost_tracks
        )
        output_stracks = [track for track in self.tracked_tracks if track.is_activated]
        return output_stracks

使用方法

1.创建ByteTrack跟踪器

# 创建跟踪器
byte_tracker = sv.ByteTrack(track_thresh=0.25, track_buffer=30, match_thresh=0.8, frame_rate=30)

2.对YOLOv8的目标检测结果进行追踪

model = YOLO(path)
results = model(frame)[0]
detections = sv.Detections.from_ultralytics(results)
detections = byte_tracker.update_with_detections(detections)

3.显示追踪结果ID、检测框及标签信息

labels = [
            f"id{tracker_id} {model.model.names[class_id]}"
            for _, _, confidence, class_id, tracker_id
            in detections
        ]
annotated_frame = frame.copy()
annotated_frame = box_annotator.annotate(
            scene=annotated_frame,
            detections=detections,
            labels=labels)

最终检测效果如下:

四、过线计数判断方式

定义过线线段

定义用于统计过线的线段,代码如下:

cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
point_A = [10, int(height/5*4)]
point_B = [width-10, int(height/5)]
# 定义过线使用的线段点
LINE_START = sv.Point(point_A[0], point_A[1])
LINE_END = sv.Point(point_B[0], point_B[1])
line_zone = MyLineZone(start=LINE_START, end=LINE_END)

判断过线方法

使用目标中心点判断是否过线,核心代码如下:

for i, (xyxy, _, confidence, class_id, tracker_id) in enumerate(detections):
            if tracker_id is None:
                continue
            # 使用中心点判断是否过线
            x1, y1, x2, y2 = xyxy
            center_x = int((x1 + x2) / 2)
            center_y = int((y1 + y2) / 2)
            center_point = Point(x=center_x, y=center_y)
            triggers = [self.vector.is_in(point=center_point)]

上述通过目标框坐标计算出目标中心点坐标center_x ,center_y ,然后通过is_in函数判断过线状态,其中is_in函数定义如下:

def is_in(self, point: Point) -> bool:
        v1 = Vector(self.start, self.end)
        v2 = Vector(self.start, point)
        cross_product = (v1.end.x - v1.start.x) * (v2.end.y - v2.start.y) - (
            v1.end.y - v1.start.y
        ) * (v2.end.x - v2.start.x)
        return cross_product < 0

函数首先根据线段的起点和终点构造两个向量v1和v2,分别表示线段和待判断的点与线段起点的向量。然后计算两个向量的叉积,并判断叉积的正负来确定点的位置关系。若叉积小于0,则点在线段的左侧;若叉积大于0,则点在线段的右侧;若叉积等于0,则点在线段上。根据题设,函数返回的是点在线段不同侧的状态,即当叉积小于0时返回True,否则返回False

判断是否通过线段

上述判断方式只能用于判断目标是否通过线段所在直线,并不是在线段内通过。如果想判断在线段内通过,需要另外加上过线时的判断条件,核心代码如下:

def point_in_line(self, center_point):
        # 判断点是否在线段之间通过
        # 计算向量 AP 与向量 AB 的点积(也称为“标量积”)
        # 点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正)
        point_A, point_B = self.get_line_points(self.vector)
        xA, yA = point_A
        xB, yB = point_B
        xP, yP = center_point
        AB = (xB - xA, yB - yA)
        AP = (xP - xA, yP - yA)
        # 计算向量 AP 与向量 AB 的点积
        dot_product = AB[0] * AP[0] + AB[1] * AP[1]
        # 计算向量 AB 模长的平方
        AB_length_squared = AB[0] ** 2 + AB[1] ** 2
        # 判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正)
        if 0 <= dot_product <= AB_length_squared and dot_product >= 0:
            within_segment = True
        else:
            within_segment = False
        return within_segment

判断点是否在线段之间通过,通过计算向量 AP向量 AB点积(也称为“标量积”)来进行判断。其中P表示目标中心点,AB表示目标需要通过的线段。

判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正),则表示在线段内通过。

过线效果展示

过线效果展示如下:

以上便是关于此款行人检测追踪与双向流量计数系统的原理与代码介绍。基于以上内容,博主用pythonPyqt5开发了一个带界面的软件系统,即文中第二部分的演示内容,能够很好的通过视频及摄像头进行检测追踪,以及自定义过线计数

相关文章
|
8天前
|
JSON 算法 API
1688商品详情API实战:Python调用全流程与数据解析技巧
本文介绍了1688电商平台的商品详情API接口,助力电商从业者高效获取商品信息。接口可返回商品基础属性、价格体系、库存状态、图片描述及商家详情等多维度数据,支持全球化语言设置。通过Python示例代码展示了如何调用该接口,帮助用户快速上手,适用于选品分析、市场研究等场景。
|
4天前
|
数据采集 自然语言处理 Java
Playwright 多语言一体化——Python/Java/.NET 全栈采集实战
本文以反面教材形式,剖析了在使用 Playwright 爬取懂车帝车友圈问答数据时常见的配置错误(如未设置代理、Cookie 和 User-Agent),并提供了 Python、Java 和 .NET 三种语言的修复代码示例。通过错误示例 → 问题剖析 → 修复过程 → 总结教训的完整流程,帮助读者掌握如何正确配置爬虫代理及其它必要参数,避免 IP 封禁和反爬检测,实现高效数据采集与分析。
Playwright 多语言一体化——Python/Java/.NET 全栈采集实战
|
4天前
|
监控 供应链 数据挖掘
淘宝商品详情API接口解析与 Python 实战指南
淘宝商品详情API接口是淘宝开放平台提供的编程工具,支持开发者获取商品详细信息,包括基础属性、价格、库存、销售策略及卖家信息等。适用于电商数据分析、竞品分析与价格策略优化等场景。接口功能涵盖商品基础信息、详情描述、图片视频资源、SKU属性及评价统计的查询。通过构造请求URL和签名,可便捷调用数据。典型应用场景包括电商比价工具、商品数据分析平台、供应链管理及营销活动监控等,助力高效运营与决策。
82 26
|
4天前
|
人工智能 缓存 搜索推荐
1688图片搜索API接口解析与 Python实战指南
1688图片搜索API接口支持通过上传图片搜索相似商品,适用于电商及商品推荐场景。用户上传图片后,经图像识别提取特征并生成关键词,调用接口返回包含商品ID、标题和价格的相似商品列表。该接口需提供图片URL或Base64编码数据,还可附加分页与筛选参数。示例代码展示Python调用方法,调试时建议使用沙箱环境测试稳定性,并优化性能与错误处理逻辑。
|
小程序 Python
如何用python做一个简单的输入输出交互界面?
想问下你写的程序怎么分享给别人使用? **直接发代码!**那不会代码的人岂不是得抓瞎 **那做成网站或者微信小程序!**时间成本太高了,更何况服务器又是一笔成本,后期可能还得不断维护
747 0
如何用python做一个简单的输入输出交互界面?
|
2月前
|
机器学习/深度学习 存储 设计模式
Python 高级编程与实战:深入理解性能优化与调试技巧
本文深入探讨了Python的性能优化与调试技巧,涵盖profiling、caching、Cython等优化工具,以及pdb、logging、assert等调试方法。通过实战项目,如优化斐波那契数列计算和调试Web应用,帮助读者掌握这些技术,提升编程效率。附有进一步学习资源,助力读者深入学习。
|
12天前
|
数据采集 安全 BI
用Python编程基础提升工作效率
一、文件处理整明白了,少加两小时班 (敲暖气管子)领导让整理100个Excel表?手都干抽筋儿了?Python就跟铲雪车似的,哗哗给你整利索!
50 11
|
2月前
|
人工智能 Java 数据安全/隐私保护
[oeasy]python081_ai编程最佳实践_ai辅助编程_提出要求_解决问题
本文介绍了如何利用AI辅助编程解决实际问题,以猫屎咖啡的购买为例,逐步实现将购买斤数换算成人民币金额的功能。文章强调了与AI协作时的三个要点:1) 去除无关信息,聚焦目标;2) 将复杂任务拆解为小步骤,逐步完成;3) 巩固已有成果后再推进。最终代码实现了输入验证、单位转换和价格计算,并保留两位小数。总结指出,在AI时代,人类负责明确目标、拆分任务和确认结果,AI则负责生成代码、解释含义和提供优化建议,编程不会被取代,而是会更广泛地融入各领域。
103 28
|
2月前
|
机器学习/深度学习 数据可视化 TensorFlow
Python 高级编程与实战:深入理解数据科学与机器学习
本文深入探讨了Python在数据科学与机器学习中的应用,介绍了pandas、numpy、matplotlib等数据科学工具,以及scikit-learn、tensorflow、keras等机器学习库。通过实战项目,如数据可视化和鸢尾花数据集分类,帮助读者掌握这些技术。最后提供了进一步学习资源,助力提升Python编程技能。
|
2月前
|
Python
[oeasy]python074_ai辅助编程_水果程序_fruits_apple_banana_加法_python之禅
本文回顾了从模块导入变量和函数的方法,并通过一个求和程序实例,讲解了Python中输入处理、类型转换及异常处理的应用。重点分析了“明了胜于晦涩”(Explicit is better than implicit)的Python之禅理念,强调代码应清晰明确。最后总结了加法运算程序的实现过程,并预告后续内容将深入探讨变量类型的隐式与显式问题。附有相关资源链接供进一步学习。
54 4