一、介绍
在IPC监控视频中,现在很多IPC都支持区域检测:当检测到有人闯入指定区域时,发送报警并联动报警系统,这对保障生命和财产安全具有重大意义。它能够在第一时间检测到人员进入危险区域的行为,并及时发出警告,从而防止潜在事故的发生。
简单来说,就是在地图上标记出禁区(多边形),用计算机视觉技术监控进入禁区的物体。
现在很多摄像头模组,都自带了移动侦测功能,比如海思,君正,RK等。
以前有在RV1126上实现过类似的,现在想在RK3568上实现。
记录下PC端测试情况。
检测流程:
1、使用YOLOV5识别人物
2、使用ByteTrack实现多目标跟踪
3、使用射线法判断点是否在区域内
二、环境搭建
环境搭建参考AI项目二十二:行人属性识别-CSDN博客
项目结构
ByteTrack是git下载的源码
fonts存放了字体文件
weights存放yolov5s.pt模型
yolov5是git下载的源码
main.py主程序
mask_face.py是人脸遮挡代码
track.py是多目标跟踪和闯入识别代码
三、代码解析
代码功能不多,直接附上源码
main.py
import cv2
import torch
import numpy as np
from PIL import Image, ImageDraw, ImageFont
print("0")
from mask_face import mask_face
print("2")
from track import PersonTrack
print("1")
# Fonts already loaded, keyed by (font_path, size). The overlay text is drawn
# several times per frame, so re-reading the font file on each call is wasted I/O.
_FONT_CACHE = {}


def cv2_add_chinese_text(img, text, position, text_color=(0, 255, 0), tex_size=30,
                         font_path="./fonts/MSYH.ttc"):
    """Draw (possibly non-ASCII) text onto an OpenCV BGR image using PIL.

    ``cv2.putText`` cannot render CJK glyphs, so the image is converted to a
    PIL image, the text is drawn with a TrueType font, and the result is
    converted back.

    :param img: BGR image as a numpy array; it is not modified in place.
    :param text: text to draw.
    :param position: (x, y) position of the text, passed to ``ImageDraw.text``.
    :param text_color: colour tuple. The drawing happens on the RGB-converted
        image, so the tuple is interpreted as RGB, not BGR.
    :param tex_size: font size passed to ``ImageFont.truetype``.
    :param font_path: TrueType font file to use; defaults to the bundled font.
    :return: a new BGR image with the text rendered on it.
    """
    font_key = (font_path, tex_size)
    font_style = _FONT_CACHE.get(font_key)
    if font_style is None:
        font_style = ImageFont.truetype(font_path, tex_size, encoding="utf-8")
        _FONT_CACHE[font_key] = font_style

    pil_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    ImageDraw.Draw(pil_img).text(position, text, text_color, font=font_style)
    return cv2.cvtColor(np.asarray(pil_img), cv2.COLOR_RGB2BGR)
