Files
video/service/sensitive_service.py
2025-09-15 18:35:43 +08:00

356 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, Query
from mysql.connector import Error as MySQLError
from typing import Optional
from ds.db import db
from encryption.encrypt_decorator import encrypt_response
from schema.sensitive_schema import (
SensitiveCreateRequest,
SensitiveUpdateRequest,
SensitiveResponse,
SensitiveListResponse # 导入新增的分页响应模型
)
from schema.response_schema import APIResponse
from middle.auth_middleware import get_current_user
from schema.user_schema import UserResponse
# 创建敏感信息接口路由(前缀 /sensitives、标签用于 Swagger 分类)
router = APIRouter(
prefix="/sensitives",
tags=["敏感信息管理"]
)
# ------------------------------
# 1. 创建敏感信息记录
# ------------------------------
@router.post("", response_model=APIResponse, summary="创建敏感信息记录")
@encrypt_response()
async def create_sensitive(
sensitive: SensitiveCreateRequest,
current_user: UserResponse = Depends(get_current_user) # 补充登录认证依赖(与其他接口保持一致)
):
"""
创建敏感信息记录:
- 需登录认证
- 插入新的敏感信息记录到数据库ID由数据库自动生成
- 返回创建成功信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 插入新敏感信息记录到数据库不包含ID、由数据库自动生成
insert_query = """
INSERT INTO sensitives (name, created_at, updated_at)
VALUES (%s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
"""
cursor.execute(insert_query, (sensitive.name,))
conn.commit()
# 获取刚插入记录的ID使用LAST_INSERT_ID()函数)
new_id = cursor.lastrowid
# 查询刚创建的记录并返回
select_query = "SELECT * FROM sensitives WHERE id = %s"
cursor.execute(select_query, (new_id,))
created_sensitive = cursor.fetchone()
return APIResponse(
code=201, # 201 表示资源创建成功
message="敏感信息记录创建成功",
data=SensitiveResponse(**created_sensitive)
)
except MySQLError as e:
if conn:
conn.rollback()
raise HTTPException(
status_code=500,
detail=f"创建敏感信息记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 2. 获取单个敏感信息记录
# ------------------------------
@router.get("/{sensitive_id}", response_model=APIResponse, summary="获取单个敏感信息记录")
@encrypt_response()
async def get_sensitive(
sensitive_id: int,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
获取单个敏感信息记录:
- 需登录认证
- 根据ID查询敏感信息记录
- 返回查询到的敏感信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM sensitives WHERE id = %s"
cursor.execute(query, (sensitive_id,))
sensitive = cursor.fetchone()
if not sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
return APIResponse(
code=200,
message="敏感信息记录查询成功",
data=SensitiveResponse(**sensitive)
)
except MySQLError as e:
raise HTTPException(
status_code=500,
detail=f"查询敏感信息记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 3. 获取敏感信息分页列表(重构:支持分页+关键词搜索)
# ------------------------------
@router.get("", response_model=APIResponse, summary="获取敏感信息分页列表(支持关键词搜索)")
@encrypt_response()
async def get_sensitive_list(
page: int = Query(1, ge=1, description="页码默认1最小1"),
page_size: int = Query(10, ge=1, le=100, description="每页条数默认101-100"),
name: Optional[str] = Query(None, description="敏感词关键词搜索(模糊匹配)")
):
"""
获取敏感信息分页列表:
- 需登录认证
- 支持分页page/page_size和敏感词关键词模糊搜索name
- 返回总记录数+当前页数据
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 构建查询条件(支持关键词搜索)
where_clause = []
params = []
if name:
where_clause.append("name LIKE %s")
params.append(f"%{name}%") # 模糊匹配关键词
# 2. 查询总记录数(用于分页计算)
count_sql = "SELECT COUNT(*) AS total FROM sensitives"
if where_clause:
count_sql += " WHERE " + " AND ".join(where_clause)
cursor.execute(count_sql, params.copy()) # 复制参数列表,避免后续污染
total = cursor.fetchone()["total"]
# 3. 计算分页偏移量
offset = (page - 1) * page_size
# 4. 分页查询敏感词数据(按更新时间倒序,最新的在前)
list_sql = "SELECT * FROM sensitives"
if where_clause:
list_sql += " WHERE " + " AND ".join(where_clause)
# 排序+分页LIMIT 条数 OFFSET 偏移量)
list_sql += " ORDER BY updated_at DESC LIMIT %s OFFSET %s"
# 补充分页参数page_size和offset
params.extend([page_size, offset])
cursor.execute(list_sql, params)
sensitive_list = cursor.fetchall()
# 5. 构造分页响应数据
return APIResponse(
code=200,
message=f"敏感信息列表查询成功(共{total}条记录,当前第{page}页)",
data=SensitiveListResponse(
total=total,
sensitives=[SensitiveResponse(**item) for item in sensitive_list]
)
)
except MySQLError as e:
raise HTTPException(
status_code=500,
detail=f"查询敏感信息列表失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 4. 更新敏感信息记录
# ------------------------------
@router.put("/{sensitive_id}", response_model=APIResponse, summary="更新敏感信息记录")
@encrypt_response()
async def update_sensitive(
sensitive_id: int,
sensitive_update: SensitiveUpdateRequest,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
更新敏感信息记录:
- 需登录认证
- 根据ID更新敏感信息记录
- 返回更新后的敏感信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查记录是否存在
check_query = "SELECT id FROM sensitives WHERE id = %s"
cursor.execute(check_query, (sensitive_id,))
existing_sensitive = cursor.fetchone()
if not existing_sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
# 2. 构建更新语句(只更新提供的字段)
update_fields = []
params = []
if sensitive_update.name is not None:
update_fields.append("name = %s")
params.append(sensitive_update.name)
if not update_fields:
raise HTTPException(
status_code=400,
detail="至少需要提供一个字段进行更新name"
)
# 补充更新时间和WHERE条件参数
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(sensitive_id)
update_query = f"""
UPDATE sensitives
SET {', '.join(update_fields)}
WHERE id = %s
"""
cursor.execute(update_query, params)
conn.commit()
# 3. 查询更新后的记录并返回
select_query = "SELECT * FROM sensitives WHERE id = %s"
cursor.execute(select_query, (sensitive_id,))
updated_sensitive = cursor.fetchone()
return APIResponse(
code=200,
message="敏感信息记录更新成功",
data=SensitiveResponse(**updated_sensitive)
)
except MySQLError as e:
if conn:
conn.rollback()
raise HTTPException(
status_code=500,
detail=f"更新敏感信息记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 5. 删除敏感信息记录
# ------------------------------
@router.delete("/{sensitive_id}", response_model=APIResponse, summary="删除敏感信息记录")
@encrypt_response()
async def delete_sensitive(
sensitive_id: int,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
删除敏感信息记录:
- 需登录认证
- 根据ID删除敏感信息记录
- 返回删除成功信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查记录是否存在
check_query = "SELECT id FROM sensitives WHERE id = %s"
cursor.execute(check_query, (sensitive_id,))
existing_sensitive = cursor.fetchone()
if not existing_sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
# 2. 执行删除操作
delete_query = "DELETE FROM sensitives WHERE id = %s"
cursor.execute(delete_query, (sensitive_id,))
conn.commit()
return APIResponse(
code=200,
message=f"ID为 {sensitive_id} 的敏感信息记录删除成功",
data=None
)
except MySQLError as e:
if conn:
conn.rollback()
raise HTTPException(
status_code=500,
detail=f"删除敏感信息记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 6. 业务辅助函数:获取所有敏感词(供其他模块调用)
# ------------------------------
def get_all_sensitive_words() -> list[str]:
"""
获取所有敏感词(返回纯字符串列表,用于过滤业务)
返回:
list[str]: 包含所有敏感词的数组
异常:
MySQLError: 数据库操作相关错误
"""
conn = None
cursor = None
try:
# 获取数据库连接
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 执行查询只获取敏感词字段按ID排序
query = "SELECT name FROM sensitives ORDER BY id"
cursor.execute(query)
sensitive_records = cursor.fetchall()
# 提取敏感词到纯字符串数组
return [record['name'] for record in sensitive_records]
except MySQLError as e:
# 数据库错误向上抛出,由调用方处理
raise MySQLError(f"查询敏感词列表失败: {str(e)}") from e
finally:
# 确保数据库连接正确释放
db.close_connection(conn, cursor)