Files
video/service/sensitive_service.py
2025-09-08 17:34:23 +08:00

289 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException
from mysql.connector import Error as MySQLError
from ds.db import db
from schema.sensitive_schema import SensitiveCreateRequest, SensitiveUpdateRequest, SensitiveResponse
from schema.response_schema import APIResponse
from middle.auth_middleware import get_current_user
from schema.user_schema import UserResponse
# 创建敏感信息接口路由(前缀 /sensitives、标签用于 Swagger 分类)
router = APIRouter(
prefix="/sensitives",
tags=["敏感信息管理"]
)
# ------------------------------
# 1. 创建敏感信息记录
# ------------------------------
@router.post("", response_model=APIResponse, summary="创建敏感信息记录")
async def create_sensitive(
sensitive: SensitiveCreateRequest): # 添加了登录认证依赖
"""
创建敏感信息记录:
- 需登录认证
- 插入新的敏感信息记录到数据库ID由数据库自动生成
- 返回创建成功信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 插入新敏感信息记录到数据库不包含ID、由数据库自动生成
insert_query = """
INSERT INTO sensitives (name)
VALUES (%s)
"""
cursor.execute(insert_query, (sensitive.name,))
conn.commit()
# 获取刚插入记录的ID使用LAST_INSERT_ID()函数)
new_id = cursor.lastrowid
# 查询刚创建的记录并返回
select_query = "SELECT * FROM sensitives WHERE id = %s"
cursor.execute(select_query, (new_id,))
created_sensitive = cursor.fetchone()
return APIResponse(
code=201, # 201 表示资源创建成功
message="敏感信息记录创建成功",
data=SensitiveResponse(**created_sensitive)
)
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"创建敏感信息记录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 以下接口代码保持不变
# ------------------------------
# 2. 获取单个敏感信息记录
# ------------------------------
@router.get("/{sensitive_id}", response_model=APIResponse, summary="获取单个敏感信息记录")
async def get_sensitive(
sensitive_id: int,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
获取单个敏感信息记录:
- 需登录认证
- 根据ID查询敏感信息记录
- 返回查询到的敏感信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM sensitives WHERE id = %s"
cursor.execute(query, (sensitive_id,))
sensitive = cursor.fetchone()
if not sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
return APIResponse(
code=200,
message="敏感信息记录查询成功",
data=SensitiveResponse(**sensitive)
)
except MySQLError as e:
raise Exception(f"查询敏感信息记录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 3. 获取所有敏感信息记录
# ------------------------------
@router.get("", response_model=APIResponse, summary="获取所有敏感信息记录")
async def get_all_sensitives():
"""
获取所有敏感信息记录:
- 需登录认证
- 查询所有敏感信息记录(不需要分页)
- 返回所有敏感信息列表
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM sensitives ORDER BY id"
cursor.execute(query)
sensitives = cursor.fetchall()
return APIResponse(
code=200,
message="所有敏感信息记录查询成功",
data=[SensitiveResponse(**sensitive) for sensitive in sensitives]
)
except MySQLError as e:
raise Exception(f"查询所有敏感信息记录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 4. 更新敏感信息记录
# ------------------------------
@router.put("/{sensitive_id}", response_model=APIResponse, summary="更新敏感信息记录")
async def update_sensitive(
sensitive_id: int,
sensitive_update: SensitiveUpdateRequest,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
更新敏感信息记录:
- 需登录认证
- 根据ID更新敏感信息记录
- 返回更新后的敏感信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查记录是否存在
check_query = "SELECT id FROM sensitives WHERE id = %s"
cursor.execute(check_query, (sensitive_id,))
existing_sensitive = cursor.fetchone()
if not existing_sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
# 2. 构建更新语句(只更新提供的字段)
update_fields = []
params = []
if sensitive_update.name is not None:
update_fields.append("name = %s")
params.append(sensitive_update.name)
if not update_fields:
raise HTTPException(
status_code=400,
detail="至少需要提供一个字段进行更新"
)
params.append(sensitive_id) # WHERE条件的参数
update_query = f"""
UPDATE sensitives
SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
cursor.execute(update_query, params)
conn.commit()
# 3. 查询更新后的记录并返回
select_query = "SELECT * FROM sensitives WHERE id = %s"
cursor.execute(select_query, (sensitive_id,))
updated_sensitive = cursor.fetchone()
return APIResponse(
code=200,
message="敏感信息记录更新成功",
data=SensitiveResponse(**updated_sensitive)
)
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"更新敏感信息记录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 5. 删除敏感信息记录
# ------------------------------
@router.delete("/{sensitive_id}", response_model=APIResponse, summary="删除敏感信息记录")
async def delete_sensitive(
sensitive_id: int,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
删除敏感信息记录:
- 需登录认证
- 根据ID删除敏感信息记录
- 返回删除成功信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查记录是否存在
check_query = "SELECT id FROM sensitives WHERE id = %s"
cursor.execute(check_query, (sensitive_id,))
existing_sensitive = cursor.fetchone()
if not existing_sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
# 2. 执行删除操作
delete_query = "DELETE FROM sensitives WHERE id = %s"
cursor.execute(delete_query, (sensitive_id,))
conn.commit()
return APIResponse(
code=200,
message=f"ID为 {sensitive_id} 的敏感信息记录删除成功",
data=None
)
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"删除敏感信息记录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
def get_all_sensitive_words() -> list[str]:
"""
获取所有敏感词、返回字符串数组
返回:
list[str]: 包含所有敏感词的数组
异常:
MySQLError: 数据库操作相关错误
"""
conn = None
cursor = None
try:
# 获取数据库连接
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 执行查询、只获取敏感词字段
query = "SELECT name FROM sensitives ORDER BY id"
cursor.execute(query)
sensitive_records = cursor.fetchall()
# 提取敏感词到数组中
return [record['name'] for record in sensitive_records]
except MySQLError as e:
# 数据库错误处理
raise MySQLError(f"查询敏感词失败: {str(e)}") from e
finally:
# 确保资源正确释放
db.close_connection(conn, cursor)