330 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			330 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import asyncio
 | ||
| import json
 | ||
| from datetime import date
 | ||
| 
 | ||
| from fastapi import APIRouter, Query, HTTPException, Request, Path
 | ||
| from mysql.connector import Error as MySQLError
 | ||
| from ds.db import db
 | ||
| from encryption.encrypt_decorator import encrypt_response
 | ||
| from schema.device_schema import (
 | ||
|     DeviceCreateRequest, DeviceResponse, DeviceListResponse,
 | ||
|     DeviceStatusHistoryResponse, DeviceStatusHistoryListResponse
 | ||
| )
 | ||
| from schema.response_schema import APIResponse
 | ||
| from service.device_service import update_online_status_by_ip
 | ||
| from ws.ws import get_current_time_str, aes_encrypt, is_client_connected
 | ||
| 
 | ||
| router = APIRouter(
 | ||
|     prefix="/api/devices",
 | ||
|     tags=["设备管理"]
 | ||
| )
 | ||
| 
 | ||
| 
 | ||
| # 创建设备信息接口
 | ||
| @router.post("/add", response_model=APIResponse, summary="创建设备信息")
 | ||
| @encrypt_response()
 | ||
| async def create_device(device_data: DeviceCreateRequest, request: Request):
 | ||
|     conn = None
 | ||
|     cursor = None
 | ||
|     try:
 | ||
|         conn = db.get_connection()
 | ||
|         cursor = conn.cursor(dictionary=True)
 | ||
| 
 | ||
|         # 检查设备是否已存在
 | ||
|         cursor.execute("SELECT * FROM devices WHERE client_ip = %s", (device_data.ip,))
 | ||
|         existing_device = cursor.fetchone()
 | ||
|         if existing_device:
 | ||
|             # 更新设备为在线状态
 | ||
|             from service.device_service import update_online_status_by_ip
 | ||
|             update_online_status_by_ip(client_ip=device_data.ip, online_status=1)
 | ||
|             return APIResponse(
 | ||
|                 code=200,
 | ||
|                 message=f"设备IP {device_data.ip} 已存在、返回已有设备信息",
 | ||
|                 data=DeviceResponse(**existing_device)
 | ||
|             )
 | ||
| 
 | ||
|         # 通过 User-Agent 判断设备类型
 | ||
|         user_agent = request.headers.get("User-Agent", "").lower()
 | ||
|         device_type = "unknown"
 | ||
|         if user_agent == "default":
 | ||
|             device_type = device_data.params.get("os") if (device_data.params and isinstance(device_data.params, dict)) else "unknown"
 | ||
|         elif "windows" in user_agent:
 | ||
|             device_type = "windows"
 | ||
|         elif "android" in user_agent:
 | ||
|             device_type = "android"
 | ||
|         elif "linux" in user_agent:
 | ||
|             device_type = "linux"
 | ||
| 
 | ||
|         device_params_json = json.dumps(device_data.params) if device_data.params else None
 | ||
| 
 | ||
|         # 插入新设备
 | ||
|         insert_query = """
 | ||
|             INSERT INTO devices 
 | ||
|             (client_ip, hostname, device_online_status, device_type, alarm_count, params, is_need_handler)
 | ||
|             VALUES (%s, %s, %s, %s, %s, %s, %s)
 | ||
|         """
 | ||
|         cursor.execute(insert_query, (
 | ||
|             device_data.ip,
 | ||
|             device_data.hostname,
 | ||
|             0,
 | ||
|             device_type,
 | ||
|             0,
 | ||
|             device_params_json,
 | ||
|             0
 | ||
|         ))
 | ||
|         conn.commit()
 | ||
| 
 | ||
|         # 获取新设备并返回
 | ||
|         device_id = cursor.lastrowid
 | ||
|         cursor.execute("SELECT * FROM devices WHERE id = %s", (device_id,))
 | ||
|         new_device = cursor.fetchone()
 | ||
| 
 | ||
|         return APIResponse(
 | ||
|             code=200,
 | ||
|             message="设备创建成功",
 | ||
|             data=DeviceResponse(**new_device)
 | ||
|         )
 | ||
