Files
video_detect/ws/ws.py
2025-09-30 17:17:20 +08:00

444 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import datetime
import json
import os
import base64
from contextlib import asynccontextmanager
from typing import Dict, Optional
import cv2
import numpy as np
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from fastapi import WebSocket, APIRouter, WebSocketDisconnect, FastAPI
from core.detect import detectFrame
from service.device_service import update_online_status_by_ip, get_unique_client_ips, get_is_need_handler_by_ip
AES_SECRET_KEY = b"jr1vA6tfWMHOYi6UXw67UuO6fdak2rMa"
AES_BLOCK_SIZE = 16 # AES固定块大小
HEARTBEAT_INTERVAL = 10 # 心跳检查间隔(秒)
HEARTBEAT_TIMEOUT = 30 # 客户端超时阈值(秒)
WS_ENDPOINT = "/ws" # WebSocket端点路径
FRAME_QUEUE_SIZE = 1 # 帧队列大小限制
ONLINE_STATUS = 1
OFFLINE_STATUS = 0
def aes_encrypt(plaintext: str) -> dict:
try:
iv = os.urandom(AES_BLOCK_SIZE) # 随机IV16字节
cipher = AES.new(AES_SECRET_KEY, AES.MODE_CBC, iv)
padded_plaintext = pad(plaintext.encode("utf-8"), AES_BLOCK_SIZE)
ciphertext = base64.b64encode(cipher.encrypt(padded_plaintext)).decode("utf-8")
iv_base64 = base64.b64encode(iv).decode("utf-8")
return {
"ciphertext": ciphertext,
"iv": iv_base64,
"algorithm": "AES-CBC"
}
except Exception as e:
raise Exception(f"AES加密失败: {str(e)}") from e
def get_current_time_str() -> str:
"""获取格式化时间字符串"""
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# ------------------------------
# 客户端连接类(保持不变)
# ------------------------------
class ClientConnection:
def __init__(self, websocket: WebSocket, client_ip: str):
self.websocket = websocket
self.client_ip = client_ip
self.last_heartbeat = datetime.datetime.now()
self.frame_queue = asyncio.Queue(maxsize=FRAME_QUEUE_SIZE)
self.consumer_task: Optional[asyncio.Task] = None
self.is_connected = False # 连接状态标记(用于连接管理,非业务状态)
def update_heartbeat(self):
"""更新心跳时间"""
self.last_heartbeat = datetime.datetime.now()
def is_alive(self) -> bool:
"""判断客户端是否存活(心跳未超时)"""
timeout_seconds = (datetime.datetime.now() - self.last_heartbeat).total_seconds()
return timeout_seconds < HEARTBEAT_TIMEOUT
def start_consumer(self):
"""启动帧消费任务"""
self.consumer_task = asyncio.create_task(self.consume_frames())
return self.consumer_task
async def send_frame_permit(self):
"""发送加密的帧许可信号(服务器→客户端)"""
try:
frame_permit_msg = {
"type": "frame",
"timestamp": get_current_time_str(),
"client_ip": self.client_ip
}
encrypted_msg = aes_encrypt(json.dumps(frame_permit_msg))
await self.websocket.send_json(encrypted_msg)
print(f"[{get_current_time_str()}] 客户端{self.client_ip}: 已发送加密帧许可")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{self.client_ip}: 帧许可加密/发送失败 - {str(e)}")
async def consume_frames(self) -> None:
"""消费队列中的明文图像帧并调用检测"""
try:
while self.is_connected: # 仅在有效连接状态下处理
frame_data = await self.frame_queue.get()
await self.send_frame_permit() # 回复帧许可
try:
await self.process_frame(frame_data)
finally:
self.frame_queue.task_done()
except asyncio.CancelledError:
print(f"[{get_current_time_str()}] 客户端{self.client_ip}: 帧消费任务已取消")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{self.client_ip}: 帧消费错误 - {str(e)}")
async def process_frame(self, frame_data: bytes) -> None:
"""处理图像帧(调用检测接口)"""
nparr = np.frombuffer(frame_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
print(f"[{get_current_time_str()}] 客户端{self.client_ip}: 无法解析明文图像")
return
try:
# 仅保留检测调用,移除数据库记录相关逻辑
await asyncio.to_thread(detectFrame, self.client_ip, img)
print(f"[{get_current_time_str()}] 客户端{self.client_ip}: 图像检测调用完成")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{self.client_ip}: 检测调用失败 - {str(e)}")
# ------------------------------
# 连接管理与全局任务(基础结构不变,新增状态更新逻辑)
# ------------------------------
connected_clients: Dict[str, ClientConnection] = {}
heartbeat_task: Optional[asyncio.Task] = None
# ------------------------------
# 心跳检查任务(超时后更新设备为下线)
# ------------------------------
async def heartbeat_checker():
"""全局心跳检查(清理超时连接 + 标记设备为下线)"""
while True:
current_time = get_current_time_str()
# 筛选超时连接(心跳超时且处于连接状态)
timeout_ips = [
ip for ip, conn in connected_clients.items()
if conn.is_connected and not conn.is_alive()
]
if timeout_ips:
print(f"[{current_time}] 心跳检查: {len(timeout_ips)}个客户端超时IP: {timeout_ips}")
for ip in timeout_ips:
try:
conn = connected_clients[ip]
try:
update_success = update_online_status_by_ip(ip, OFFLINE_STATUS)
if update_success:
print(f"[{current_time}] 客户端{ip}: 心跳超时,设备状态更新为下线成功")
else:
print(f"[{current_time}] 客户端{ip}: 心跳超时,设备状态更新为下线失败")
except Exception as e:
print(f"[{current_time}] 客户端{ip}: 心跳超时更新下线状态异常 - {str(e)}")
# 2. 原有逻辑:标记连接无效并清理资源
conn.is_connected = False
if conn.consumer_task and not conn.consumer_task.done():
conn.consumer_task.cancel()
await conn.websocket.close(code=1008, reason="心跳超时30秒无响应")
print(f"[{current_time}] 客户端{ip}: 已关闭超时连接")
except Exception as e:
print(f"[{current_time}] 客户端{ip}: 超时连接清理失败 - {str(e)}")
finally:
# 从连接管理列表移除
if ip in connected_clients:
connected_clients.pop(ip, None)
else:
print(f"[{current_time}] 心跳检查: {len(connected_clients)}个客户端在线")
await asyncio.sleep(HEARTBEAT_INTERVAL)
# ------------------------------
# 心跳确认发送(保持不变)
# ------------------------------
async def send_heartbeat_ack(conn: ClientConnection):
"""发送加密的心跳确认(服务器→客户端)"""
try:
heartbeat_ack_msg = {
"type": "heart",
"timestamp": get_current_time_str(),
"client_ip": conn.client_ip
}
encrypted_msg = aes_encrypt(json.dumps(heartbeat_ack_msg))
await conn.websocket.send_json(encrypted_msg)
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 已发送加密心跳确认")
return True
except Exception as e:
# 心跳确认失败时清理连接
if conn.client_ip in connected_clients:
connected_clients.pop(conn.client_ip, None)
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 心跳确认失败 - {str(e)}")
return False
# ------------------------------
# 文本消息处理(新增:心跳时更新设备为在线)
# ------------------------------
async def handle_text_msg(conn: ClientConnection, text: str):
"""处理客户端明文文本消息(心跳+状态更新)"""
try:
msg = json.loads(text)
if msg.get("type") == "heart":
# 1. 原有逻辑:更新心跳时间
conn.update_heartbeat()
# 2. 新增逻辑:更新设备状态为在线
try:
update_success = update_online_status_by_ip(conn.client_ip, ONLINE_STATUS)
if update_success:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 设备在线状态更新为在线成功")
else:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 设备在线状态更新为在线失败")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 更新在线状态时异常 - {str(e)}")
# 3. 原有逻辑:发送心跳确认
await send_heartbeat_ack(conn)
else:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 未知明文文本类型({msg.get('type')}")
except json.JSONDecodeError:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 无效JSON格式明文文本")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 明文文本消息处理失败 - {str(e)}")
# ------------------------------
# 路由与生命周期(保持不变)
# ------------------------------
ws_router = APIRouter()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期:启动/停止心跳任务"""
global heartbeat_task
# 启动心跳检查任务
heartbeat_task = asyncio.create_task(heartbeat_checker())
print(f"[{get_current_time_str()}] 心跳检查任务启动ID: {id(heartbeat_task)}")
yield
# 应用关闭时清理资源
if heartbeat_task and not heartbeat_task.done():
heartbeat_task.cancel()
await heartbeat_task
print(f"[{get_current_time_str()}] 心跳检查任务已取消")
# 清理所有活跃连接 + 标记为下线
for conn in connected_clients.values():
conn.is_connected = False
# 关闭前更新设备为下线
try:
update_online_status_by_ip(conn.client_ip, OFFLINE_STATUS)
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 服务关闭,设备状态更新为下线")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 服务关闭更新状态异常 - {str(e)}")
# 原有清理逻辑
if conn.consumer_task and not conn.consumer_task.done():
conn.consumer_task.cancel()
try:
await conn.websocket.close(code=1001, reason="服务关闭")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{conn.client_ip}: 服务关闭时连接清理失败 - {str(e)}")
connected_clients.clear()
# ------------------------------
# WebSocket端点核心修改连接校验 + 断开时状态更新)
# ------------------------------
@ws_router.websocket(WS_ENDPOINT)
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket连接处理新增设备校验 + 断开下线)"""
client_ip = websocket.client.host if websocket.client else "unknown_ip"
current_time = get_current_time_str()
new_conn: Optional[ClientConnection] = None # 用于后续清理
try:
# ------------------------------
# 新增1设备合法性校验连接前拦截
# ------------------------------
try:
valid_ips = get_unique_client_ips() # 获取合法设备列表
if_handler = get_is_need_handler_by_ip(client_ip)
except Exception as e:
print(f"[{current_time}] 客户端{client_ip}: 合法设备列表获取失败 - {str(e)}")
await websocket.close(code=1011, reason="服务器内部错误(合法设备校验失败)")
return
if client_ip not in valid_ips:
print(f"[{current_time}] 客户端{client_ip}: 不在合法设备列表中,拒绝连接")
await websocket.close(code=1008, reason="未授权设备(不在合法设备列表)")
return
if if_handler:
print(f"[{current_time}] 客户端{client_ip}: 设备需要处理,拒绝连接")
await websocket.close(code=1008, reason="未授权设备(需要处理)")
return
# 校验通过,接受连接
await websocket.accept()
print(f"[{current_time}] 客户端{client_ip}: 已接受连接请求(合法设备)")
# ------------------------------
# 原有逻辑:处理旧连接替换
# ------------------------------
if client_ip in connected_clients:
old_conn = connected_clients[client_ip]
old_conn.is_connected = False
# 旧连接下线更新(新增)
try:
update_online_status_by_ip(client_ip, OFFLINE_STATUS)
print(f"[{current_time}] 客户端{client_ip}: 旧连接替换,设备状态更新为下线")
except Exception as e:
print(f"[{current_time}] 客户端{client_ip}: 旧连接替换更新状态异常 - {str(e)}")
# 原有清理旧连接
if old_conn.consumer_task and not old_conn.consumer_task.done():
old_conn.consumer_task.cancel()
await old_conn.websocket.close(code=1008, reason="新连接建立(替换旧连接)")
connected_clients.pop(client_ip)
print(f"[{current_time}] 客户端{client_ip}: 已关闭旧连接(新连接替换)")
# ------------------------------
# 原有逻辑:初始化新连接
# ------------------------------
new_conn = ClientConnection(websocket, client_ip)
new_conn.is_connected = True
connected_clients[client_ip] = new_conn
new_conn.start_consumer()
# 发送初始帧许可
await new_conn.send_frame_permit()
# 新连接上线更新(新增)
try:
update_online_status_by_ip(client_ip, ONLINE_STATUS)
print(f"[{get_current_time_str()}] 客户端{client_ip}: 新连接初始化,设备状态更新为在线")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 新连接初始化更新状态异常 - {str(e)}")
print(f"[{get_current_time_str()}] 客户端{client_ip}: 连接初始化完成!当前在线数: {len(connected_clients)}")
# ------------------------------
# 原有逻辑:循环接收消息
# ------------------------------
while new_conn.is_connected:
data = await websocket.receive()
if "text" in data:
await handle_text_msg(new_conn, data["text"])
elif "bytes" in data:
frame_data = data["bytes"]
try:
new_conn.frame_queue.put_nowait(frame_data)
print(f"[{get_current_time_str()}] 客户端{client_ip}: 明文图像({len(frame_data)}字节)入队")
except asyncio.QueueFull:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 帧队列已满,丢弃当前图像数据")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 明文图像处理失败 - {str(e)}")
# ------------------------------
# 新增2主动断开连接时更新下线
# ------------------------------
except WebSocketDisconnect as e:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 主动断开连接(代码: {e.code}")
# 主动断开时更新下线
try:
update_success = update_online_status_by_ip(client_ip, OFFLINE_STATUS)
if update_success:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 主动断开,设备状态更新为下线成功")
else:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 主动断开,设备状态更新为下线失败")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 主动断开更新下线状态异常 - {str(e)}")
# ------------------------------
# 原有异常处理
# ------------------------------
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 连接异常 - {str(e)[:50]}")
# 异常断开时更新下线(新增)
try:
update_online_status_by_ip(client_ip, OFFLINE_STATUS)
print(f"[{get_current_time_str()}] 客户端{client_ip}: 异常断开,设备状态更新为下线")
except Exception as ex:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 异常断开更新状态异常 - {str(ex)}")
# ------------------------------
# 新增3最终清理时确保状态下线
# ------------------------------
finally:
# 连接结束时清理资源(仅当连接曾有效初始化)
if client_ip in connected_clients:
conn = connected_clients[client_ip]
conn.is_connected = False
# 再次确认下线(防止漏更)
try:
update_online_status_by_ip(client_ip, OFFLINE_STATUS)
print(f"[{get_current_time_str()}] 客户端{client_ip}: 连接清理,设备状态确认下线")
except Exception as e:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 连接清理确认状态异常 - {str(e)}")
# 原有清理逻辑
if conn.consumer_task and not conn.consumer_task.done():
conn.consumer_task.cancel()
connected_clients.pop(client_ip, None)
print(f"[{get_current_time_str()}] 客户端{client_ip}: 连接资源已清理!当前在线数: {len(connected_clients)}")
# ------------------------------
# 原有功能:客户端连接状态判断(保持不变)
# ------------------------------
async def send_message_to_client(client_ip: str, json_data: str) -> bool:
"""向指定IP的客户端发送加密消息保留基础通信能力"""
try:
if client_ip not in connected_clients:
print(f"[{get_current_time_str()}] 发送消息失败: 客户端{client_ip}不在线")
return False
client_conn = connected_clients[client_ip]
if not client_conn.is_connected or not client_conn.is_alive():
print(f"[{get_current_time_str()}] 发送消息失败: 客户端{client_ip}连接无效(已断开/超时)")
connected_clients.pop(client_ip, None)
return False
encrypted_msg = aes_encrypt(json_data)
await client_conn.websocket.send_json(encrypted_msg)
parsed_data = json.loads(json_data)
print(f"[{get_current_time_str()}] 已向客户端{client_ip}发送消息: {parsed_data.get('type')}")
return True
except Exception as e:
print(f"[{get_current_time_str()}] 向客户端{client_ip}发送消息失败: {str(e)}")
if client_ip in connected_clients:
connected_clients.pop(client_ip, None)
return False
def is_client_connected(client_ip: str) -> bool:
"""判断指定客户端IP是否处于有效连接状态"""
if client_ip not in connected_clients:
print(f"[{get_current_time_str()}] 客户端{client_ip}: 未在活跃连接列表中连接状态False")
return False
client_conn = connected_clients[client_ip]
connection_valid = client_conn.is_connected and client_conn.is_alive()
print(
f"[{get_current_time_str()}] 客户端{client_ip}: "
f"连接标记={client_conn.is_connected}, 心跳存活={client_conn.is_alive()}, "
f"最终有效状态={connection_valid}"
)
return connection_valid