diff --git a/app/controller/AlgorithmController.py b/app/controller/AlgorithmController.py index dcfa92a..ef2a4a6 100644 --- a/app/controller/AlgorithmController.py +++ b/app/controller/AlgorithmController.py @@ -47,8 +47,8 @@ def start_train_algorithm(): def wrapped_function(): param = request.args.get('param') id = request.args.get('id') - # t = Thread(target=func, args=(param, id)) - t = Process(target=func, args=(param, id), name=id) + t = Thread(target=func, args=(param, id)) + #t = Process(target=func, args=(param, id), name=id) set_value(key=id, value=False) t.start() return output_wrapped(0, 'success', '成功') diff --git a/app/utils/websocket_tool.py b/app/utils/websocket_tool.py index c6cfc72..e69defd 100644 --- a/app/utils/websocket_tool.py +++ b/app/utils/websocket_tool.py @@ -9,101 +9,52 @@ """ import json from typing import Union, List, Dict -import os from app.core.common_utils import logger from app.utils.JSONEncodeTools import MyEncoder -from configs.global_var import * -from app.utils.redis_config import redis_client -from multiprocessing import Process, Queue -import pickle class WebsocketUtil: - def __init__(self): - # self.active_connections = multiprocessing.Manager().list() - self.active_connections_dist = [] - + self.active_connections: List = [] + self.active_connections_dist: Dict = {} def connect(self, ws, id: str): # 等待连接 msg = ws.receive() # 存储ws连接对象 - if os.path.exists(f"{id}.pkl"): - wsFile = read(id=id) - ws_list = wsFile.ws - ws_list.append(ws) - data = WsFile(id=id, ws=ws_list) - write(id=id, ws=data) + self.active_connections.append(ws) + if id in self.active_connections_dist: + self.active_connections_dist[id].append(ws) else: ws_list = [ws, ] - data = WsFile(id=id, ws=ws_list) - write(id=id, ws=data) - print("--;;-----------", ws_list) - - + self.active_connections_dist[id] = ws_list def disconnect(self, ws, id): - # 删除连接 - if os.path.exists(f"{id}.pkl"): - os.remove(f"{id}.pkl") + # ws关闭时 移除ws对象 + if ws.closed: + if ws in self.active_connections_dist.values(): + self.active_connections.remove(ws) + self.active_connections_dist[id].pop(ws) @staticmethod async def send_personal_message(message: str, ws): # 发送个人消息 await ws.send(message) - # def broadcast(self, message: str): - # # 广播消息 - # # global active_connections - # active_connections = redis_client.get_redis().get("active_connections") - # if active_connections is not None: - # active_connections = json.loads(active_connections) - # for connection in active_connections: - # connection.send(message) + def broadcast(self, message: str): + # 广播消息 + for connection in self.active_connections: + connection.send(message) def send_message_proj_json(self, message: Union[str, int, List, Dict], id: str): # 广播该项目的消息 - active_connections = read(id=id) - print("===================", type(active_connections.id), active_connections.id) - print("===================", type(active_connections.getWs()), active_connections.getWs()[0]) - for connection in active_connections.getWs(): + for connection in self.active_connections_dist[id]: try: connection.send(json.dumps(message, cls=MyEncoder, indent=4), ) except Exception as e: logger.error("websocket异常断开,{}", e) + self.disconnect(ws=connection, id=id) -manager = WebsocketUtil() - - -def write(id: str, ws: List): - print(f"序列化对象{ws}") - with open(f"{id}.pkl", "wb") as f: - pickle.dump(ws, f) - - -def read(id: str): - - with open(f"{id}.pkl", "rb") as f: - wss = pickle.load(f) - print(wss) - print(f"反序列化对象{wss}") - return wss - - -class WsFile: - def __init__(self, id: str, ws: List) -> None: - self.id = id - self.ws = ws - - def __getstate__(self): - pickled = {"id": self.id} - return pickled - - def getWs(self): - return self.ws - - def __setstate(self, pickled_dict): - self.__init__(pickled_dict['id']) +manager = WebsocketUtil() \ No newline at end of file