import cv2 import requests import threading import subprocess import numpy as np from pathlib import Path from collections import Counter from websocket import WebSocketApp from configparser import ConfigParser from algo import YoloModel FILE = Path(__file__).resolve() ROOT = FILE.parents[0] model = YoloModel(ROOT / 'chache/best.pt') def run_client(client: WebSocketApp): client.run_forever() def get_config(): # 获取websocket的配置连接 config_path = ROOT / "config.ini" config = ConfigParser() config.read(config_path) rtsp_url = config.get('config', 'urls') wss = config.get('config', 'wss') api_url = config.get('config', 'api') return wss.split(','), rtsp_url.split(','), api_url def process_camera(rtsp_url, ws_url, api_url=None): websocket = WebSocketApp(url=ws_url, on_open=on_open) threading.Thread(target=run_client, args=(websocket,), daemon=True).start() width, height = 1920, 1080 command = [ 'ffmpeg', '-i', rtsp_url, '-an', # 不处理音频 '-f', 'image2pipe', # 输出为图像流 '-pix_fmt', 'bgr24', # OpenCV 默认是 BGR '-vf', f'fps=1,scale={width}:{height}', # 每秒 1 帧 + 缩放 '-vcodec', 'rawvideo', '-' ] # 启动 FFmpeg 子进程 pipe = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=10 ** 8) while True: # 读取一帧原始数据(大小 = width * height * 3)0 raw_frame = pipe.stdout.read(width * height * 3) if not raw_frame: break # 转换为 NumPy 数组并 reshape 成图像 frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3)) # # 使用 YOLO 进行推理 results = model.predict(frame) img = results[0].plot() ret, jpeg = cv2.imencode('.jpg', img) if ret: frame_data = jpeg.tobytes() websocket.send_bytes(frame_data) if api_url is not None: # 获取类别索引 classes = results[0].boxes.cls.cpu().numpy().astype(int) # 统计每个类别的数量 class_counts = Counter(classes) forklift_count = class_counts.get(0, 0) person_count = class_counts.get(1, 0) try: data = { "result": f"0, {person_count}, {forklift_count}" } response = requests.post(api_url, json=data) if response.status_code == 200: print("接口调用success") else: print("接口error") except requests.exceptions.RequestException as exc: print("接口except") def on_open(ws): print(ws.url, '已连接') if __name__ == '__main__': websocket_urls, rtsp_urls, api_url = get_config() for i in range(len(websocket_urls)): if i == 0: thread = threading.Thread(target=process_camera, args=(rtsp_urls[i], websocket_urls[i], api_url)) else: thread = threading.Thread(target=process_camera, args=(rtsp_urls[i], websocket_urls[i], None)) thread.start()