print("2")
class BreakInDetection:
    """Detect people with YOLOv5, track them, and flag those inside the restricted zone.

    The model weights, the input video and the output video all use fixed
    relative paths, so the script must be run from the project root.
    """

    def __init__(self):
        # source='local' loads the model from the local 'yolov5' checkout.
        self.yolov5_model = torch.hub.load(
            'yolov5',
            'custom',
            path='./weights/yolov5s.pt',
            source='local',
        )
        # Confidence threshold applied by the YOLOv5 model wrapper.
        self.yolov5_model.conf = 0.7
        self.tracker = PersonTrack()

    @staticmethod
    def yolo_pd_to_numpy(yolo_pd):
        """Convert a YOLOv5 pandas result table into an (N, 5) float array.

        :param yolo_pd: DataFrame whose first five columns are
            left, top, right, bottom and confidence.
        :return: float array of rows ``[l, t, r, b, conf]``; the coordinates
            are truncated to whole pixels. Empty input yields shape ``(0, 5)``.
        """
        detections = [
            [int(box[0]), int(box[1]), int(box[2]), int(box[3]), box[4]]
            for box in yolo_pd.to_numpy()
        ]
        # reshape keeps the column dimension even when nothing was detected.
        return np.array(detections, dtype=float).reshape(-1, 5)

    def plot_detection(self, person_track_dict, penalty_zone_point_list, frame, frame_idx):
        """Draw tracked people, the restricted zone and the statistics panel.

        :param person_track_dict: mapping of track id to detection objects
            exposing ``ltrb``, ``track_id`` and ``is_break_in``.
        :param penalty_zone_point_list: polygon vertices [(x, y), ...] of the zone.
        :param frame: BGR frame; boxes and masks are drawn onto it in place.
        :param frame_idx: running frame number, only printed for tracing.
        :return: the annotated frame (a new array after blending).
        """
        print(frame_idx)
        frame_h, frame_w = frame.shape[:2]
        break_in_num = 0
        for detection in person_track_dict.values():
            # Plain ints: OpenCV drawing calls and slicing both expect them.
            l, t, r, b = (int(v) for v in detection.ltrb)
            track_id = detection.track_id
            print(track_id, detection.is_break_in)
            if detection.is_break_in:
                box_color = (0, 0, 255)
                id_color = (0, 0, 255)
                break_in_num += 1
            else:
                box_color = (0, 255, 0)
                id_color = (255, 0, 0)

            # Tracker boxes may extend past the frame border. A negative index
            # would silently slice from the opposite edge, so clamp first and
            # skip boxes that end up empty.
            crop_l, crop_t = max(l, 0), max(t, 0)
            crop_r, crop_b = min(r, frame_w), min(b, frame_h)
            if crop_r > crop_l and crop_b > crop_t:
                frame[crop_t:crop_b, crop_l:crop_r] = mask_face(
                    frame[crop_t:crop_b, crop_l:crop_r])

            # Person bounding box and track id label.
            cv2.rectangle(frame, (l, t), (r, b), box_color, 1)
            cv2.putText(frame, f'id-{track_id}', (l + 2, t - 3), cv2.FONT_HERSHEY_PLAIN, 3, id_color, 2)

        # Restricted zone: red outline plus a translucent red fill.
        pts = np.array(penalty_zone_point_list, np.int32).reshape((-1, 1, 2))
        cv2.polylines(frame, [pts], True, (0, 0, 255), 2)
        cover = np.zeros((frame.shape[0], frame.shape[1], 3), np.uint8)
        cover = cv2.fillPoly(cover, [pts], (0, 0, 255))
        frame = cv2.addWeighted(frame, 0.9, cover, 0.3, 0)
        # NOTE: the label position is fixed and matches the default zone only.
        frame = cv2_add_chinese_text(frame, '禁区', (600, 450), (255, 0, 0), 30)

        # Statistics panel: darken a 400x200 region whose top-left is (100, 100).
        info_frame_h, info_frame_w = 200, 400
        if_l, if_t = 100, 100
        if_r, if_b = if_l + info_frame_w, if_t + info_frame_h
        # Slicing clips to the frame, so on small videos the panel shrinks
        # instead of causing a size mismatch in addWeighted.
        frame_part = frame[if_t:if_b, if_l:if_r]
        if frame_part.size > 0:
            info_frame = np.zeros_like(frame_part)
            frame[if_t:if_b, if_l:if_r] = cv2.addWeighted(frame_part, 0.6, info_frame, 0.7, 0)
        frame = cv2_add_chinese_text(frame, '统计', (if_l + 150, if_t + 10), (255, 0, 0), 40)
        frame = cv2_add_chinese_text(frame, f'当前闯入禁区 {break_in_num} 人', (if_l + 60, if_t + 80), (255, 0, 0), 35)
        return frame

    def detect(self):
        """Run detection on ./video.mp4, show it live and write ./video_result.mp4.

        Press ``q`` in the preview window to stop early.
        """
        cap = cv2.VideoCapture('./video.mp4')
        video_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = round(cap.get(cv2.CAP_PROP_FPS))
        print(fps)
        video_writer = cv2.VideoWriter('./video_result.mp4', cv2.VideoWriter_fourcc(*'H264'), fps, (video_w, video_h))
        frame_idx = 0
        try:
            while cap.isOpened():
                frame_idx += 1
                success, frame = cap.read()
                if not success:
                    print("Ignoring empty camera frame.")
                    break
                # The model is fed RGB, OpenCV delivers BGR: reverse the channel axis.
                results = self.yolov5_model(frame[:, :, ::-1])
                result_table = results.pandas().xyxy[0]
                person_table = result_table[result_table['name'] == 'person']
                person_det_boxes = self.yolo_pd_to_numpy(person_table)
                # Frames without any person are shown and written unannotated.
                if len(person_det_boxes) > 0:
                    person_track_dict, penalty_zone_point_list = self.tracker.update_track(person_det_boxes, frame)
                    frame = self.plot_detection(person_track_dict, penalty_zone_point_list, frame, frame_idx)
                cv2.imshow('Break in Detection', frame)
                video_writer.write(frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Always release the writer as well, otherwise the output file
            # may be left unfinalized.
            cap.release()
            video_writer.release()
            cv2.destroyAllWindows()
print("3")

# Script entry point: build the detector, then process the video.
if __name__ == '__main__':
    detector = BreakInDetection()
    detector.detect()
mask_face.py
import cv2
import mediapipe as mp
# Module-level face detector, created once at import time and used by mask_face().
# NOTE(review): model_selection=1 is presumably MediaPipe's full-range model — confirm against the MediaPipe docs.
face_detection = mp.solutions.face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.3)
def mask_face(frame):
    """Return a copy of a BGR image with every detected face covered by a filled box.

    :param frame: BGR image (typically a person crop) as a numpy array.
        The input array itself is left untouched.
    :return: a new BGR image with faces masked; an empty (or ``None``) input
        is returned unchanged.
    """
    # An empty crop cannot be colour-converted by OpenCV, and there is
    # nothing to mask in it anyway.
    if frame is None or frame.size == 0:
        return frame

    # MediaPipe expects RGB input.
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Mark the temporary RGB copy read-only before handing it to MediaPipe
    # (presumably lets it avoid an extra copy); the caller's array is untouched.
    rgb.flags.writeable = False
    results = face_detection.process(rgb)

    masked = frame.copy()
    if not results.detections:
        return masked

    frame_h, frame_w = masked.shape[:2]
    for detection in results.detections:
        # The bounding box is relative to the image size: scale to pixels.
        face_box = detection.location_data.relative_bounding_box
        l, t = int(face_box.xmin * frame_w), int(face_box.ymin * frame_h)
        r, b = l + int(face_box.width * frame_w), t + int(face_box.height * frame_h)
        # Thickness -1 fills the rectangle.
        cv2.rectangle(masked, (l, t), (r, b), (203, 192, 255), -1)
    return masked
