YOLOv5的Tricks | 【Trick13】YOLOv5的detect.py脚本的解析与简化

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: YOLOv5的Tricks | 【Trick13】YOLOv5的detect.py脚本的解析与简化

1. Detect脚本使用


对于测试的都会存放在runs/detect文件目录下,使用例程只需要指定输入的数据,再指定训练好的权重即可


python detect.py --source 0  # webcam
                          img.jpg  # image    单个图像文件
                          vid.mp4  # video    单个视频文件
                          path/  # directory  目录文件
                          path/*.jpg  # glob  正则表达式表示
                          'https://youtu.be/Zgi9g1ksQHc'  # YouTube
                          'rtsp://example.com/media.mp4'  # RTSP, RTMP, HTTP stream


具体的配置文件可以通过输入:python detect.py -h(-help) 来查看。对于yolo跑出来的结构都会放在 ./run/detect 文件夹中,然后以exp依次命名,如下所示:

image.png


1)测试单张图片

python detect.py --source ./data/image/bus.jpg


2)测试图片目录

python detect.py --source ./data/image/


3)测试单个视频

python detect.py --source ./data/videos/test_movie


4)测试视频目录

python detect.py --source ./data/videos/


5)测试摄像头

python detect.py --source 0   # 其中0代表是本地摄像头,还有其他的摄像头


ps:摄像头捕捉的视频同样会保存在 ./run/detect 文件夹中。


详细见参考资料1.


2. Detect脚本解析


在detect.py脚本中,主体是run函数,然后对source的来源进行判断。如果是摄像头设置或者网页视频流则设置相关标志,构建 LoadStreams 数据集。如果是普通的目录文件,或者是视频文件图像文件,则构建 LoadImages 数据集。


构造了数据集,接下来就是迭代获取每一张图像 或者是 获取视频的每一帧进行处理,图像文件直接保存,帧图像着写入一个视频对象中。摄像头捕获的帧图像同样写入一个视频对象中。这里设置的视频文件是逐帧处理的,而摄像头捕获调用了一个额外线程不断捕获帧图像,所以只能是处理当前捕获到的帧,所以摄像头文件看起来会有点卡顿。


最后,代码为图像绘制边界框专门构造了一个绘图类 Annotator 来处理。无论是普通图像还是来着视频的帧图像,都是丢到模型获取预测结果然后进行nms处理获取最后的预测结果,然后对框进行重新缩放映射到原图上,然后画框保存文件,结束。


对于代码的解析我已经注释在相应位置了。


2.1 主体部分

detect.py主要代码

@torch.no_grad()
def run(weights=ROOT / 'yolov5s.pt',  # model.pt path(s)
        source=ROOT / 'data/images',  # file/dir/URL/glob, 0 for webcam
        imgsz=640,  # inference size (pixels)
        conf_thres=0.25,  # confidence threshold
        iou_thres=0.45,  # NMS IOU threshold
        max_det=1000,  # maximum detections per image
        device='',  # cuda device, i.e. 0 or 0,1,2,3 or cpu
        view_img=False,  # show results
        save_txt=False,  # save results to *.txt
        save_conf=False,  # save confidences in --save-txt labels
        save_crop=False,  # save cropped prediction boxes
        nosave=False,  # do not save images/videos
        classes=None,  # filter by class: --class 0, or --class 0 2 3
        agnostic_nms=False,  # class-agnostic NMS
        augment=False,  # augmented inference
        visualize=False,  # visualize features
        update=False,  # update all models
        project=ROOT / 'runs/detect',  # save results to project/name
        name='exp',  # save results to project/name
        exist_ok=False,  # existing project/name ok, do not increment
        line_thickness=3,  # bounding box thickness (pixels)
        hide_labels=False,  # hide labels
        hide_conf=False,  # hide confidences
        half=False,  # use FP16 half-precision inference
        dnn=False,  # use OpenCV DNN for ONNX inference
        ):
    source = str(source)
    save_img = not nosave and not source.endswith('.txt')  # save inference images
    webcam = source.isnumeric() or source.endswith('.txt') or source.lower().startswith(
        ('rtsp://', 'rtmp://', 'http://', 'https://'))
    # Directories
    save_dir = increment_path(Path(project) / name, exist_ok=exist_ok)  # increment run
    (save_dir / 'labels' if save_txt else save_dir).mkdir(parents=True, exist_ok=True)  # make dir
    # Initialize
    set_logging()
    device = select_device(device)
    half &= device.type != 'cpu'  # half precision only supported on CUDA
    # Load model
    w = str(weights[0] if isinstance(weights, list) else weights)
    classify, suffix, suffixes = False, Path(w).suffix.lower(), ['.pt', '.onnx', '.tflite', '.pb', '']
    check_suffix(w, suffixes)  # check weights have acceptable suffix
    pt, onnx, tflite, pb, saved_model = (suffix == x for x in suffixes)  # backend booleans
    stride, names = 64, [f'class{i}' for i in range(1000)]  # assign defaults
    if pt:
        model = torch.jit.load(w) if 'torchscript' in w else attempt_load(weights, map_location=device)
        stride = int(model.stride.max())  # model stride
        names = model.module.names if hasattr(model, 'module') else model.names  # get class names
        if half:
            model.half()  # to FP16
        if classify:  # second-stage classifier
            modelc = load_classifier(name='resnet50', n=2)  # initialize
            modelc.load_state_dict(torch.load('resnet50.pt', map_location=device)['model']).to(device).eval()
    elif onnx:
        if dnn:
            # check_requirements(('opencv-python>=4.5.4',))
            net = cv2.dnn.readNetFromONNX(w)
        else:
            check_requirements(('onnx', 'onnxruntime'))
            import onnxruntime
            session = onnxruntime.InferenceSession(w, None)
    else:  # TensorFlow models
        check_requirements(('tensorflow>=2.4.1',))
        import tensorflow as tf
        if pb:  # https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt
            def wrap_frozen_graph(gd, inputs, outputs):
                x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), [])  # wrapped import
                return x.prune(tf.nest.map_structure(x.graph.as_graph_element, inputs),
                               tf.nest.map_structure(x.graph.as_graph_element, outputs))
            graph_def = tf.Graph().as_graph_def()
            graph_def.ParseFromString(open(w, 'rb').read())
            frozen_func = wrap_frozen_graph(gd=graph_def, inputs="x:0", outputs="Identity:0")
        elif saved_model:
            model = tf.keras.models.load_model(w)
        elif tflite:
            interpreter = tf.lite.Interpreter(model_path=w)  # load TFLite model
            interpreter.allocate_tensors()  # allocate
            input_details = interpreter.get_input_details()  # inputs
            output_details = interpreter.get_output_details()  # outputs
            int8 = input_details[0]['dtype'] == np.uint8  # is TFLite quantized uint8 model
    imgsz = check_img_size(imgsz, s=stride)  # check image size
    # Dataloader
    if webcam:
        view_img = check_imshow()
        cudnn.benchmark = True  # set True to speed up constant image size inference
        dataset = LoadStreams(source, img_size=imgsz, stride=stride, auto=pt)   # 摄像头或者网页视频的数据集构建
        bs = len(dataset)  # batch_size
    else:
        dataset = LoadImages(source, img_size=imgsz, stride=stride, auto=pt)    # 图像文件与视频文件的数据集构建
        bs = 1  # batch_size 单进程
    vid_path, vid_writer = [None] * bs, [None] * bs
    # Run inference
    if pt and device.type != 'cpu':
        model(torch.zeros(1, 3, *imgsz).to(device).type_as(next(model.parameters())))  # run once
    dt, seen = [0.0, 0.0, 0.0], 0
    # 首先执行__iter__函数构建一个迭代器,最后每执行迭代一次就执行一次__next__函数
    # 返回是的文件路径,缩放图,原图,视频源属性(当读取图片时为None, 读取视频时为视频源)
    for path, img, im0s, vid_cap in dataset:
        t1 = time_sync()
        if onnx:
            img = img.astype('float32')
        else:
            # 格式转化+半精度设置
            img = torch.from_numpy(img).to(device)
            img = img.half() if half else img.float()  # uint8 to fp16/32
        img = img / 255.0  # 0 - 255 to 0.0 - 1.0
        # [h w c] -> [1 h w c]
        if len(img.shape) == 3:
            img = img[None]  # expand for batch dim
        t2 = time_sync()
        dt[0] += t2 - t1
        # Inference
        if pt:   # 主要是下面两行,其他的都无关
            visualize = increment_path(save_dir / Path(path).stem, mkdir=True) if visualize else False
            # pred shape=[1, num_boxes, xywh+obj_conf+classes] = [1, 18900, 25]
            pred = model(img, augment=augment, visualize=visualize)[0]
        elif onnx:
            if dnn:
                net.setInput(img)
                pred = torch.tensor(net.forward())
            else:
                pred = torch.tensor(session.run([session.get_outputs()[0].name], {session.get_inputs()[0].name: img}))
        else:  # tensorflow model (tflite, pb, saved_model)
            imn = img.permute(0, 2, 3, 1).cpu().numpy()  # image in numpy
            if pb:
                pred = frozen_func(x=tf.constant(imn)).numpy()
            elif saved_model:
                pred = model(imn, training=False).numpy()
            elif tflite:
                if int8:
                    scale, zero_point = input_details[0]['quantization']
                    imn = (imn / scale + zero_point).astype(np.uint8)  # de-scale
                interpreter.set_tensor(input_details[0]['index'], imn)
                interpreter.invoke()
                pred = interpreter.get_tensor(output_details[0]['index'])
                if int8:
                    scale, zero_point = output_details[0]['quantization']
                    pred = (pred.astype(np.float32) - zero_point) * scale  # re-scale
            pred[..., 0] *= imgsz[1]  # x
            pred[..., 1] *= imgsz[0]  # y
            pred[..., 2] *= imgsz[1]  # w
            pred[..., 3] *= imgsz[0]  # h
            pred = torch.tensor(pred)
        t3 = time_sync()
        dt[1] += t3 - t2
        # NMS 非极大值抑制处理
        # pred是一个list,存储了每张图像的最后预测结果,由于这里的图像和视频都是一张,所以list里面只会有一个内容(det)
        pred = non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det)
        dt[2] += time_sync() - t3
        # Second-stage classifier (optional)
        if classify:
            pred = apply_classifier(pred, modelc, img, im0s)
        # Process predictions
        # 对每张图像的预测结果的每个预测内容依次处理
        for i, det in enumerate(pred):  # per image
            seen += 1
            if webcam:  # batch_size >= 1
                p, s, im0, frame = path[i], f'{i}: ', im0s[i].copy(), dataset.count
            else:
                p, s, im0, frame = path, '', im0s.copy(), getattr(dataset, 'frame', 0)
            # 当前图片路径 如 F:\yolo_v5\yolov5-U\data\images\bus.jpg
            p = Path(p)  # to Path
            # 图片/视频的保存路径save_path 如 runs\\detect\\exp8\\bus.jpg
            save_path = str(save_dir / p.name)  # img.jpg
            # txt文件(保存预测框坐标)保存路径 如 runs\\detect\\exp8\\labels\\bus
            txt_path = str(save_dir / 'labels' / p.stem) + ('' if dataset.mode == 'image' else f'_{frame}')  # img.txt
            s += '%gx%g ' % img.shape[2:]  # print string: wxh
            # gn = [w, h, w, h]  用于后面的归一化
            gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh
            imc = im0.copy() if save_crop else im0  # for save_crop
            # 创建了一个类用来对图像画框与添加文本信息
            annotator = Annotator(im0, line_width=line_thickness, example=str(names))
            if len(det):
                # Rescale boxes from img_size to im0 size
                # 将预测信息(相对img_size 640)映射回原图 img0 size, det:xyxy + conf + cls
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
                # Print results
                # 输出信息s + 检测到的各个类别的目标个数 (每张图像都会有一个这样的信息,对视频来说是每帧)
                for c in det[:, -1].unique():
                    n = (det[:, -1] == c).sum()  # detections per class
                    s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "  # add to string
                # Write results
                # 对每个预测对象依次绘制在原图中 + 保存在txt文件中
                for *xyxy, conf, cls in reversed(det):
                    # 将每个图片的预测信息分别存入save_dir/labels下的xxx.txt中 每行: class_id+score+xywh
                    if save_txt:  # Write to file
                        # 将xyxy(左上角 + 右下角)格式转换为xywh(中心的 + 宽高)格式 并除以gn(whwh)做归一化 转为list再保存
                        xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()  # normalized xywh
                        line = (cls, *xywh, conf) if save_conf else (cls, *xywh)  # label format
                        with open(txt_path + '.txt', 'a') as f:
                            f.write(('%g ' * len(line)).rstrip() % line + '\n')
                    # 在原图上绘制边界框
                    if save_img or save_crop or view_img:  # Add bbox to image
                        c = int(cls)  # integer class
                        # 在name这个列表字典中获取label名称
                        label = None if hide_labels else (names[c] if hide_conf else f'{names[c]} {conf:.2f}')
                        # 根据缩放后的预测边界框信息xyxy在原图上画框
                        annotator.box_label(xyxy, label, color=colors(c, True))
                        if save_crop:
                            # 如果需要就将预测到的目标剪切出来 保存成图片 保存在save_dir/crops下
                            save_one_box(xyxy, imc, file=save_dir / 'crops' / names[c] / f'{p.stem}.jpg', BGR=True)
            # Print time (inference-only)
            print(f'{s}Done. ({t3 - t2:.3f}s)')
            # Stream results
            # 获得画框后的原图
            im0 = annotator.result()
            # 是否需要显示我们预测后的结果  img0(此时已将pred结果可视化到了img0中)
            if view_img:
                cv2.imshow(str(p), im0)
                cv2.waitKey(1)  # 1 millisecond
            # Save results (image with detections)
            if save_img:
                # 如果当前处理的文件是图像,直接写入对应的目录下
                if dataset.mode == 'image':
                    cv2.imwrite(save_path, im0)
                # 如果当前处理的文件是视频,判断是否在处理同一个视频
                # 这里的i对于处理视频任务或者是图像任务的时候是没用的,因为此时pred只有一张图像,所以i一直为0
                else:  # 'video' or 'stream'
                    # 如果不是同一个视频,则重新构建一个视频写入对象
                    if vid_path[i] != save_path:  # new video
                        # 更新路径信息,使得之后可以跳过判断
                        vid_path[i] = save_path
                        # 释放上一次视频处理的缓存信息
                        if isinstance(vid_writer[i], cv2.VideoWriter):
                            vid_writer[i].release()  # release previous video writer
                        # 获取当前视频的一些信息
                        if vid_cap:  # video
                            # 获取当前视频的帧率与宽高,设置同样的格式,以确保相同帧率与宽高的视频输出
                            fps = vid_cap.get(cv2.CAP_PROP_FPS)
                            w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                            h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                        # 摄像头的视频流设置帧数为30
                        else:  # stream
                            fps, w, h = 30, im0.shape[1], im0.shape[0]
                            save_path += '.mp4'
                        # 创建写入视频对象,设置格式
                        vid_writer[i] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
                    # 如果是同一个视频,跳过上面的判断直接逐帧写入视频文件中
                    # 如果不是同一个视频,则创建新的视频写入对象,同样逐帧写入视频文件中
                    vid_writer[i].write(im0)
    # Print results
    # 打印最后的相关信息
    t = tuple(x / seen * 1E3 for x in dt)  # speeds per image
    print(f'Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {(1, 3, *imgsz)}' % t)
    if save_txt or save_img:
        s = f"\n{len(list(save_dir.glob('labels/*.txt')))} labels saved to {save_dir / 'labels'}" if save_txt else ''
        print(f"Results saved to {colorstr('bold', save_dir)}{s}")
    if update:
        strip_optimizer(weights)  # update model (to fix SourceChangeWarning)


2.2 数据集构建

这里代码中还实现了一个 LoadWebcam 的类,但是没有用上,就不做过多解析了。


LoadImages类代码

class LoadImages:
    # YOLOv5 image/video dataloader, i.e. `python detect.py --source image.jpg/vid.mp4`
    def __init__(self, path, img_size=640, stride=32, auto=True):
        # 这里的图像和文件只能是当前目录下,如果是在目录的目录下是不会处理的
        p = str(Path(path).resolve())  # os-agnostic absolute path
        if '*' in p:                # 如果是采用正则表达式题,则可以使用glob获取相关的文件路径
            files = sorted(glob.glob(p, recursive=True))  # glob
        elif os.path.isdir(p):      # 如果是一个目录路径,提取目录文件中所有含有'*'的文件
            files = sorted(glob.glob(os.path.join(p, '*.*')))  # dir
        elif os.path.isfile(p):     # 如果是一个文件则直接获取
            files = [p]  # files
        else:
            raise Exception(f'ERROR: {p} does not exist')
        # 分别存储图像文件的全部路径和视频文件的全部路径
        images = [x for x in files if x.split('.')[-1].lower() in IMG_FORMATS]
        videos = [x for x in files if x.split('.')[-1].lower() in VID_FORMATS]
        ni, nv = len(images), len(videos)
        self.img_size = img_size
        self.stride = stride
        # 按顺序,先处理完全部图像文件再处理视频文件
        self.files = images + videos
        self.nf = ni + nv  # number of files
        # 是否是视频文件的标志
        self.video_flag = [False] * ni + [True] * nv
        self.mode = 'image'
        self.auto = auto
        if any(videos):
            # 如果含有视频文件,则先对第一个视频文件初始化opencv的视频模块
            self.new_video(videos[0])  # new video
        else:
            self.cap = None
        assert self.nf > 0, f'No images or videos found in {p}. ' \
                            f'Supported formats are:\nimages: {IMG_FORMATS}\nvideos: {VID_FORMATS}'
    # dataset开始迭代时执行一次开始时
    def __iter__(self):
        self.count = 0
        return self
    # dataset每迭代一次执行一次
    def __next__(self):
        # 当全部图像或者视频处理完时退出迭代训练
        if self.count == self.nf:
            raise StopIteration
        # 当前处理的文件路径
        path = self.files[self.count]
        # 如果当前处理的是视频,利用opencv逐帧读取
        if self.video_flag[self.count]:
            # Read video
            self.mode = 'video'
            # 依次读取每一帧(处理完一帧写入一个视频文件后继续处理下一帧)
            # ret_val为一个bool变量,直到视频读取完毕之前都为True
            ret_val, img0 = self.cap.read()
            # 当前视频帧全部读取完时
            if not ret_val:
                # 当前视频处理完成,获取下一个待处理视频文件的索引
                self.count += 1
                self.cap.release()
                # 如果处理完最后一个视频就处理完所有的待处理文件就退出迭代
                if self.count == self.nf:  # last video
                    raise StopIteration
                # 继续下一个文件处理
                else:
                    path = self.files[self.count]       # 获取下一个视频文件的路径
                    self.new_video(path)                # 重新初始化opencv对象
                    ret_val, img0 = self.cap.read()     # 继续开始逐帧读取
            # 准备下一帧索引,直到视频文件全部读取完,返回的ret_val即为False
            self.frame += 1
            # 打印视频的当前任务位置,当前处理帧位置,当前的处理视频路径,后续还有其他补充信息
            # eg: video 1/2 (13/5642) E:\videos\test_movie.mp4: 384x640 1 bird, 1 kite, Done. (0.225s)
            print(f'video {self.count + 1}/{self.nf} ({self.frame}/{self.frames}) {path}: ', end='')
        # 如果当前处理的是图像,直接利用opencv读取
        else:
            # 一个图像文件就是一个任务,读取完直接count+1
            # Read image
            self.count += 1
            img0 = cv2.imread(path)  # BGR
            assert img0 is not None, 'Image Not Found ' + path
            # 打印图像的当前任务位置,当前的处理图像路径,后续还有其他补充信息
            print(f'image {self.count}/{self.nf} {path}: ', end='')
        # Padded resize 重新缩放到下采样尺寸
        img = letterbox(img0, self.img_size, stride=self.stride, auto=self.auto)[0]
        # Convert
        img = img.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        img = np.ascontiguousarray(img)
        return path, img, img0, self.cap
    # 一开始与一个视频任务完成时需要执行,确保迭代对象可以一直持续获取,只是需要区分好视频任务
    def new_video(self, path):
        self.frame = 0                      # 帧数记录
        self.cap = cv2.VideoCapture(path)   # 获取视频对象
        self.frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))   # 得到视频中的总帧数
    def __len__(self):
        return self.nf  # number of files


解析:


在处理普通目录下的图像和视频文件时,这里会先处理完所有的图像文件,然后再处理视频文件。然后当一个视频文件处理完时,需要立刻的进行下一个视频文件的处理,以让dataset一直迭代。但最后dataset迭代完成时,使用全部的视频文件已经处理结束了。


LoadStreams类代码

class LoadStreams:
    # YOLOv5 streamloader, i.e. `python detect.py --source 'rtsp://example.com/media.mp4'  # RTSP, RTMP, HTTP streams`
    def __init__(self, sources='streams.txt', img_size=640, stride=32, auto=True):
        self.mode = 'stream'
        self.img_size = img_size
        self.stride = stride
        # 如果sources为一个保存了多个视频流的文件  获取每一个视频流,保存为一个列表
        if os.path.isfile(sources):
            with open(sources, 'r') as f:
                sources = [x.strip() for x in f.read().strip().splitlines() if len(x.strip())]
        else:
            # 反之,只有一个视频流文件就直接保存
            sources = [sources]
        n = len(sources)
        # 初始化图片 fps 总帧数 线程数
        self.imgs, self.fps, self.frames, self.threads = [None] * n, [0] * n, [0] * n, [None] * n
        self.sources = [clean_str(x) for x in sources]  # clean source names for later
        self.auto = auto
        # 这里将多个视频流分别独立,各自构建一个线程进行动态读取,i表示的是第几个视频流的数据
        for i, s in enumerate(sources):  # index, source
            # Start thread to read frames from video stream
            # 打印当前视频index/总视频数/视频流地址
            print(f'{i + 1}/{n}: {s}... ', end='')
            if 'youtube.com/' in s or 'youtu.be/' in s:  # if source is YouTube video
                check_requirements(('pafy', 'youtube_dl'))
                import pafy
                s = pafy.new(s).getbest(preftype="mp4").url  # YouTube URL
            s = eval(s) if s.isnumeric() else s  # i.e. s = '0' local webcam
            # s='0'打开本地摄像头,否则打开视频流地址(分别独立的构建一个视频对象)
            cap = cv2.VideoCapture(s)
            # 对于b站链接,油管链接,是打不开的,在这里会进行报错
            assert cap.isOpened(), f'Failed to open {s}'
            # 获取视频的宽和长
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # 获取每个视频流的帧率(摄像头的帧率为30)
            self.fps[i] = max(cap.get(cv2.CAP_PROP_FPS) % 100, 0) or 30.0  # 30 FPS fallback
            # 获取每个视频流的帧数(摄像头的帧数为0 所以设置为'inf')
            self.frames[i] = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0) or float('inf')  # infinite stream fallback
            # 对每个视频流读取当前画面
            _, self.imgs[i] = cap.read()  # guarantee first frame
            # 创建多线程读取视频流,daemon表示主线程结束时子线程也结束
            # 其中args=([i, cap, s])是传入给update函数的参数
            self.threads[i] = Thread(target=self.update, args=([i, cap, s]), daemon=True)
            print(f" success ({self.frames[i]} frames {w}x{h} at {self.fps[i]:.2f} FPS)")
            self.threads[i].start()
        print('')  # newline
        # check for common shapes
        # 依次对每个视频流数据进行缩放处理,然后拼接在一起
        s = np.stack([letterbox(x, self.img_size, stride=self.stride, auto=self.auto)[0].shape for x in self.imgs])
        self.rect = np.unique(s, axis=0).shape[0] == 1  # rect inference if all shapes equal
        if not self.rect:
            print('WARNING: Different stream shapes detected. For optimal performance supply similarly-shaped streams.')
    # 这个函数是在后台进行的
    def update(self, i, cap, stream):
        # Read stream `i` frames in daemon thread
        n, f, read = 0, self.frames[i], 1  # frame number, frame array, inference every 'read' frame
        # n是当前处理的帧数,f是总帧数,当当前帧数大于总帧数时处理结束
        # 对于摄像头的视频流来说总帧数无穷大'inf',所以会一直循环执行
        while cap.isOpened() and n < f:
            n += 1
            # _, self.imgs[index] = cap.read()
            cap.grab()
            # 处理每一帧数据,read表示每多少帧处理一次
            if n % read == 0:
                # 读取当前帧
                success, im = cap.retrieve()
                # 在后台获取图像,等待迭代获取最新图像
                if success:
                    self.imgs[i] = im
                else:
                    print('WARNING: Video stream unresponsive, please check your IP camera connection.')
                    self.imgs[i] *= 0
                    cap.open(stream)  # re-open stream if signal was lost
            # 这里个人觉得是等待一帧处理的时间来进行推理,让推理速度追上读取速度
            time.sleep(1 / self.fps[i])  # wait time
    def __iter__(self):
        self.count = -1
        return self
    def __next__(self):
        self.count += 1
        if not all(x.is_alive() for x in self.threads) or cv2.waitKey(1) == ord('q'):  # q to quit
            cv2.destroyAllWindows()
            raise StopIteration
        # Letterbox
        # 对所有的视频流图像进行缩放处理然后构建成一个列表
        img0 = self.imgs.copy()
        img = [letterbox(x, self.img_size, stride=self.stride, auto=self.rect and self.auto)[0] for x in img0]
        # Stack
        # 将缩放后的图像列表拼接在一起,直接丢进模型进行预测处理
        # 如果source=0调用本地摄像头,那么每次这里的img只有一张图像,代表只有一个视频流的输入数据
        img = np.stack(img, 0)
        # Convert
        img = img[..., ::-1].transpose((0, 3, 1, 2))  # BGR to RGB, BHWC to BCHW
        img = np.ascontiguousarray(img)
        return self.sources, img, img0, None
    def __len__(self):
        # len(dataset) 表示返回当前同时处理多少个视频流
        return len(self.sources)  # 1E12 frames = 32 streams at 30 FPS for 30 years


解析:


这里主要的实现思路是为每个视频流都开了一个线程来不断的捕获帧图像,然后主线程对帧图像进行一个常规的推理处理。但是需要注意,这里获取的帧图像和推理速度之间会有一个时间差,就是说获取真图像的速度可能太快了,前一帧的图像可能还没处理完就已经捕获了下一帧了,这样就会漏帧检测,所以需要适当的添加一个等待的时间。


# 这里个人觉得是等待一帧处理的时间来进行推理,让推理速度追上读取速度
            time.sleep(1 / self.fps[i])  # wait time


这个等待时间与推理速度之间有什么关系,我还是不太了解。


LoadWebcam类代码
class LoadWebcam:  # for inference
    # YOLOv5 local webcam dataloader, i.e. `python detect.py --source 0`
    def __init__(self, pipe='0', img_size=640, stride=32):
        self.img_size = img_size
        self.stride = stride
        self.pipe = eval(pipe) if pipe.isnumeric() else pipe
        self.cap = cv2.VideoCapture(self.pipe)  # video capture object
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)  # set buffer size
    def __iter__(self):
        self.count = -1
        return self
    def __next__(self):
        self.count += 1
        if cv2.waitKey(1) == ord('q'):  # q to quit
            self.cap.release()
            cv2.destroyAllWindows()
            raise StopIteration
        # Read frame
        ret_val, img0 = self.cap.read()
        img0 = cv2.flip(img0, 1)  # flip left-right
        # Print
        assert ret_val, f'Camera Error {self.pipe}'
        img_path = 'webcam.jpg'
        print(f'webcam {self.count}: ', end='')
        # Padded resize
        img = letterbox(img0, self.img_size, stride=self.stride)[0]
        # Convert
        img = img.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        img = np.ascontiguousarray(img)
        return img_path, img, img0, None
    def __len__(self):
        return 0


2.3 绘图部分

这里主要是使用了 Annotator 类来进行绘制边界框与label信息。


Annotator类代码

class Annotator:
    if RANK in (-1, 0):
        check_font()  # download TTF if necessary
    # YOLOv5 Annotator for train/val mosaics and jpgs and detect/hub inference annotations
    def __init__(self, im, line_width=None, font_size=None, font='Arial.ttf', pil=False, example='abc'):
        assert im.data.contiguous, 'Image not contiguous. Apply np.ascontiguousarray(im) to Annotator() input images.'
        self.pil = pil or not is_ascii(example) or is_chinese(example)
        # 默认使用opencv
        if self.pil:  # use PIL
            self.im = im if isinstance(im, Image.Image) else Image.fromarray(im)
            self.draw = ImageDraw.Draw(self.im)
            self.font = check_font(font='Arial.Unicode.ttf' if is_chinese(example) else font,
                                   size=font_size or max(round(sum(self.im.size) / 2 * 0.035), 12))
        else:  # use cv2
            self.im = im
        # 设置框宽度
        self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2)  # line width
    def box_label(self, box, label='', color=(128, 128, 128), txt_color=(255, 255, 255)):
        # Add one xyxy box to image with label
        if self.pil or not is_ascii(label):
            self.draw.rectangle(box, width=self.lw, outline=color)  # box
            if label:
                w, h = self.font.getsize(label)  # text width, height
                outside = box[1] - h >= 0  # label fits outside box
                self.draw.rectangle([box[0],
                                     box[1] - h if outside else box[1],
                                     box[0] + w + 1,
                                     box[1] + 1 if outside else box[1] + h + 1], fill=color)
                # self.draw.text((box[0], box[1]), label, fill=txt_color, font=self.font, anchor='ls')  # for PIL>8.0
                self.draw.text((box[0], box[1] - h if outside else box[1]), label, fill=txt_color, font=self.font)
        else:  # cv2
            # 获取边界框的两个点
            p1, p2 = (int(box[0]), int(box[1])), (int(box[2]), int(box[3]))
            # 根据两个点坐标绘制边界框
            cv2.rectangle(self.im, p1, p2, color, thickness=self.lw, lineType=cv2.LINE_AA)
            # 在边界框左上角绘制label信息
            if label:
                tf = max(self.lw - 1, 1)  # font thickness
                # 获取label的宽度高度
                w, h = cv2.getTextSize(label, 0, fontScale=self.lw / 3, thickness=tf)[0]  # text width, height
                outside = p1[1] - h - 3 >= 0  # label fits outside box
                p2 = p1[0] + w, p1[1] - h - 3 if outside else p1[1] + h + 3
                # 绘制label的背景框(填充色)
                cv2.rectangle(self.im, p1, p2, color, -1, cv2.LINE_AA)  # filled
                # 绘制label的字符串
                cv2.putText(self.im, label, (p1[0], p1[1] - 2 if outside else p1[1] + h + 2), 0, self.lw / 3, txt_color,
                            thickness=tf, lineType=cv2.LINE_AA)
    def rectangle(self, xy, fill=None, outline=None, width=1):
        # Add rectangle to image (PIL-only)
        self.draw.rectangle(xy, fill, outline, width)
    def text(self, xy, text, txt_color=(255, 255, 255)):
        # Add text to image (PIL-only)
        w, h = self.font.getsize(text)  # text width, height
        self.draw.text((xy[0], xy[1] - h + 1), text, fill=txt_color, font=self.font)
    def result(self):
        # Return annotated image as array
        return np.asarray(self.im)


主要的调用过程很简单:


# 初始化,传入原图
annotator = Annotator(im0, line_width=line_thickness, example=str(names))
# 在原图上依次绘制每个label信息与对应的预测边界框
for *xyxy, conf, cls in reversed(det):
  ...
  annotator.box_label(xyxy, label, color=colors(c, True))
  ...
# 返回画框后的图像
im0 = annotator.result()


3. Detect脚本简化


yolov5的源码对detect脚本写得非常的详细,集成了很多功能,但对我来说需求可能没有那么大,然后为了便于自己学习与查看,这里我对detect脚本进行了简化,分别为单个图像,单个视频和摄像头信息编写了一个检测脚本。


3.1 单图像推理

自己写的参考代码

# 功能:单图像推理
def run_image(image_path, save_path, img_size=640, stride=32, augment=False, visualize=False):
    weights = r'weights/yolov5s.pt'
    device = 'cpu'
    save_path += os.path.basename(image_path)
    # 导入模型
    model = attempt_load(weights, map_location=device)
    img_size = check_img_size(img_size, s=stride)
    names = model.names
    # Padded resize
    img0 = cv2.imread(image_path)
    img = letterbox(img0, img_size, stride=stride, auto=True)[0]
    # Convert
    img = img.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
    img = np.ascontiguousarray(img)
    img = torch.from_numpy(img).to(device)
    img = img.float() / 255.0   # 0 - 255 to 0.0 - 1.0
    img = img[None]     # [h w c] -> [1 h w c]
    # inference
    pred = model(img, augment=augment, visualize=visualize)[0]
    pred = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, max_det=1000)
    # plot label
    det = pred[0]
    annotator = Annotator(img0.copy(), line_width=3, example=str(names))
    if len(det):
        det[:, :4] = scale_coords(img.shape[2:], det[:, :4], img0.shape).round()
        for *xyxy, conf, cls in reversed(det):
            c = int(cls)  # integer class
            label = f'{names[c]} {conf:.2f}'
            annotator.box_label(xyxy, label, color=colors(c, True))
    # write video
    im0 = annotator.result()
    cv2.imwrite(save_path, im0)
    print(f'Inference {image_path} finish, save to {save_path}')


推理后的图像:

image.png


3.2 单视频推理

自己写的参考代码

# 功能:单视频推理
def run_video(video_path, save_path, img_size=640, stride=32, augment=False, visualize=False):
    weights = r'weights/yolov5s.pt'
    device = 'cpu'
    # 导入模型
    model = attempt_load(weights, map_location=device)
    img_size = check_img_size(img_size, s=stride)
    names = model.names
    # 读取视频对象
    cap = cv2.VideoCapture(video_path)
    frame = 0       # 开始处理的帧数
    frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # 待处理的总帧数
    # 获取当前视频的帧率与宽高,设置同样的格式,以确保相同帧率与宽高的视频输出
    fps = cap.get(cv2.CAP_PROP_FPS)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    save_path += os.path.basename(video_path)
    vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
    while frame <= frames:
        # 读取帧图像
        ret_val, img0 = cap.read()
        if not ret_val:
            break
        frame += 1
        print(f'video {frame}/{frames} {save_path}')
        # Padded resize
        img = letterbox(img0, img_size, stride=stride, auto=True)[0]
        # Convert
        img = img.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        img = np.ascontiguousarray(img)
        img = torch.from_numpy(img).to(device)
        img = img.float() / 255.0   # 0 - 255 to 0.0 - 1.0
        img = img[None]     # [h w c] -> [1 h w c]
        # inference
        pred = model(img, augment=augment, visualize=visualize)[0]
        pred = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, max_det=1000)
        # plot label
        det = pred[0]
        annotator = Annotator(img0.copy(), line_width=3, example=str(names))
        if len(det):
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], img0.shape).round()
            for *xyxy, conf, cls in reversed(det):
                c = int(cls)  # integer class
                label = f'{names[c]} {conf:.2f}'
                annotator.box_label(xyxy, label, color=colors(c, True))
        # write video
        im0 = annotator.result()
        vid_writer.write(im0)
    vid_writer.release()
    cap.release()
    print(f'{video_path} finish, save to {save_path}')


最后的输出结果:

image.png


在对应的目录文件下会生成检测视频,由于是逐帧检测的,所以视频不会压缩,长度也不会改变。

image.png


推理视频的部分截图:

image.png


3.3 摄像头推理

自己写的参考代码

def run_webcam(save_path, img_size=640, stride=32, augment=False, visualize=False):
    weights = r'weights/yolov5s.pt'
    device = 'cpu'
    # 导入模型
    model = attempt_load(weights, map_location=device)
    img_size = check_img_size(img_size, s=stride)
    names = model.names
    # 读取视频对象: 0 表示打开本地摄像头
    cap = cv2.VideoCapture(0)
    frame = 0       # 开始处理的帧数
    # 获取当前视频的帧率与宽高,设置同样的格式,以确保相同帧率与宽高的视频输出
    ret_val, img0 = cap.read()
    fps, w, h = 30, img0.shape[1], img0.shape[0]
    vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
    # 按q退出循环
    while True:
        ret_val, img0 = cap.read()
        if cv2.waitKey(1) == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break
        if not ret_val:
            break
        frame += 1
        print(f'video {frame} {save_path}')
        # Padded resize
        img = letterbox(img0, img_size, stride=stride, auto=True)[0]
        # Convert
        img = img.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        img = np.ascontiguousarray(img)
        img = torch.from_numpy(img).to(device)
        img = img.float() / 255.0   # 0 - 255 to 0.0 - 1.0
        img = img[None]     # [h w c] -> [1 h w c]
        # inference
        pred = model(img, augment=augment, visualize=visualize)[0]
        pred = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, max_det=1000)
        # plot label
        det = pred[0]
        annotator = Annotator(img0.copy(), line_width=3, example=str(names))
        if len(det):
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], img0.shape).round()
            for *xyxy, conf, cls in reversed(det):
                c = int(cls)  # integer class
                label = f'{names[c]} {conf:.2f}'
                annotator.box_label(xyxy, label, color=colors(c, True))
        # write video
        im0 = annotator.result()
        cv2.imshow('webcam:0', im0)
        cv2.waitKey(1)
        vid_writer.write(im0)
    # 按q退出循环
    vid_writer.release()
    cap.release()
    print(f'Webcam finish, save to {save_path}')


按q退出摄像头的推理,然后推理完一帧再捕获下一帧进行推理,所以在最后的保存视频中推理的速度看起来会有点加速的感觉。正常按q退出视频可以正常打开视频,但是如果直接中断程序,视频是无法打开的。


这里就不展示摄像头推理了,在推理的过程中会实时显示,同时写入文件夹中。正常退出显示:

image.png


3.4 测试代码

参考测试代码

class Test:
    def test_image(self):
        test_path = r"data/images/bus.jpg"
        save_path = r"runs/detect/"
        run_image(test_path, save_path)
    def test_video(self):
        test_path = r"data/videos/demo.mp4"
        save_path = r"runs/detect/"
        run_video(test_path, save_path)
    def test_webcam(self):
        save_path = r"runs/detect/webcam.mp4"
        run_webcam(save_path)
if __name__ == '__main__':
    test = Test()
    test.test_webcam()


参考资料:


1. yolov5的使用 | 训练Pascal voc格式的数据集


2. 【YOLOV5-5.x 源码解读】detect.py


3. 【YOLOV5-5.x 源码解读】datasets.py


目录
相关文章
|
1月前
|
存储 缓存 并行计算
yolov5的train.py的参数信息解析
这篇文章解析了YOLOv5的`train.py`脚本中的参数信息,详细介绍了每个参数的功能和默认值,包括权重路径、模型配置、数据源、超参数、训练轮数、批量大小、图像尺寸、训练选项、设备选择、优化器设置等,以便用户可以根据需要自定义训练过程。
31 0
|
2月前
|
设计模式 存储 人工智能
深度解析Unity游戏开发:从零构建可扩展与可维护的游戏架构,让你的游戏项目在模块化设计、脚本对象运用及状态模式处理中焕发新生,实现高效迭代与团队协作的完美平衡之路
【9月更文挑战第1天】游戏开发中的架构设计是项目成功的关键。良好的架构能提升开发效率并确保项目的长期可维护性和可扩展性。在使用Unity引擎时,合理的架构尤为重要。本文探讨了如何在Unity中实现可扩展且易维护的游戏架构,包括模块化设计、使用脚本对象管理数据、应用设计模式(如状态模式)及采用MVC/MVVM架构模式。通过这些方法,可以显著提高开发效率和游戏质量。例如,模块化设计将游戏拆分为独立模块。
185 3
|
3月前
|
XML Web App开发 数据挖掘
Postman接口测试工具全解析:功能、脚本编写及优缺点探讨
文章详细分析了Postman接口测试工具的功能、脚本编写、使用场景以及优缺点,强调了其在接口自动化测试中的强大能力,同时指出了其在性能分析方面的不足,并建议根据项目需求和个人偏好选择合适的接口测试工具。
118 1
|
3月前
|
Java Shell Linux
【Linux入门技巧】新员工必看:用Shell脚本轻松解析应用服务日志
关于如何使用Shell脚本来解析Linux系统中的应用服务日志,提供了脚本实现的详细步骤和技巧,以及一些Shell编程的技能扩展。
55 0
【Linux入门技巧】新员工必看:用Shell脚本轻松解析应用服务日志
|
5月前
|
缓存 监控 安全
深入解析Elasticsearch中脚本原理
深入解析Elasticsearch中脚本原理
|
6月前
|
机器学习/深度学习 编解码 算法
算法工程师面试问题总结 | YOLOv5面试考点原理全解析
本文给大家带来的百面算法工程师是深度学习目标检测YOLOv5面试总结,文章内总结了常见的提问问题,旨在为广大学子模拟出更贴合实际的面试问答场景。在这篇文章中,我们还将介绍一些常见的深度学习目标检测面试问题,并提供参考的回答及其理论基础,以帮助求职者更好地准备面试。通过对这些问题的理解和回答,求职者可以展现出自己的深度学习目标检测领域的专业知识、解决问题的能力以及对实际应用场景的理解。同时,这也是为了帮助求职者更好地应对深度学习目标检测岗位的面试挑战,提升面试的成功率和竞争力。
|
6月前
|
机器学习/深度学习 算法 Go
YOLOv5网络结构解析
YOLOv5网络结构解析
|
6天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
21 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
67 0

推荐镜像

更多