Files
chache_rtsp/main.py
2025-06-26 16:48:49 +08:00

96 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import requests
import threading
import subprocess
import numpy as np
from pathlib import Path
from collections import Counter
from websocket import WebSocketApp
from configparser import ConfigParser
from algo import YoloModel
FILE = Path(__file__).resolve()
ROOT = FILE.parents[0]
model = YoloModel(ROOT / 'chache/best.pt')
def run_client(client: WebSocketApp):
client.run_forever()
def get_config():
# 获取websocket的配置连接
config_path = ROOT / "config.ini"
config = ConfigParser()
config.read(config_path)
rtsp_url = config.get('config', 'urls')
wss = config.get('config', 'wss')
api_url = config.get('config', 'api')
return wss.split(','), rtsp_url.split(','), api_url
def process_camera(rtsp_url, ws_url, api_url=None):
websocket = WebSocketApp(url=ws_url, on_open=on_open)
threading.Thread(target=run_client, args=(websocket,), daemon=True).start()
width, height = 1920, 1080
command = [
'ffmpeg',
'-i', rtsp_url,
'-an', # 不处理音频
'-f', 'image2pipe', # 输出为图像流
'-pix_fmt', 'bgr24', # OpenCV 默认是 BGR
'-vf', f'fps=1,scale={width}:{height}', # 每秒 1 帧 + 缩放
'-vcodec', 'rawvideo',
'-'
]
# 启动 FFmpeg 子进程
pipe = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=10 ** 8)
while True:
# 读取一帧原始数据(大小 = width * height * 30
raw_frame = pipe.stdout.read(width * height * 3)
if not raw_frame:
break
# 转换为 NumPy 数组并 reshape 成图像
frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))
# # 使用 YOLO 进行推理
results = model.predict(frame)
img = results[0].plot()
ret, jpeg = cv2.imencode('.jpg', img)
if ret:
frame_data = jpeg.tobytes()
websocket.send_bytes(frame_data)
if api_url is not None:
# 获取类别索引
classes = results[0].boxes.cls.cpu().numpy().astype(int)
# 统计每个类别的数量
class_counts = Counter(classes)
forklift_count = class_counts.get(0, 0)
person_count = class_counts.get(1, 0)
try:
data = {
"result": f"0, {person_count}, {forklift_count}"
}
response = requests.post(api_url, json=data)
if response.status_code == 200:
print("接口调用success")
else:
print("接口error")
except requests.exceptions.RequestException as exc:
print("接口except")
def on_open(ws):
print(ws.url, '已连接')
if __name__ == '__main__':
websocket_urls, rtsp_urls, api_url = get_config()
for i in range(len(websocket_urls)):
if i == 0:
thread = threading.Thread(target=process_camera, args=(rtsp_urls[i], websocket_urls[i], api_url))
else:
thread = threading.Thread(target=process_camera, args=(rtsp_urls[i], websocket_urls[i], None))
thread.start()