Files
video_detect/router/device_router.py

330 lines
12 KiB
Python
Raw Permalink Normal View History

2025-09-30 17:17:20 +08:00
import asyncio
import json
from datetime import date
from fastapi import APIRouter, Query, HTTPException, Request, Path
from mysql.connector import Error as MySQLError
from ds.db import db
from encryption.encrypt_decorator import encrypt_response
from schema.device_schema import (
DeviceCreateRequest, DeviceResponse, DeviceListResponse,
DeviceStatusHistoryResponse, DeviceStatusHistoryListResponse
)
from schema.response_schema import APIResponse
from service.device_service import update_online_status_by_ip
from ws.ws import get_current_time_str, aes_encrypt, is_client_connected
router = APIRouter(
prefix="/api/devices",
tags=["设备管理"]
)
# 创建设备信息接口
@router.post("/add", response_model=APIResponse, summary="创建设备信息")
@encrypt_response()
async def create_device(device_data: DeviceCreateRequest, request: Request):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 检查设备是否已存在
cursor.execute("SELECT * FROM devices WHERE client_ip = %s", (device_data.ip,))
existing_device = cursor.fetchone()
if existing_device:
# 更新设备为在线状态
from service.device_service import update_online_status_by_ip
update_online_status_by_ip(client_ip=device_data.ip, online_status=1)
return APIResponse(
code=200,
message=f"设备IP {device_data.ip} 已存在、返回已有设备信息",
data=DeviceResponse(**existing_device)
)
# 通过 User-Agent 判断设备类型
user_agent = request.headers.get("User-Agent", "").lower()
device_type = "unknown"
if user_agent == "default":
device_type = device_data.params.get("os") if (device_data.params and isinstance(device_data.params, dict)) else "unknown"
elif "windows" in user_agent:
device_type = "windows"
elif "android" in user_agent:
device_type = "android"
elif "linux" in user_agent:
device_type = "linux"
device_params_json = json.dumps(device_data.params) if device_data.params else None
# 插入新设备
insert_query = """
INSERT INTO devices
(client_ip, hostname, device_online_status, device_type, alarm_count, params, is_need_handler)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (
device_data.ip,
device_data.hostname,
0,
device_type,
0,
device_params_json,
0
))
conn.commit()
# 获取新设备并返回
device_id = cursor.lastrowid
cursor.execute("SELECT * FROM devices WHERE id = %s", (device_id,))
new_device = cursor.fetchone()
return APIResponse(
code=200,
message="设备创建成功",
data=DeviceResponse(**new_device)
)
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"创建设备失败: {str(e)}") from e
except json.JSONDecodeError as e:
raise Exception(f"设备参数JSON序列化失败: {str(e)}") from e
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 获取设备列表接口
# ------------------------------
@router.get("/", response_model=APIResponse, summary="获取设备列表(支持筛选分页)")
@encrypt_response()
async def get_device_list(
page: int = Query(1, ge=1, description="页码、默认第1页"),
page_size: int = Query(10, ge=1, le=100, description="每页条数、1-100之间"),
device_type: str = Query(None, description="按设备类型筛选"),
online_status: int = Query(None, ge=0, le=1, description="按在线状态筛选")
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
where_clause = []
params = []
if device_type:
where_clause.append("device_type = %s")
params.append(device_type)
if online_status is not None:
where_clause.append("device_online_status = %s")
params.append(online_status)
# 统计总数
count_query = "SELECT COUNT(*) AS total FROM devices"
if where_clause:
count_query += " WHERE " + " AND ".join(where_clause)
cursor.execute(count_query, params)
total = cursor.fetchone()["total"]
# 分页查询列表
offset = (page - 1) * page_size
list_query = "SELECT * FROM devices"
if where_clause:
list_query += " WHERE " + " AND ".join(where_clause)
list_query += " ORDER BY id DESC LIMIT %s OFFSET %s"
params.extend([page_size, offset])
cursor.execute(list_query, params)
device_list = cursor.fetchall()
return APIResponse(
code=200,
message="获取设备列表成功",
data=DeviceListResponse(
total=total,
devices=[DeviceResponse(**device) for device in device_list]
)
)
except MySQLError as e:
raise Exception(f"获取设备列表失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 获取设备上下线记录接口
# ------------------------------
@router.get("/status-history", response_model=APIResponse, summary="获取设备上下线记录")
@encrypt_response()
async def get_device_status_history(
client_ip: str = Query(None, description="客户端IP地址非必填为空时返回所有设备记录"),
page: int = Query(1, ge=1, description="页码、默认第1页"),
page_size: int = Query(10, ge=1, le=100, description="每页条数、1-100之间"),
start_date: date = Query(None, description="开始日期、格式YYYY-MM-DD"),
end_date: date = Query(None, description="结束日期、格式YYYY-MM-DD")
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查设备是否存在仅传IP时强制指定Collation
if client_ip is not None:
# 关键调整1WHERE条件中给d.client_ip指定Collation与da一致或反之
check_query = """
SELECT id FROM devices
WHERE client_ip COLLATE utf8mb4_general_ci = %s COLLATE utf8mb4_general_ci
"""
cursor.execute(check_query, (client_ip,))
device = cursor.fetchone()
if not device:
raise HTTPException(status_code=404, detail=f"客户端IP为 {client_ip} 的设备不存在")
# 2. 构建WHERE条件
where_clause = []
params = []
# 关键调整2传IP时强制指定da.client_ip的Collation
if client_ip is not None:
where_clause.append("da.client_ip COLLATE utf8mb4_general_ci = %s COLLATE utf8mb4_general_ci")
params.append(client_ip)
if start_date:
where_clause.append("DATE(da.created_at) >= %s")
params.append(start_date.strftime("%Y-%m-%d"))
if end_date:
where_clause.append("DATE(da.created_at) <= %s")
params.append(end_date.strftime("%Y-%m-%d"))
# 3. 统计总数JOIN时强制统一Collation
count_query = """
SELECT COUNT(*) AS total
FROM device_action da
LEFT JOIN devices d
ON da.client_ip COLLATE utf8mb4_general_ci = d.client_ip COLLATE utf8mb4_general_ci
"""
if where_clause:
count_query += " WHERE " + " AND ".join(where_clause)
cursor.execute(count_query, params)
total = cursor.fetchone()["total"]
# 4. 分页查询JOIN时强制统一Collation
offset = (page - 1) * page_size
list_query = """
SELECT da.*, d.id AS device_id
FROM device_action da
LEFT JOIN devices d
ON da.client_ip COLLATE utf8mb4_general_ci = d.client_ip COLLATE utf8mb4_general_ci
"""
if where_clause:
list_query += " WHERE " + " AND ".join(where_clause)
list_query += " ORDER BY da.created_at DESC LIMIT %s OFFSET %s"
params.extend([page_size, offset])
cursor.execute(list_query, params)
history_list = cursor.fetchall()
# 后续格式化响应逻辑不变...
formatted_history = []
for item in history_list:
formatted_item = {
"id": item["id"],
"device_id": item["device_id"], # 可能为NoneIP无对应设备
"client_ip": item["client_ip"],
"status": item["action"],
"status_time": item["created_at"]
}
formatted_history.append(formatted_item)
return APIResponse(
code=200,
message="获取设备上下线记录成功",
data=DeviceStatusHistoryListResponse(
total=total,
history=[DeviceStatusHistoryResponse(**item) for item in formatted_history]
)
)
except MySQLError as e:
raise Exception(f"获取设备上下线记录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 通过客户端IP设置设备is_need_handler为0接口
# ------------------------------
@router.post("/need-handler/reset", response_model=APIResponse, summary="解封客户端")
@encrypt_response()
async def reset_device_need_handler(
client_ip: str = Query(..., description="目标设备的客户端IP地址必填")
):
try:
from service.device_service import update_is_need_handler_by_client_ip
success = update_is_need_handler_by_client_ip(
client_ip=client_ip,
is_need_handler=0 # 固定设置为0不需要处理
)
if success:
online_status = is_client_connected(client_ip)
# 如果设备在线,则发送消息给前端
if online_status:
# 调用 ws 发送一个消息给前端、告诉他已解锁
unlock_msg = {
"type": "unlock",
"timestamp": get_current_time_str(),
"client_ip": client_ip
}
from ws.ws import send_message_to_client
await send_message_to_client(client_ip, json.dumps(unlock_msg))
# 休眠 100 ms
await asyncio.sleep(0.1)
frame_permit_msg = {
"type": "frame",
"timestamp": get_current_time_str(),
"client_ip": client_ip
}
await send_message_to_client(client_ip, json.dumps(frame_permit_msg))
# 更新设备在线状态为1
update_online_status_by_ip(client_ip, 1)
return APIResponse(
code=200,
message=f"设备已解封",
data={
"client_ip": client_ip,
"is_need_handler": 0,
"status_desc": "设备已解封"
}
)
# 捕获工具方法抛出的业务异常如IP为空、设备不存在
except ValueError as e:
# 业务异常返回400/404状态码与现有接口异常规范一致
raise HTTPException(
status_code=404 if "设备不存在" in str(e) else 400,
detail=str(e)
) from e
# 捕获数据库层面异常如连接失败、SQL执行错误
except MySQLError as e:
raise Exception(f"设置is_need_handler失败数据库操作异常 - {str(e)}") from e
# 捕获其他未知异常
except Exception as e:
raise Exception(f"设置is_need_handler失败未知错误 - {str(e)}") from e