diff --git a/123.pkl b/123.pkl new file mode 100644 index 0000000..0230b08 Binary files /dev/null and b/123.pkl differ diff --git a/app/configs/global_var.py b/app/configs/global_var.py index d562738..7264ca2 100644 --- a/app/configs/global_var.py +++ b/app/configs/global_var.py @@ -16,20 +16,20 @@ def _init(): # 初始化 global _global_dict _global_dict = {} - # ws列表存储 - global _active_connections - _active_connections = multiprocessing.Manager().list() +# # ws列表存储 +# global _active_connections +# _active_connections = multiprocessing.Manager().list() - # ws字典存储 - global _active_connections_dist - _active_connections_dist = multiprocessing.Manager().dict() +# # ws字典存储 +# global _active_connections_dist +# _active_connections_dist = multiprocessing.Manager().dict() -def get_active_connections(): - return _active_connections +# def get_active_connections(): +# return _active_connections -def get_active_connections_dist(): - return _active_connections_dist +# def get_active_connections_dist(): +# return _active_connections_dist def set_value(key, value): # 定义一个全局变量 diff --git a/app/utils/websocket_tool.py b/app/utils/websocket_tool.py index 18c5f02..98e3039 100644 --- a/app/utils/websocket_tool.py +++ b/app/utils/websocket_tool.py @@ -9,48 +9,40 @@ """ import json from typing import Union, List, Dict +import os from app.core.common_utils import logger from app.utils.JSONEncodeTools import MyEncoder from configs.global_var import * from app.utils.redis_config import redis_client from multiprocessing import Process, Queue +import pickle class WebsocketUtil: def __init__(self): # self.active_connections = multiprocessing.Manager().list() - active_connections_dist = {} - self.queue = Queue(maxsize=0) - self.queue.put(active_connections_dist) - print("--------------------队列长度:", self.queue.qsize()) + self.active_connections_dist = {} + def connect(self, ws, id: str): # 等待连接 msg = ws.receive() # 存储ws连接对象 - print("--------------------队列长度:", self.queue.qsize()) - active_connections_dist = self.queue.get() - if id in active_connections_dist: - active_connections_dist[id].append(ws) + if os.path.exists(f"{id}.pkl"): + ws_list = read(id=id) + ws_list.append(ws) else: ws_list = [ws, ] - active_connections_dist[id] = ws_list - self.queue.put(active_connections_dist) - print("--------------------队列长度:", self.queue.qsize()) + write(id=id, ws=list) + def disconnect(self, ws, id): - # ws关闭时 移除ws对象 - print("--------------------队列长度:", self.queue.qsize()) - if ws.closed: - active_connections_dist = self.queue.get() - if active_connections_dist is not None: - if ws in active_connections_dist.values(): - active_connections_dist[id].pop(ws) - self.queue.put(active_connections_dist) - print("--------------------队列长度:", self.queue.qsize()) + # 删除连接 + if os.path.exists(f"{id}.pkl"): + os.remove(f"{id}.pkl") @staticmethod async def send_personal_message(message: str, ws): @@ -68,18 +60,25 @@ class WebsocketUtil: def send_message_proj_json(self, message: Union[str, int, List, Dict], id: str): # 广播该项目的消息 - # global active_connections_dist - print(self.queue.qsize()) - active_connections_dist = self.queue.get() - self.queue.put(active_connections_dist) - print(active_connections_dist) + active_connections_dist = read(id="{id}.pkl") for connection in active_connections_dist[id]: try: connection.send(json.dumps(message, cls=MyEncoder, indent=4), ) except Exception as e: logger.error("websocket异常断开,{}", e) - self.disconnect(ws=connection, id=id) - print("--------------------队列长度:", self.queue.qsize()) -manager = WebsocketUtil() \ No newline at end of file +manager = WebsocketUtil() + + +def write(id: str, ws: List): + with open(f"{id}.pkl", "wb") as f: + wsdump = pickle.dump(ws) + f.write(wsdump) + + +def read(id: str): + with open(f"{id}.pkl", "rb") as f: + wss = pickle.load(f) + print(wss) + return wss \ No newline at end of file