import asyncio
import time

import torch

from app.util.yolov5.models.common import DetectMultiBackend
from app.util.yolov5.utils.torch_utils import select_device
from app.util.yolov5.utils.dataloaders import LoadStreams
from app.util.yolov5.utils.general import check_img_size, non_max_suppression, cv2, scale_coords, xyxy2xywh
from app.websocket.web_socket_server import room_manager
from app.common.redis_cli import redis_conn
from deep_sort.deep_sort import DeepSort

# Multipliers used by compute_color_for_labels to derive a per-id colour.
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)

# Initialise the DeepSORT tracker.
# NOTE(review): this instance lives at module level, so every call to
# run_deepsort_rtsp shares the same tracker state -- confirm that concurrent
# streams are not expected to be tracked independently.
deepsort = DeepSort(
    model_path="deep_sort/deep/checkpoint/ckpt.t7",  # ReID model checkpoint path
    max_dist=0.2,  # appearance-feature matching threshold (smaller is stricter)
    max_iou_distance=0.7,  # maximum IoU distance threshold
    max_age=70,  # frames an unmatched track is kept alive
    n_init=3  # consecutive matches needed before a track is confirmed
)


def _resolve_device_name():
    """Return 'cuda:0' when the Redis key 'is_gpu' holds 'True', else 'cpu'."""
    flag = redis_conn.get('is_gpu')
    # The reply may be bytes or str depending on how the client was created
    # (not visible in this file), so normalise before comparing.
    if isinstance(flag, bytes):
        flag = flag.decode()
    return 'cuda:0' if flag == 'True' else 'cpu'


def _preprocess_frame(model, im):
    """Turn a loader frame (numpy array) into a normalised, batched tensor on the model device."""
    im = torch.from_numpy(im).to(model.device)
    im = im.half() if model.fp16 else im.float()  # uint8 to fp16/32
    im /= 255  # 0 - 255 to 0.0 - 1.0
    if len(im.shape) == 3:
        im = im[None]  # expand for batch dim
    return im


def _run_inference(model, im):
    """Run the model on ``im``; for ``model.xml`` with a batch > 1, run image by image."""
    if model.xml and im.shape[0] > 1:
        per_image = [
            model(chunk, augment=False, visualize=False).unsqueeze(0)
            for chunk in torch.chunk(im, im.shape[0], 0)
        ]
        return [torch.cat(per_image, dim=0), None]
    return model(im, augment=False, visualize=False)