| 
 | ||
|     except MySQLError as e:
 | ||
|         if conn:
 | ||
|             conn.rollback()
 | ||
|         raise Exception(f"创建设备失败: {str(e)}") from e
 | ||
|     except json.JSONDecodeError as e:
 | ||
|         raise Exception(f"设备参数JSON序列化失败: {str(e)}") from e
 | ||
|     except Exception as e:
 | ||
|         if conn:
 | ||
|             conn.rollback()
 | ||
|         raise e
 | ||
|     finally:
 | ||
|         db.close_connection(conn, cursor)
 | ||
| 
 | ||
| 
 | ||
| # ------------------------------
 | ||
| # 获取设备列表接口
 | ||
| # ------------------------------
 | ||
| @router.get("/", response_model=APIResponse, summary="获取设备列表(支持筛选分页)")
 | ||
| @encrypt_response()
 | ||
| async def get_device_list(
 | ||
|         page: int = Query(1, ge=1, description="页码、默认第1页"),
 | ||
|         page_size: int = Query(10, ge=1, le=100, description="每页条数、1-100之间"),
 | ||
|         device_type: str = Query(None, description="按设备类型筛选"),
 | ||
|         online_status: int = Query(None, ge=0, le=1, description="按在线状态筛选")
 | ||
| ):
 | ||
|     conn = None
 | ||
|     cursor = None
 | ||
|     try:
 | ||
|         conn = db.get_connection()
 | ||
|         cursor = conn.cursor(dictionary=True)
 | ||
| 
 | ||
|         where_clause = []
 | ||
|         params = []
 | ||
| 
 | ||
|         if device_type:
 | ||
|             where_clause.append("device_type = %s")
 | ||
|             params.append(device_type)
 | ||
|         if online_status is not None:
 | ||
|             where_clause.append("device_online_status = %s")
 | ||
|             params.append(online_status)
 | ||
| 
 | ||
|         # 统计总数
 | ||
|         count_query = "SELECT COUNT(*) AS total FROM devices"
 | ||
|         if where_clause:
 | ||
|             count_query += " WHERE " + " AND ".join(where_clause)
 | ||
|         cursor.execute(count_query, params)
 | ||
|         total = cursor.fetchone()["total"]
 | ||
| 
 | ||
|         # 分页查询列表
 | ||
|         offset = (page - 1) * page_size
 | ||
|         list_query = "SELECT * FROM devices"
 | ||
|         if where_clause:
 | ||
|             list_query += " WHERE " + " AND ".join(where_clause)
 | ||
|         list_query += " ORDER BY id DESC LIMIT %s OFFSET %s"
 | ||
|         params.extend([page_size, offset])
 | ||
| 
 | ||
|         cursor.execute(list_query, params)
 | ||
|         device_list = cursor.fetchall()
 | ||
| 
 | ||
|         return APIResponse(
 | ||
|             code=200,
 | ||
|             message="获取设备列表成功",
 | ||
|             data=DeviceListResponse(
 | ||
|                 total=total,
 | ||
|                 devices=[DeviceResponse(**device) for device in device_list]
 | ||
|             )
 | ||
|         )
 | ||
| 
 | ||
|     except MySQLError as e:
 | ||
|         raise Exception(f"获取设备列表失败: {str(e)}") from e
 | ||
|     finally:
 | ||
|         db.close_connection(conn, cursor)
 | ||
| 
 | ||
| 
 | ||
| # ------------------------------
 | ||
| # 获取设备上下线记录接口
 | ||
| # ------------------------------
 | ||
| @router.get("/status-history", response_model=APIResponse, summary="获取设备上下线记录")
 | ||
| @encrypt_response()
 | ||
| async def get_device_status_history(
 | ||
|         client_ip: str = Query(None, description="客户端IP地址(非必填,为空时返回所有设备记录)"),
 | ||
|         page: int = Query(1, ge=1, description="页码、默认第1页"),
 | ||
|         page_size: int = Query(10, ge=1, le=100, description="每页条数、1-100之间"),
 | ||
|         start_date: date = Query(None, description="开始日期、格式YYYY-MM-DD"),
 | ||
|         end_date: date = Query(None, description="结束日期、格式YYYY-MM-DD")
 | ||
| ):
 | ||
