RODY/app/utils/websocket_tool.py
2022-11-25 17:39:24 +08:00

61 lines
1.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
@Time 2022/10/12 17:55
@Auth
@File websocket_tool.py
@IDE PyCharm
@Motto ABC (Always Be Coding)
@Desc
"""
import json
from typing import Union, List, Dict
from app.core.common_utils import logger
from app.utils.JSONEncodeTools import MyEncoder
class WebsocketUtil:
    """Registry of websocket connections, grouped by an id key.

    Each registered connection is stored twice: in the flat
    ``active_connections`` list (used by :meth:`broadcast`) and in
    ``active_connections_dist`` under the id it was registered with (used by
    :meth:`send_message_proj_json`).
    """

    def __init__(self):
        # Every registered connection, in registration order.
        self.active_connections: List = []
        # Maps an id to the list of connections registered under that id.
        self.active_connections_dist: Dict = {}

    def connect(self, ws, id: str):
        """Register ``ws`` under ``id``.

        Calls ``ws.receive()`` once before registering (the original comment
        describes this as waiting for the connection); the returned value is
        discarded.

        Args:
            ws: Connection object exposing ``receive()``.
            id: Grouping key for the connection.
        """
        # Wait for the connection; the received value is not used.
        ws.receive()
        # Store the ws connection object.
        print("存储ws连接对象")
        self.active_connections.append(ws)
        self.active_connections_dist.setdefault(id, []).append(ws)

    def disconnect(self, ws, id):
        """Forget ``ws`` once it reports itself as closed.

        Does nothing while ``ws.closed`` is falsy. Otherwise removes ``ws``
        from the flat list and from the group stored under ``id``; a group
        that becomes empty is dropped from the mapping. Unknown connections
        and unknown ids are ignored.

        Args:
            ws: Connection object exposing a ``closed`` attribute.
            id: Grouping key the connection was registered under.
        """
        # Only closed connections are removed.
        if not ws.closed:
            return

        if ws in self.active_connections:
            self.active_connections.remove(ws)

        group = self.active_connections_dist.get(id)
        if group is None:
            return
        if ws in group:
            group.remove(ws)
        if not group:
            # Do not keep empty groups around for ids with no connections.
            del self.active_connections_dist[id]

    @staticmethod
    async def send_personal_message(message: str, ws):
        """Send ``message`` to the single connection ``ws`` (awaits ``ws.send``)."""
        await ws.send(message)

    def broadcast(self, message: str):
        """Send ``message`` to every registered connection.

        An exception raised by a connection's ``send`` propagates to the
        caller and stops the broadcast.
        """
        for connection in self.active_connections:
            connection.send(message)

    def send_message_proj_json(self, message: Union[str, int, List, Dict], id: str):
        """Send ``message`` as JSON to every connection registered under ``id``.

        The message is serialized with ``json.dumps`` using ``MyEncoder`` and
        ``indent=4``. A failure while serializing or sending is logged and the
        connection is handed to :meth:`disconnect`; remaining connections are
        still attempted. An ``id`` with no registered connections is a no-op.

        Args:
            message: JSON-serializable payload.
            id: Grouping key whose connections receive the payload (the
                original comment refers to it as the project).
        """
        # Iterate over a snapshot: disconnect() may mutate the group list.
        for connection in list(self.active_connections_dist.get(id, [])):
            try:
                connection.send(json.dumps(message, cls=MyEncoder, indent=4))
            except Exception as e:
                logger.error("websocket异常断开{}", e)
                self.disconnect(ws=connection, id=id)
# Module-level WebsocketUtil instance, created when this module is imported.
manager = WebsocketUtil()