Files
video/ws/ws.py
2025-09-02 19:46:34 +08:00

200 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import WebSocket, APIRouter, WebSocketDisconnect, FastAPI
from typing import Dict, Any, Optional
import datetime
import asyncio
import json
from contextlib import asynccontextmanager
# 创建WebSocket路由
ws_router = APIRouter()
# 客户端连接信息数据结构
class ClientConnection:
def __init__(self, websocket: WebSocket, client_ip: str):
self.websocket = websocket
self.client_ip = client_ip
self.last_heartbeat = datetime.datetime.now() # 初始心跳时间为连接时间
def update_heartbeat(self):
"""更新心跳时间为当前时间"""
self.last_heartbeat = datetime.datetime.now()
# 打印心跳更新日志
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳时间已更新")
def is_alive(self, timeout_seconds: int = 60) -> bool:
"""检查客户端是否活跃心跳超时阈值60秒"""
timeout = (datetime.datetime.now() - self.last_heartbeat).total_seconds()
# 打印心跳检查明细(便于排查超时原因)
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳检查:"
f"上次心跳距今 {timeout:.1f} 秒(阈值:{timeout_seconds}秒)")
return timeout < timeout_seconds
# 存储所有已连接的客户端key客户端IP、valueClientConnection对象
connected_clients: Dict[str, ClientConnection] = {}
# 心跳检查任务引用(全局变量、用于应用关闭时取消任务)
heartbeat_task: Optional[asyncio.Task] = None
async def heartbeat_checker():
"""定期检查客户端心跳每30秒一次、超时直接剔除不发通知"""
while True:
current_time = datetime.datetime.now()
print(f"\n[{current_time:%Y-%m-%d %H:%M:%S}] === 开始新一轮心跳检查 ===")
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 当前在线客户端总数:{len(connected_clients)}")
# 1. 收集超时客户端IP避免遍历中修改字典
timeout_clients = []
for client_ip, connection in connected_clients.items():
if not connection.is_alive():
timeout_clients.append(client_ip)
# 2. 处理超时客户端(关闭连接+移除记录)
if timeout_clients:
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 发现超时客户端:{timeout_clients}(共{len(timeout_clients)}个)")
for client_ip in timeout_clients:
try:
connection = connected_clients[client_ip]
# 直接关闭连接(不发送任何通知)
await connection.websocket.close(code=1008, reason="心跳超时(>60秒")
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已关闭(超时)")
except Exception as e:
print(
f"[{current_time:%Y-%m-%d %H:%M:%S}] 关闭客户端 {client_ip} 失败:{str(e)}(错误类型:{type(e).__name__}")
finally:
# 确保从客户端列表中移除(无论关闭是否成功)
if client_ip in connected_clients:
del connected_clients[client_ip]
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除")
else:
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 无超时客户端、心跳检查完成")
# 3. 等待30秒后进行下一轮检查
await asyncio.sleep(30)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理:启动时创建心跳任务、关闭时取消任务"""
global heartbeat_task
# 启动阶段:创建心跳检查任务
heartbeat_task = asyncio.create_task(heartbeat_checker())
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已启动任务ID{id(heartbeat_task)}")
yield # 应用运行中
# 关闭阶段:取消心跳任务
if heartbeat_task and not heartbeat_task.done():
heartbeat_task.cancel()
try:
await heartbeat_task # 等待任务优雅退出
except asyncio.CancelledError:
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已正常取消")
except Exception as e:
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 取消心跳任务时出错:{str(e)}")
async def send_heartbeat_ack(client_ip: str, client_timestamp: Any) -> bool:
"""向客户端回复心跳确认(严格遵循 {"timestamp":xxxxx, "type":"heartbeat"} 格式)"""
if client_ip not in connected_clients:
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复心跳失败:客户端 {client_ip} 不在连接列表中")
return False
# 修复将这部分代码移出if语句块、确保始终定义ack_msg
# 服务端当前格式化时间戳(字符串类型、与日志时间格式匹配)
server_latest_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ack_msg = {
"timestamp": server_latest_timestamp,
"type": "heartbeat"
}
try:
connection = connected_clients[client_ip]
await connection.websocket.send_json(ack_msg)
print(
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 已向客户端 {client_ip} 回复心跳:{json.dumps(ack_msg, ensure_ascii=False)}")
return True
except Exception as e:
print(
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复客户端 {client_ip} 心跳失败:{str(e)}(错误类型:{type(e).__name__}")
# 发送失败时移除客户端(避免无效连接残留)
if client_ip in connected_clients:
del connected_clients[client_ip]
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 因心跳回复失败被移除")
return False
@ws_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket核心端点处理连接建立/消息接收/连接关闭"""
current_time = datetime.datetime.now()
# 1. 接受客户端连接请求
await websocket.accept()
# 获取客户端IP作为唯一标识
client_ip = websocket.client.host if websocket.client else "unknown_ip"
print(f"\n[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接请求已接受WebSocket握手成功")
try:
# 2. 处理"同一IP重复连接"场景:关闭旧连接、保留新连接
if client_ip in connected_clients:
old_connection = connected_clients[client_ip]
await old_connection.websocket.close(code=1008, reason="同一IP新连接已建立")
del connected_clients[client_ip]
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 已关闭客户端 {client_ip} 的旧连接(新连接已建立)")
# 3. 注册新客户端到连接列表
new_connection = ClientConnection(websocket, client_ip)
connected_clients[client_ip] = new_connection
print(
f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已注册到连接列表、当前在线数:{len(connected_clients)}")
# 4. 循环接收客户端消息(持续监听)
while True:
# 接收原始文本消息避免提前解析JSON、便于日志打印
raw_data = await websocket.receive_text()
recv_time = datetime.datetime.now()
print(f"\n[{recv_time:%Y-%m-%d %H:%M:%S}] 收到客户端 {client_ip} 的消息:{raw_data}")
# 尝试解析JSON消息
try:
message = json.loads(raw_data)
print(
f"[{recv_time:%Y-%m-%d %H:%M:%S}] 消息解析成功:{json.dumps(message, ensure_ascii=False, indent=2)}")
# 5. 区分消息类型:仅处理心跳、其他消息不回复
if message.get("type") == "heartbeat":
# 验证心跳消息是否包含timestamp字段
client_timestamp = message.get("timestamp")
if client_timestamp is None:
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 警告:客户端 {client_ip} 发送的心跳缺少'timestamp'字段")
continue # 不回复无效心跳
# 更新心跳时间 + 回复心跳确认
new_connection.update_heartbeat()
await send_heartbeat_ack(client_ip, client_timestamp)
else:
# 非心跳消息:仅打印日志、不回复任何内容
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 非心跳消息(类型:{message.get('type')})、不回复")
except json.JSONDecodeError as e:
# JSON格式错误仅打印日志、不回复
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 消息格式错误无效JSON错误{str(e)}")
except Exception as e:
# 其他未知错误:仅打印日志、不回复
print(
f"[{recv_time:%Y-%m-%d %H:%M:%S}] 处理客户端 {client_ip} 消息时出错:{str(e)}(错误类型:{type(e).__name__}")
except WebSocketDisconnect as e:
# 客户端主动断开连接(如关闭页面、网络中断)
print(
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 主动断开连接(代码:{e.code}、原因:{e.reason}")
except Exception as e:
# 其他连接级错误(如网络异常)
print(
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接异常:{str(e)}(错误类型:{type(e).__name__}")
finally:
# 无论何种退出原因、确保客户端从列表中移除
if client_ip in connected_clients:
del connected_clients[client_ip]
print(
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除、当前在线数:{len(connected_clients)}")