|     conn = None
 | ||
|     cursor = None
 | ||
|     try:
 | ||
|         conn = db.get_connection()
 | ||
|         cursor = conn.cursor(dictionary=True)
 | ||
| 
 | ||
|         # 1. 检查设备是否存在(仅传IP时):强制指定Collation
 | ||
|         if client_ip is not None:
 | ||
|             # 关键调整1:WHERE条件中给d.client_ip指定Collation(与da一致或反之)
 | ||
|             check_query = """
 | ||
|                 SELECT id FROM devices 
 | ||
|                 WHERE client_ip COLLATE utf8mb4_general_ci = %s COLLATE utf8mb4_general_ci
 | ||
|             """
 | ||
|             cursor.execute(check_query, (client_ip,))
 | ||
|             device = cursor.fetchone()
 | ||
|             if not device:
 | ||
|                 raise HTTPException(status_code=404, detail=f"客户端IP为 {client_ip} 的设备不存在")
 | ||
| 
 | ||
|         # 2. 构建WHERE条件
 | ||
|         where_clause = []
 | ||
|         params = []
 | ||
| 
 | ||
|         # 关键调整2:传IP时,强制指定da.client_ip的Collation
 | ||
|         if client_ip is not None:
 | ||
|             where_clause.append("da.client_ip COLLATE utf8mb4_general_ci = %s COLLATE utf8mb4_general_ci")
 | ||
|             params.append(client_ip)
 | ||
|         if start_date:
 | ||
|             where_clause.append("DATE(da.created_at) >= %s")
 | ||
|             params.append(start_date.strftime("%Y-%m-%d"))
 | ||
|         if end_date:
 | ||
|             where_clause.append("DATE(da.created_at) <= %s")
 | ||
|             params.append(end_date.strftime("%Y-%m-%d"))
 | ||
| 
 | ||
|         # 3. 统计总数:JOIN时强制统一Collation
 | ||
|         count_query = """
 | ||
|             SELECT COUNT(*) AS total 
 | ||
|             FROM device_action da 
 | ||
|             LEFT JOIN devices d 
 | ||
|                 ON da.client_ip COLLATE utf8mb4_general_ci = d.client_ip COLLATE utf8mb4_general_ci
 | ||
|         """
 | ||
|         if where_clause:
 | ||
|             count_query += " WHERE " + " AND ".join(where_clause)
 | ||
|         cursor.execute(count_query, params)
 | ||
|         total = cursor.fetchone()["total"]
 | ||
| 
 | ||
|         # 4. 分页查询:JOIN时强制统一Collation
 | ||
|         offset = (page - 1) * page_size
 | ||
|         list_query = """
 | ||
|             SELECT da.*, d.id AS device_id 
 | ||
|             FROM device_action da 
 | ||
|             LEFT JOIN devices d 
 | ||
|                 ON da.client_ip COLLATE utf8mb4_general_ci = d.client_ip COLLATE utf8mb4_general_ci
 | ||
|         """
 | ||
|         if where_clause:
 | ||
|             list_query += " WHERE " + " AND ".join(where_clause)
 | ||
|         list_query += " ORDER BY da.created_at DESC LIMIT %s OFFSET %s"
 | ||
|         params.extend([page_size, offset])
 | ||
|         cursor.execute(list_query, params)
 | ||
|         history_list = cursor.fetchall()
 | ||
| 
 | ||
|         # 后续格式化响应逻辑不变...
 | ||
|         formatted_history = []
 | ||
|         for item in history_list:
 | ||
|             formatted_item = {
 | ||
|                 "id": item["id"],
 | ||
|                 "device_id": item["device_id"],  # 可能为None(IP无对应设备)
 | ||
|                 "client_ip": item["client_ip"],
 | ||
|                 "status": item["action"],
 | ||
|                 "status_time": item["created_at"]
 | ||
|             }
 | ||
