This commit is contained in:
ZZX9599
2025-09-03 16:27:53 +08:00
parent d83923d06b
commit 1911cd6588

256
ws/ws.py
View File

@ -1,223 +1,157 @@
from fastapi import WebSocket, APIRouter, WebSocketDisconnect, FastAPI
from typing import Dict, Any, Optional
import datetime
import asyncio import asyncio
import datetime
import json import json
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Dict, Optional
# 创建WebSocket路由 import numpy as np
from fastapi import WebSocket, APIRouter, WebSocketDisconnect, FastAPI
# -------------------------- 配置常量(简化硬编码) --------------------------
HEARTBEAT_INTERVAL = 30 # 心跳检查间隔(秒)
HEARTBEAT_TIMEOUT = 60 # 客户端超时阈值(秒)
WS_ENDPOINT = "/ws" # WebSocket端点路径
# -------------------------- 核心数据结构与全局变量 --------------------------
ws_router = APIRouter() ws_router = APIRouter()
# 客户端连接信息数据结构 # 客户端连接封装(仅保留核心属性和方法)
class ClientConnection: class ClientConnection:
def __init__(self, websocket: WebSocket, client_ip: str): def __init__(self, websocket: WebSocket, client_ip: str):
self.websocket = websocket self.websocket = websocket
self.client_ip = client_ip self.client_ip = client_ip
self.last_heartbeat = datetime.datetime.now() # 初始心跳时间为连接时间
def update_heartbeat(self):
"""更新心跳时间为当前时间"""
self.last_heartbeat = datetime.datetime.now() self.last_heartbeat = datetime.datetime.now()
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳时间已更新")
def is_alive(self, timeout_seconds: int = 60) -> bool: # 更新心跳时间
"""检查客户端是否活跃心跳超时阈值60秒""" def update_heartbeat(self):
self.last_heartbeat = datetime.datetime.now()
# 检查是否存活超时返回False
def is_alive(self) -> bool:
timeout = (datetime.datetime.now() - self.last_heartbeat).total_seconds() timeout = (datetime.datetime.now() - self.last_heartbeat).total_seconds()
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳检查:" return timeout < HEARTBEAT_TIMEOUT
f"上次心跳距今 {timeout:.1f} 秒(阈值:{timeout_seconds}秒)")
return timeout < timeout_seconds
# 存储所有已连接的客户端key客户端IP、valueClientConnection对象 # 全局连接管理IP -> 连接实例
connected_clients: Dict[str, ClientConnection] = {} connected_clients: Dict[str, ClientConnection] = {}
# 心跳任务(全局引用,用于关闭时清理)
# 心跳检查任务引用(全局变量、用于应用关闭时取消任务)
heartbeat_task: Optional[asyncio.Task] = None heartbeat_task: Optional[asyncio.Task] = None
# -------------------------- 心跳检查逻辑(精简日志) --------------------------
async def heartbeat_checker(): async def heartbeat_checker():
"""定期检查客户端心跳每30秒一次、超时直接剔除不发通知"""
while True: while True:
current_time = datetime.datetime.now() now = datetime.datetime.now()
print(f"\n[{current_time:%Y-%m-%d %H:%M:%S}] === 开始新一轮心跳检查 ===") # 1. 筛选超时客户端(避免遍历中修改字典)
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 当前在线客户端总数:{len(connected_clients)}") timeout_ips = [ip for ip, conn in connected_clients.items() if not conn.is_alive()]
# 1. 收集超时客户端IP避免遍历中修改字典 # 2. 处理超时连接(关闭+移除
timeout_clients = [] if timeout_ips:
for client_ip, connection in connected_clients.items(): print(f"[{now:%H:%M:%S}] 心跳检查:{len(timeout_ips)}个客户端超时({timeout_ips}")
if not connection.is_alive(): for ip in timeout_ips:
timeout_clients.append(client_ip)
# 2. 处理超时客户端(关闭连接+移除记录)
if timeout_clients:
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 发现超时客户端:{timeout_clients}(共{len(timeout_clients)}个)")
for client_ip in timeout_clients:
try: try:
connection = connected_clients[client_ip] await connected_clients[ip].websocket.close(code=1008, reason="心跳超时")
await connection.websocket.close(code=1008, reason="心跳超时(>60秒")
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已关闭(超时)")
except Exception as e:
print(
f"[{current_time:%Y-%m-%d %H:%M:%S}] 关闭客户端 {client_ip} 失败:{str(e)}(错误类型:{type(e).__name__}")
finally: finally:
if client_ip in connected_clients: connected_clients.pop(ip, None)
del connected_clients[client_ip]
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除")
else: else:
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 无超时客户端、心跳检查完成") print(f"[{now:%H:%M:%S}] 心跳检查:{len(connected_clients)}个客户端在线,无超时")
# 3. 等待30秒后进行下一轮检查 # 3. 等待下一轮检查
await asyncio.sleep(30) await asyncio.sleep(HEARTBEAT_INTERVAL)
# -------------------------- 应用生命周期(简化异常处理) --------------------------
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""应用生命周期管理:启动时创建心跳任务、关闭时取消任务"""
global heartbeat_task global heartbeat_task
# 启动心跳任务
heartbeat_task = asyncio.create_task(heartbeat_checker()) heartbeat_task = asyncio.create_task(heartbeat_checker())
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务启动(任务ID{id(heartbeat_task)}") print(f"[{datetime.datetime.now():%H:%M:%S}] 心跳任务启动ID{id(heartbeat_task)}")
yield # 应用运行中 yield
# 关闭时取消心跳任务
if heartbeat_task and not heartbeat_task.done(): if heartbeat_task and not heartbeat_task.done():
heartbeat_task.cancel() heartbeat_task.cancel()
try: try:
await heartbeat_task await heartbeat_task
print(f"[{datetime.datetime.now():%H:%M:%S}] 心跳任务已取消")
except asyncio.CancelledError: except asyncio.CancelledError:
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已正常取消") pass
except Exception as e:
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 取消心跳任务时出错:{str(e)}")
async def send_heartbeat_ack(client_ip: str, client_timestamp: Any) -> bool: # -------------------------- 消息处理(合并冗余逻辑) --------------------------
"""向客户端回复心跳确认(严格遵循 {"timestamp":xxxxx, "type":"heartbeat"} 格式)""" async def send_heartbeat_ack(client_ip: str):
"""回复心跳确认"""
if client_ip not in connected_clients: if client_ip not in connected_clients:
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复心跳失败:客户端 {client_ip} 不在连接列表中")
return False return False
# 服务端当前格式化时间戳(字符串类型、与日志时间格式匹配)
server_latest_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ack_msg = {
"timestamp": server_latest_timestamp,
"type": "heartbeat"
}
try: try:
connection = connected_clients[client_ip] ack = {
await connection.websocket.send_json(ack_msg) "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
print( "type": "heartbeat"
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 已向客户端 {client_ip} 回复心跳:{json.dumps(ack_msg, ensure_ascii=False)}") }
await connected_clients[client_ip].websocket.send_json(ack)
return True return True
except Exception as e: except Exception:
print( connected_clients.pop(client_ip, None)
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复客户端 {client_ip} 心跳失败:{str(e)}(错误类型:{type(e).__name__}")
if client_ip in connected_clients:
del connected_clients[client_ip]
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 因心跳回复失败被移除")
return False return False
# 新增:向指定客户端发送消息的方法 async def handle_text_msg(client_ip: str, text: str, conn: ClientConnection):
async def send_message_to_client(client_ip: str, message: Dict[str, Any]) -> bool: """处理文本消息(核心:心跳+JSON解析"""
"""
向指定客户端IP发送JSON消息需确保客户端已在连接列表中
:param client_ip: 目标客户端IP必须与`connected_clients`中的key完全一致"192.168.1.100"
:param message: 待发送的JSON消息需为Dict类型FastAPI会自动序列化为JSON字符串
:return: 发送成功返回True失败客户端不存在/发送异常返回False
"""
current_time = datetime.datetime.now()
# 1. 校验客户端是否在线
if client_ip not in connected_clients:
print(
f"[{current_time:%Y-%m-%d %H:%M:%S}] 发送消息失败:客户端 {client_ip} 不在连接列表中(当前在线数:{len(connected_clients)}")
return False
try: try:
# 2. 获取客户端连接实例并发送消息 msg = json.loads(text)
client_conn = connected_clients[client_ip] # 仅处理心跳类型消息
await client_conn.websocket.send_json(message) if msg.get("type") == "heartbeat":
conn.update_heartbeat()
# 3. 打印发送成功日志(格式化消息便于查看) await send_heartbeat_ack(client_ip)
formatted_msg = json.dumps(message, ensure_ascii=False, indent=2) else:
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 已向客户端 {client_ip} 发送消息:\n{formatted_msg}") print(f"[{datetime.datetime.now():%H:%M:%S}] 客户端{client_ip}:收到消息:{msg}")
return True except json.JSONDecodeError:
print(f"[{datetime.datetime.now():%H:%M:%S}] 客户端{client_ip}无效JSON消息")
except Exception as e:
# 4. 处理发送异常(如网络中断、客户端已离线但未被清理)
error_msg = f"[{current_time:%Y-%m-%d %H:%M:%S}] 向客户端 {client_ip} 发送消息失败:{str(e)}(错误类型:{type(e).__name__}"
print(error_msg)
# 5. 清理无效连接(避免僵尸连接残留)
if client_ip in connected_clients:
del connected_clients[client_ip]
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 因发送异常已从连接列表移除")
return False
@ws_router.websocket("/ws") async def handle_binary_msg(client_ip: str, data: bytes):
"""处理二进制消息(保留扩展入口)"""
print(f"[{datetime.datetime.now():%H:%M:%S}] 客户端{client_ip}:收到{len(data)}字节二进制数据")
# 将二进制转化为 numpy 数组
data_ndarray = np.frombuffer(data, dtype=np.uint8)
# 进行检测
# -------------------------- WebSocket核心端点 --------------------------
@ws_router.websocket(WS_ENDPOINT)
async def websocket_endpoint(websocket: WebSocket): async def websocket_endpoint(websocket: WebSocket):
"""WebSocket核心端点处理连接建立/消息接收/连接关闭""" # 接受连接 + 获取客户端IP
current_time = datetime.datetime.now()
# 1. 接受客户端连接请求
await websocket.accept() await websocket.accept()
# 获取客户端IP作为唯一标识 client_ip = websocket.client.host if websocket.client else "unknown"
client_ip = websocket.client.host if websocket.client else "unknown_ip" now = datetime.datetime.now()
print(f"\n[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接请求已接受WebSocket握手成功") print(f"[{now:%H:%M:%S}] 客户端{client_ip}连接成功")
try: try:
# 2. 处理"同一IP重复连接"场景:关闭旧连接、保留新连接 # 处理重复连接(关闭旧连接)
if client_ip in connected_clients: if client_ip in connected_clients:
old_connection = connected_clients[client_ip] await connected_clients[client_ip].websocket.close(code=1008, reason="同一IP新连接")
await old_connection.websocket.close(code=1008, reason="同一IP新连接已建立") connected_clients.pop(client_ip)
del connected_clients[client_ip] print(f"[{now:%H:%M:%S}] 客户端{client_ip}:关闭旧连接")
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 已关闭客户端 {client_ip} 的旧连接(新连接已建立)")
# 3. 注册新客户端到连接列表 # 注册新连接
new_connection = ClientConnection(websocket, client_ip) new_conn = ClientConnection(websocket, client_ip)
connected_clients[client_ip] = new_connection connected_clients[client_ip] = new_conn
print( print(f"[{now:%H:%M:%S}] 客户端{client_ip}:注册成功,当前在线{len(connected_clients)}")
f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已注册到连接列表、当前在线数:{len(connected_clients)}")
# 4. 循环接收客户端消息(持续监听) # 循环接收消息
while True: while True:
raw_data = await websocket.receive_text() data = await websocket.receive()
recv_time = datetime.datetime.now() if "text" in data:
print(f"\n[{recv_time:%Y-%m-%d %H:%M:%S}] 收到客户端 {client_ip} 的消息:{raw_data}") await handle_text_msg(client_ip, data["text"], new_conn)
elif "bytes" in data:
# 尝试解析JSON消息 await handle_binary_msg(client_ip, data["bytes"])
try:
message = json.loads(raw_data)
print(
f"[{recv_time:%Y-%m-%d %H:%M:%S}] 消息解析成功:{json.dumps(message, ensure_ascii=False, indent=2)}")
# 5. 区分消息类型:仅处理心跳、其他消息不回复
if message.get("type") == "heartbeat":
client_timestamp = message.get("timestamp")
if client_timestamp is None:
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 警告:客户端 {client_ip} 发送的心跳缺少'timestamp'字段")
continue
new_connection.update_heartbeat()
await send_heartbeat_ack(client_ip, client_timestamp)
else:
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 非心跳消息(类型:{message.get('type')})、不回复")
except json.JSONDecodeError as e:
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 消息格式错误无效JSON错误{str(e)}")
except Exception as e:
print(
f"[{recv_time:%Y-%m-%d %H:%M:%S}] 处理客户端 {client_ip} 消息时出错:{str(e)}(错误类型:{type(e).__name__}")
# 异常处理(断开/错误)
except WebSocketDisconnect as e: except WebSocketDisconnect as e:
print( print(f"[{datetime.datetime.now():%H:%M:%S}] 客户端{client_ip}:主动断开(代码:{e.code}")
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 主动断开连接(代码:{e.code}、原因:{e.reason}")
except Exception as e: except Exception as e:
print( print(f"[{datetime.datetime.now():%H:%M:%S}] 客户端{client_ip}:连接异常({str(e)[:50]}")
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接异常:{str(e)}(错误类型:{type(e).__name__}")
finally: finally:
# 无论何种退出原因、确保客户端从列表中移除 # 清理连接
if client_ip in connected_clients: connected_clients.pop(client_ip, None)
del connected_clients[client_ip] print(f"[{datetime.datetime.now():%H:%M:%S}] 客户端{client_ip}:连接已清理,当前在线{len(connected_clients)}")
print(
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除、当前在线数:{len(connected_clients)}")