RTC提交
This commit is contained in:
59
ws/ws.py
59
ws/ws.py
@ -19,13 +19,11 @@ class ClientConnection:
|
|||||||
def update_heartbeat(self):
|
def update_heartbeat(self):
|
||||||
"""更新心跳时间为当前时间"""
|
"""更新心跳时间为当前时间"""
|
||||||
self.last_heartbeat = datetime.datetime.now()
|
self.last_heartbeat = datetime.datetime.now()
|
||||||
# 打印心跳更新日志
|
|
||||||
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳时间已更新")
|
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳时间已更新")
|
||||||
|
|
||||||
def is_alive(self, timeout_seconds: int = 60) -> bool:
|
def is_alive(self, timeout_seconds: int = 60) -> bool:
|
||||||
"""检查客户端是否活跃(心跳超时阈值:60秒)"""
|
"""检查客户端是否活跃(心跳超时阈值:60秒)"""
|
||||||
timeout = (datetime.datetime.now() - self.last_heartbeat).total_seconds()
|
timeout = (datetime.datetime.now() - self.last_heartbeat).total_seconds()
|
||||||
# 打印心跳检查明细(便于排查超时原因)
|
|
||||||
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳检查:"
|
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳检查:"
|
||||||
f"上次心跳距今 {timeout:.1f} 秒(阈值:{timeout_seconds}秒)")
|
f"上次心跳距今 {timeout:.1f} 秒(阈值:{timeout_seconds}秒)")
|
||||||
return timeout < timeout_seconds
|
return timeout < timeout_seconds
|
||||||
@ -57,14 +55,12 @@ async def heartbeat_checker():
|
|||||||
for client_ip in timeout_clients:
|
for client_ip in timeout_clients:
|
||||||
try:
|
try:
|
||||||
connection = connected_clients[client_ip]
|
connection = connected_clients[client_ip]
|
||||||
# 直接关闭连接(不发送任何通知)
|
|
||||||
await connection.websocket.close(code=1008, reason="心跳超时(>60秒)")
|
await connection.websocket.close(code=1008, reason="心跳超时(>60秒)")
|
||||||
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已关闭(超时)")
|
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已关闭(超时)")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(
|
print(
|
||||||
f"[{current_time:%Y-%m-%d %H:%M:%S}] 关闭客户端 {client_ip} 失败:{str(e)}(错误类型:{type(e).__name__})")
|
f"[{current_time:%Y-%m-%d %H:%M:%S}] 关闭客户端 {client_ip} 失败:{str(e)}(错误类型:{type(e).__name__})")
|
||||||
finally:
|
finally:
|
||||||
# 确保从客户端列表中移除(无论关闭是否成功)
|
|
||||||
if client_ip in connected_clients:
|
if client_ip in connected_clients:
|
||||||
del connected_clients[client_ip]
|
del connected_clients[client_ip]
|
||||||
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除")
|
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除")
|
||||||
@ -79,15 +75,13 @@ async def heartbeat_checker():
|
|||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
"""应用生命周期管理:启动时创建心跳任务、关闭时取消任务"""
|
"""应用生命周期管理:启动时创建心跳任务、关闭时取消任务"""
|
||||||
global heartbeat_task
|
global heartbeat_task
|
||||||
# 启动阶段:创建心跳检查任务
|
|
||||||
heartbeat_task = asyncio.create_task(heartbeat_checker())
|
heartbeat_task = asyncio.create_task(heartbeat_checker())
|
||||||
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已启动(任务ID:{id(heartbeat_task)})")
|
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已启动(任务ID:{id(heartbeat_task)})")
|
||||||
yield # 应用运行中
|
yield # 应用运行中
|
||||||
# 关闭阶段:取消心跳任务
|
|
||||||
if heartbeat_task and not heartbeat_task.done():
|
if heartbeat_task and not heartbeat_task.done():
|
||||||
heartbeat_task.cancel()
|
heartbeat_task.cancel()
|
||||||
try:
|
try:
|
||||||
await heartbeat_task # 等待任务优雅退出
|
await heartbeat_task
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已正常取消")
|
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已正常取消")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -100,7 +94,6 @@ async def send_heartbeat_ack(client_ip: str, client_timestamp: Any) -> bool:
|
|||||||
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复心跳失败:客户端 {client_ip} 不在连接列表中")
|
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复心跳失败:客户端 {client_ip} 不在连接列表中")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 修复:将这部分代码移出if语句块、确保始终定义ack_msg
|
|
||||||
# 服务端当前格式化时间戳(字符串类型、与日志时间格式匹配)
|
# 服务端当前格式化时间戳(字符串类型、与日志时间格式匹配)
|
||||||
server_latest_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
server_latest_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
ack_msg = {
|
ack_msg = {
|
||||||
@ -117,13 +110,51 @@ async def send_heartbeat_ack(client_ip: str, client_timestamp: Any) -> bool:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(
|
print(
|
||||||
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复客户端 {client_ip} 心跳失败:{str(e)}(错误类型:{type(e).__name__})")
|
f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复客户端 {client_ip} 心跳失败:{str(e)}(错误类型:{type(e).__name__})")
|
||||||
# 发送失败时移除客户端(避免无效连接残留)
|
|
||||||
if client_ip in connected_clients:
|
if client_ip in connected_clients:
|
||||||
del connected_clients[client_ip]
|
del connected_clients[client_ip]
|
||||||
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 因心跳回复失败被移除")
|
print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 因心跳回复失败被移除")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# 新增:向指定客户端发送消息的方法
|
||||||
|
async def send_message_to_client(client_ip: str, message: Dict[str, Any]) -> bool:
|
||||||
|
"""
|
||||||
|
向指定客户端IP发送JSON消息(需确保客户端已在连接列表中)
|
||||||
|
|
||||||
|
:param client_ip: 目标客户端IP(必须与`connected_clients`中的key完全一致,如"192.168.1.100")
|
||||||
|
:param message: 待发送的JSON消息(需为Dict类型,FastAPI会自动序列化为JSON字符串)
|
||||||
|
:return: 发送成功返回True,失败(客户端不存在/发送异常)返回False
|
||||||
|
"""
|
||||||
|
current_time = datetime.datetime.now()
|
||||||
|
# 1. 校验客户端是否在线
|
||||||
|
if client_ip not in connected_clients:
|
||||||
|
print(
|
||||||
|
f"[{current_time:%Y-%m-%d %H:%M:%S}] 发送消息失败:客户端 {client_ip} 不在连接列表中(当前在线数:{len(connected_clients)})")
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 2. 获取客户端连接实例并发送消息
|
||||||
|
client_conn = connected_clients[client_ip]
|
||||||
|
await client_conn.websocket.send_json(message)
|
||||||
|
|
||||||
|
# 3. 打印发送成功日志(格式化消息便于查看)
|
||||||
|
formatted_msg = json.dumps(message, ensure_ascii=False, indent=2)
|
||||||
|
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 已向客户端 {client_ip} 发送消息:\n{formatted_msg}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
# 4. 处理发送异常(如网络中断、客户端已离线但未被清理)
|
||||||
|
error_msg = f"[{current_time:%Y-%m-%d %H:%M:%S}] 向客户端 {client_ip} 发送消息失败:{str(e)}(错误类型:{type(e).__name__})"
|
||||||
|
print(error_msg)
|
||||||
|
|
||||||
|
# 5. 清理无效连接(避免僵尸连接残留)
|
||||||
|
if client_ip in connected_clients:
|
||||||
|
del connected_clients[client_ip]
|
||||||
|
print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 因发送异常已从连接列表移除")
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@ws_router.websocket("/ws")
|
@ws_router.websocket("/ws")
|
||||||
async def websocket_endpoint(websocket: WebSocket):
|
async def websocket_endpoint(websocket: WebSocket):
|
||||||
"""WebSocket核心端点:处理连接建立/消息接收/连接关闭"""
|
"""WebSocket核心端点:处理连接建立/消息接收/连接关闭"""
|
||||||
@ -150,7 +181,6 @@ async def websocket_endpoint(websocket: WebSocket):
|
|||||||
|
|
||||||
# 4. 循环接收客户端消息(持续监听)
|
# 4. 循环接收客户端消息(持续监听)
|
||||||
while True:
|
while True:
|
||||||
# 接收原始文本消息(避免提前解析JSON、便于日志打印)
|
|
||||||
raw_data = await websocket.receive_text()
|
raw_data = await websocket.receive_text()
|
||||||
recv_time = datetime.datetime.now()
|
recv_time = datetime.datetime.now()
|
||||||
print(f"\n[{recv_time:%Y-%m-%d %H:%M:%S}] 收到客户端 {client_ip} 的消息:{raw_data}")
|
print(f"\n[{recv_time:%Y-%m-%d %H:%M:%S}] 收到客户端 {client_ip} 的消息:{raw_data}")
|
||||||
@ -163,33 +193,26 @@ async def websocket_endpoint(websocket: WebSocket):
|
|||||||
|
|
||||||
# 5. 区分消息类型:仅处理心跳、其他消息不回复
|
# 5. 区分消息类型:仅处理心跳、其他消息不回复
|
||||||
if message.get("type") == "heartbeat":
|
if message.get("type") == "heartbeat":
|
||||||
# 验证心跳消息是否包含timestamp字段
|
|
||||||
client_timestamp = message.get("timestamp")
|
client_timestamp = message.get("timestamp")
|
||||||
if client_timestamp is None:
|
if client_timestamp is None:
|
||||||
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 警告:客户端 {client_ip} 发送的心跳缺少'timestamp'字段")
|
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 警告:客户端 {client_ip} 发送的心跳缺少'timestamp'字段")
|
||||||
continue # 不回复无效心跳
|
continue
|
||||||
|
|
||||||
# 更新心跳时间 + 回复心跳确认
|
|
||||||
new_connection.update_heartbeat()
|
new_connection.update_heartbeat()
|
||||||
await send_heartbeat_ack(client_ip, client_timestamp)
|
await send_heartbeat_ack(client_ip, client_timestamp)
|
||||||
else:
|
else:
|
||||||
# 非心跳消息:仅打印日志、不回复任何内容
|
|
||||||
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 非心跳消息(类型:{message.get('type')})、不回复")
|
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 非心跳消息(类型:{message.get('type')})、不回复")
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
# JSON格式错误:仅打印日志、不回复
|
|
||||||
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 消息格式错误:无效JSON(错误:{str(e)})")
|
print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 消息格式错误:无效JSON(错误:{str(e)})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# 其他未知错误:仅打印日志、不回复
|
|
||||||
print(
|
print(
|
||||||
f"[{recv_time:%Y-%m-%d %H:%M:%S}] 处理客户端 {client_ip} 消息时出错:{str(e)}(错误类型:{type(e).__name__})")
|
f"[{recv_time:%Y-%m-%d %H:%M:%S}] 处理客户端 {client_ip} 消息时出错:{str(e)}(错误类型:{type(e).__name__})")
|
||||||
|
|
||||||
except WebSocketDisconnect as e:
|
except WebSocketDisconnect as e:
|
||||||
# 客户端主动断开连接(如关闭页面、网络中断)
|
|
||||||
print(
|
print(
|
||||||
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 主动断开连接(代码:{e.code}、原因:{e.reason})")
|
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 主动断开连接(代码:{e.code}、原因:{e.reason})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# 其他连接级错误(如网络异常)
|
|
||||||
print(
|
print(
|
||||||
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接异常:{str(e)}(错误类型:{type(e).__name__})")
|
f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接异常:{str(e)}(错误类型:{type(e).__name__})")
|
||||||
finally:
|
finally:
|
||||||
|
Reference in New Issue
Block a user