|             formatted_history.append(formatted_item)
 | ||
| 
 | ||
|         return APIResponse(
 | ||
|             code=200,
 | ||
|             message="获取设备上下线记录成功",
 | ||
|             data=DeviceStatusHistoryListResponse(
 | ||
|                 total=total,
 | ||
|                 history=[DeviceStatusHistoryResponse(**item) for item in formatted_history]
 | ||
|             )
 | ||
|         )
 | ||
| 
 | ||
|     except MySQLError as e:
 | ||
|         raise Exception(f"获取设备上下线记录失败: {str(e)}") from e
 | ||
|     finally:
 | ||
|         db.close_connection(conn, cursor)
 | ||
| 
 | ||
| # ------------------------------
 | ||
| # 通过客户端IP设置设备is_need_handler为0接口
 | ||
| # ------------------------------
 | ||
| @router.post("/need-handler/reset", response_model=APIResponse, summary="解封客户端")
 | ||
| @encrypt_response()
 | ||
| async def reset_device_need_handler(
 | ||
|     client_ip: str = Query(..., description="目标设备的客户端IP地址(必填)")
 | ||
| ):
 | ||
|     try:
 | ||
|         from service.device_service import update_is_need_handler_by_client_ip
 | ||
|         success = update_is_need_handler_by_client_ip(
 | ||
|             client_ip=client_ip,
 | ||
|             is_need_handler=0  # 固定设置为0(不需要处理)
 | ||
|         )
 | ||
| 
 | ||
|         if success:
 | ||
|             online_status = is_client_connected(client_ip)
 | ||
| 
 | ||
|             # 如果设备在线,则发送消息给前端
 | ||
|             if online_status:
 | ||
|                 # 调用 ws 发送一个消息给前端、告诉他已解锁
 | ||
|                 unlock_msg = {
 | ||
|                     "type": "unlock",
 | ||
|                     "timestamp": get_current_time_str(),
 | ||
|                     "client_ip": client_ip
 | ||
|                 }
 | ||
|                 from ws.ws import send_message_to_client
 | ||
|                 await send_message_to_client(client_ip, json.dumps(unlock_msg))
 | ||
| 
 | ||
|                 # 休眠 100 ms
 | ||
|                 await asyncio.sleep(0.1)
 | ||
| 
 | ||
|                 frame_permit_msg = {
 | ||
|                     "type": "frame",
 | ||
|                     "timestamp": get_current_time_str(),
 | ||
|                     "client_ip": client_ip
 | ||
|                 }
 | ||
|                 await send_message_to_client(client_ip, json.dumps(frame_permit_msg))
 | ||
| 
 | ||
|                 # 更新设备在线状态为1
 | ||
|                 update_online_status_by_ip(client_ip, 1)
 | ||
| 
 | ||
|             return APIResponse(
 | ||
|                 code=200,
 | ||
|                 message=f"设备已解封",
 | ||
|                 data={
 | ||
|                     "client_ip": client_ip,
 | ||
|                     "is_need_handler": 0,
 | ||
|                     "status_desc": "设备已解封"
 | ||
|                 }
 | ||
|             )
 | ||
| 
 | ||
|     # 捕获工具方法抛出的业务异常(如IP为空、设备不存在)
 | ||
|     except ValueError as e:
 | ||
|         # 业务异常返回400/404状态码(与现有接口异常规范一致)
 | ||
|         raise HTTPException(
 | ||
|             status_code=404 if "设备不存在" in str(e) else 400,
 | ||
|             detail=str(e)
 | ||
|         ) from e
 | ||
| 
 | ||
|     # 捕获数据库层面异常(如连接失败、SQL执行错误)
 | ||
|     except MySQLError as e:
 | ||
|         raise Exception(f"设置is_need_handler失败:数据库操作异常 - {str(e)}") from e
 | ||
| 
 | ||
|     # 捕获其他未知异常
 | ||
|     except Exception as e:
 | ||
|         raise Exception(f"设置is_need_handler失败:未知错误 - {str(e)}") from e
 | ||
| 
 | ||
| 
 | ||
| 
 |