async def run_deepsort_rtsp(weights_pt: str, rtsp_url: str, data: str, detect_id: int, idx_to_class: dict):
    """
    Run detection + DeepSORT tracking on an RTSP stream and push JPEG frames
    to the websocket room ``'deepsort_rtsp_<detect_id>'``.

    The loop stops as soon as the room is no longer present in
    ``room_manager.rooms``.

    :param weights_pt: path of the weights file
    :param rtsp_url: address of the video stream
    :param data: dataset yaml file passed to the model backend
    :param detect_id: id used to build the websocket room name
    :param idx_to_class: mapping from class index (as a string key) to class name
    :return: None
    """
    room = 'deepsort_rtsp_' + str(detect_id)

    # Resolve the device name first so select_device is invoked exactly once.
    device = select_device(_resolve_device_name())

    # Load the model.
    model = DetectMultiBackend(weights_pt, device=device, dnn=False, data=data, fp16=False)
    stride, pt = model.stride, model.pt
    imgsz = check_img_size((640, 640), s=stride)  # check image size
    dataset = LoadStreams(rtsp_url, img_size=imgsz, stride=stride, auto=pt, vid_stride=1)
    bs = len(dataset)
    model.warmup(imgsz=(1 if pt or model.triton else bs, 3, *imgsz))

    # Give the websocket client 3 s to join the room. asyncio.sleep is used
    # instead of time.sleep so the event loop keeps running meanwhile.
    await asyncio.sleep(3)

    for _path, im, im0s, _vid_cap, _s in dataset:
        if not room_manager.rooms.get(room):
            print(room, '结束跟踪')
            break

        im = _preprocess_frame(model, im)

        # No gradients are needed for inference; this also guarantees the
        # detections can be converted to numpy afterwards.
        with torch.no_grad():
            pred = _run_inference(model, im)
            # NMS
            pred = non_max_suppression(pred, 0.45, 0.45, None, False, max_det=1000)

        # non_max_suppression yields one detection tensor per image; only the
        # first stream is processed, matching im0s[0].
        image = im0s[0]
        det = pred[0]
        det[:, :4] = scale_coords(im.shape[2:], det[:, :4], image.shape).round()

        # Convert the detections into the arrays DeepSORT is fed with.
        bbox_xywh, cls_conf, cls_ids = yolo_to_deepsort_format(det)

        # select person class
        mask = cls_ids == 0
        bbox_xywh = bbox_xywh[mask]
        # bbox dilation just in case bbox too small, delete this line if using a better pedestrian detector
        bbox_xywh[:, 2:] *= 1.2
        cls_conf = cls_conf[mask]
        cls_ids = cls_ids[mask]

        # Update the tracker with this frame's detections.
        outputs, _ = deepsort.update(bbox_xywh, cls_conf, cls_ids, image)

        # draw boxes for visualization
        if len(outputs) > 0:
            bbox_xyxy = outputs[:, :4]
            identities = outputs[:, -1]
            track_classes = outputs[:, -2]
            # int() first so a float label such as 0.0 still maps to key '0'.
            labels = [idx_to_class[str(int(label))] for label in track_classes]
            image = draw_boxes(image, bbox_xyxy, labels, identities)

        # Encode the frame as JPEG and send it to the room.
        ret, jpeg = cv2.imencode('.jpg', image)
        if ret:
            frame_data = jpeg.tobytes()
            await room_manager.send_stream_to_room(room, frame_data)


def draw_boxes(img, bbox, names=None, identities=None, offset=(0, 0)):
    """
    Draw one rectangle and text label per box onto ``img`` (in place).

    :param img: image to draw on
    :param bbox: iterable of boxes, each unpacked as (x1, y1, x2, y2)
    :param names: optional per-box name prefix for the label
    :param identities: optional per-box track id; 0 is used when omitted
    :param offset: (dx, dy) added to every box coordinate
    :return: the same ``img`` object
    """
    for i, box in enumerate(bbox):
        x1, y1, x2, y2 = [int(v) for v in box]
        x1 += offset[0]
        x2 += offset[0]
        y1 += offset[1]
        y2 += offset[1]
        # box text and bar
        track_id = int(identities[i]) if identities is not None else 0
        color = compute_color_for_labels(track_id)
        # With names omitted the label is just the id instead of failing on None[i].
        prefix = names[i] if names is not None else ''
        label = '{:}{:d}'.format(prefix, track_id)
        t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
        cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
        cv2.rectangle(img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
        cv2.putText(img, label, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
    return img


def compute_color_for_labels(label):
    """
    Return a fixed 3-tuple colour derived from ``label``.

    Each channel is ``(p * (label ** 2 - label + 1)) % 255`` for the
    corresponding entry ``p`` of ``palette``.
    """
    color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
    return tuple(color)


def yolo_to_deepsort_format(pred):
    """
    Split a detection tensor into the arrays passed to DeepSORT.

    The input tensor is not modified.

    :param pred: 2-D tensor; columns 0-3 are converted with ``xyxy2xywh``,
        column 4 is returned as confidence and column 5 as class
    :return: (xywh, conf, cls) as numpy arrays
    """
    xywh = xyxy2xywh(pred[:, :4]).detach().cpu().numpy()
    conf = pred[:, 4].detach().cpu().numpy()
    cls = pred[:, 5].detach().cpu().numpy()
    return xywh, conf, cls