track.py
from dataclasses import dataclass
import numpy as np
from collections import deque
import cv2
import paddleclas
import sys
sys.path.append('./ByteTrack/')
from yolox.tracker.byte_tracker import BYTETracker, STrack
@dataclass(frozen=True)
class BYTETrackerArgs:
    """Immutable settings object handed to ``BYTETracker``.

    NOTE(review): the field meanings below follow ByteTrack's demo arguments
    as far as known — confirm against the ByteTrack sources.
    """

    # Score threshold — presumably separates high- from low-confidence detections.
    track_thresh: float = 0.25
    # Presumably the number of frames a lost track is kept alive.
    track_buffer: int = 30
    # Matching threshold used when associating detections with tracks.
    match_thresh: float = 0.8
    # Aspect-ratio limit for boxes.
    aspect_ratio_thresh: float = 3.0
    # Minimum box area.
    min_box_area: float = 1.0
    # MOT20 mode switch.
    mot20: bool = False
class Detection(object):
    """State of one tracked person: latest box, zone status and recent trajectory."""

    def __init__(self, ltrb, track_id, is_break_in):
        """Create the record for a newly seen track.

        :param ltrb: bounding box as (left, top, right, bottom).
        :param track_id: identifier assigned by the tracker.
        :param is_break_in: whether the person is inside the restricted zone.
        """
        self.track_id = track_id
        self.ltrb = ltrb
        self.is_break_in = is_break_in  # whether the person has entered the zone
        # Trajectory of the last 30 foot points; older points drop off automatically.
        self.track_list = deque(maxlen=30)
        # Record the first observation too, so the trajectory starts where
        # the person was first seen rather than one frame later.
        self.track_list.append(self._foot_point(ltrb))

    @staticmethod
    def _foot_point(ltrb):
        """Return the bottom-centre point (x, y) of a (l, t, r, b) box."""
        l, _, r, b = ltrb
        return ((l + r) // 2, b)

    def update(self, ltrb, is_break_in):
        """Store the newest box and zone status and extend the trajectory.

        :param ltrb: bounding box as (left, top, right, bottom).
        :param is_break_in: whether the person is inside the restricted zone.
        """
        self.ltrb = ltrb
        self.is_break_in = is_break_in
        self.track_list.append(self._foot_point(ltrb))
class PersonTrack(object):
    """Track people with ByteTrack and flag those standing inside a polygonal zone."""

    # Default restricted-zone polygon, in pixel coordinates of the video frame.
    DEFAULT_PENALTY_ZONE = ((400, 440), (460, 579), (920, 600), (960, 450))

    def __init__(self, penalty_zone_point_list=None):
        """Create the tracker.

        :param penalty_zone_point_list: optional polygon vertices
            [(x1, y1), ..., (xn, yn)] of the restricted zone; when omitted the
            built-in default polygon is used.
        """
        self.byte_tracker = BYTETracker(BYTETrackerArgs())
        # Detections of the most recent frame, keyed by track id.
        self.detection_dict = {}
        if penalty_zone_point_list is None:
            penalty_zone_point_list = self.DEFAULT_PENALTY_ZONE
        # Restricted-zone polygon.
        self.penalty_zone_point_list = list(penalty_zone_point_list)

    def is_point_in_polygon(self, vertices, point):
        """
        Decide whether a point lies inside a polygon (ray casting).

        :param vertices: polygon vertices [(x1, y1), (x2, y2), ..., (xn, yn)]
        :param point: point to test, (x, y)
        :return: True or False; always False for fewer than three vertices
        """
        n = len(vertices)
        # Fewer than three vertices enclose no area (and an empty list would
        # otherwise fail on vertices[0]).
        if n < 3:
            return False

        px, py = point[0], point[1]
        inside = False
        p1x, p1y = vertices[0]
        for i in range(1, n + 1):
            # i % n wraps around so the last edge closes the polygon.
            p2x, p2y = vertices[i % n]
            # The half-open y-range test can only pass when p1y != p2y, so
            # the division below is safe.
            if min(p1y, p2y) < py <= max(p1y, p2y) and px <= max(p1x, p2x):
                if p1x == p2x:
                    # Vertical edge at or to the right of the point: a crossing.
                    inside = not inside
                else:
                    # x coordinate where the edge meets the horizontal line y = py.
                    xints = (py - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
                    if px <= xints:
                        inside = not inside
            p1x, p1y = p2x, p2y
        return inside

    def update_track(self, boxes, frame):
        """Feed one frame of detections to the tracker and refresh zone status.

        :param boxes: detection array passed to ``BYTETracker.update`` as
            ``output_results`` (rows of l, t, r, b, confidence).
        :param frame: current video frame; only its shape is used.
        :return: (dict of track id -> Detection for tracks returned this frame,
            zone polygon vertex list)
        """
        tracks = self.byte_tracker.update(
            output_results=boxes,
            img_info=frame.shape,
            img_size=frame.shape
        )
        new_detection_dict = {}
        for track in tracks:
            # Plain Python ints keep later arithmetic and drawing code simple.
            l, t, r, b = track.tlbr.astype(np.int32).tolist()
            track_id = track.track_id
            # A person counts as inside when the bottom-centre of the box
            # (roughly the feet) is inside the polygon.
            detect_point = ((l + r) // 2, b)
            is_break_in = self.is_point_in_polygon(self.penalty_zone_point_list, detect_point)
            if track_id in self.detection_dict:
                detection = self.detection_dict[track_id]
                detection.update((l, t, r, b), is_break_in)
            else:
                detection = Detection((l, t, r, b), track_id, is_break_in)
            new_detection_dict[track_id] = detection
        # Only tracks returned for this frame are kept; others are dropped.
        self.detection_dict = new_detection_dict
        return self.detection_dict, self.penalty_zone_point_list
代码需要注意的是:
一、区域位置
二、显示参数位置
这几个参数需要根据视频的大小,去调整位置,不然会报错。
三、检测点是否在区域内
转成C语言直接部署到RK3568上。
后续将部署到RK3568,可参考git和迅为电子的资料,多目标检测部分已实现。