Files
video/service/device_action_service.py

159 lines
5.2 KiB
Python
Raw Permalink Normal View History

2025-09-08 17:34:23 +08:00
from fastapi import APIRouter, Query, Path
2025-09-04 12:29:27 +08:00
from mysql.connector import Error as MySQLError
from ds.db import db
from schema.device_action_schema import (
DeviceActionCreate,
DeviceActionResponse,
DeviceActionListResponse
)
from schema.response_schema import APIResponse
# 路由配置
router = APIRouter(
prefix="/device/actions",
tags=["设备操作记录"]
)
# ------------------------------
2025-09-08 17:34:23 +08:00
# 内部方法: 新增设备操作记录适配id自增
2025-09-04 12:29:27 +08:00
# ------------------------------
def add_device_action(action_data: DeviceActionCreate) -> DeviceActionResponse:
"""
2025-09-08 17:34:23 +08:00
新增设备操作记录内部方法非接口
2025-09-04 12:29:27 +08:00
:param action_data: 含client_ip和action0/1
:return: 新增的完整记录
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
2025-09-08 17:34:23 +08:00
# 插入SQLid自增、依赖数据库自动生成
2025-09-04 12:29:27 +08:00
insert_query = """
INSERT INTO device_action
(client_ip, action, created_at, updated_at)
VALUES (%s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
"""
cursor.execute(insert_query, (
action_data.client_ip,
action_data.action
))
conn.commit()
# 获取新增记录通过自增ID查询
new_id = cursor.lastrowid
cursor.execute("SELECT * FROM device_action WHERE id = %s", (new_id,))
new_action = cursor.fetchone()
return DeviceActionResponse(**new_action)
except MySQLError as e:
if conn:
conn.rollback()
2025-09-08 17:34:23 +08:00
raise Exception(f"新增记录失败: {str(e)}") from e
2025-09-04 12:29:27 +08:00
finally:
db.close_connection(conn, cursor)
# ------------------------------
2025-09-08 17:34:23 +08:00
# 接口: 分页查询操作记录列表(仅返回 total + device_actions
2025-09-04 12:29:27 +08:00
# ------------------------------
@router.get("/list", response_model=APIResponse, summary="分页查询设备操作记录")
async def get_device_action_list(
2025-09-08 17:34:23 +08:00
page: int = Query(1, ge=1, description="页码、默认1"),
page_size: int = Query(10, ge=1, le=100, description="每页条数、1-100"),
2025-09-04 12:29:27 +08:00
client_ip: str = Query(None, description="按客户端IP筛选"),
2025-09-08 17:34:23 +08:00
action: int = Query(None, ge=0, le=1, description="按状态筛选0=离线、1=上线)")
2025-09-04 12:29:27 +08:00
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
2025-09-08 17:34:23 +08:00
# 1. 构建筛选条件(参数化查询、避免注入)
2025-09-04 12:29:27 +08:00
where_clause = []
params = []
if client_ip:
where_clause.append("client_ip = %s")
params.append(client_ip)
if action is not None:
where_clause.append("action = %s")
params.append(action)
# 2. 查询总记录数(用于返回 total
count_sql = "SELECT COUNT(*) AS total FROM device_action"
if where_clause:
count_sql += " WHERE " + " AND ".join(where_clause)
cursor.execute(count_sql, params)
total = cursor.fetchone()["total"]
2025-09-08 17:34:23 +08:00
# 3. 分页查询记录(按创建时间倒序、确保最新记录在前)
2025-09-04 12:29:27 +08:00
offset = (page - 1) * page_size
list_sql = "SELECT * FROM device_action"
if where_clause:
list_sql += " WHERE " + " AND ".join(where_clause)
list_sql += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
2025-09-08 17:34:23 +08:00
params.extend([page_size, offset]) # 追加分页参数page/page_size仅用于查询、不返回
2025-09-04 12:29:27 +08:00
cursor.execute(list_sql, params)
action_list = cursor.fetchall()
# 4. 仅返回 total + device_actions
return APIResponse(
code=200,
message="查询成功",
data=DeviceActionListResponse(
total=total,
device_actions=[DeviceActionResponse(**item) for item in action_list]
)
)
except MySQLError as e:
2025-09-08 17:34:23 +08:00
raise Exception(f"查询记录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
@router.get("/{client_ip}", response_model=APIResponse, summary="根据IP查询设备操作记录")
async def get_device_actions_by_ip(
client_ip: str = Path(..., description="客户端IP地址")
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 查询总记录数
count_sql = "SELECT COUNT(*) AS total FROM device_action WHERE client_ip = %s"
cursor.execute(count_sql, (client_ip,))
total = cursor.fetchone()["total"]
# 2. 查询该IP的所有记录按创建时间倒序
list_sql = """
SELECT * FROM device_action
WHERE client_ip = %s
ORDER BY created_at DESC
"""
cursor.execute(list_sql, (client_ip,))
action_list = cursor.fetchall()
# 3. 返回结果
return APIResponse(
code=200,
message="查询成功",
data=DeviceActionListResponse(
total=total,
device_actions=[DeviceActionResponse(**item) for item in action_list]
)
)
except MySQLError as e:
raise Exception(f"查询记录失败: {str(e)}") from e
2025-09-04 12:29:27 +08:00
finally:
2025-09-08 17:34:23 +08:00
db.close_connection(conn, cursor)