Files
video/service/sensitive_service.py

267 lines
8.5 KiB
Python
Raw Normal View History

2025-09-03 20:47:24 +08:00
from fastapi import APIRouter, Depends, HTTPException
from mysql.connector import Error as MySQLError
from ds.db import db
from schema.sensitive_schema import SensitiveCreateRequest, SensitiveUpdateRequest, SensitiveResponse
from schema.response_schema import APIResponse
from middle.auth_middleware import get_current_user
from schema.user_schema import UserResponse
# 创建敏感信息接口路由(前缀 /sensitives、标签用于 Swagger 分类)
router = APIRouter(
prefix="/sensitives",
tags=["敏感信息管理"]
)
# ------------------------------
# 1. 创建敏感信息记录
# ------------------------------
@router.post("", response_model=APIResponse, summary="创建敏感信息记录")
async def create_sensitive(
sensitive: SensitiveCreateRequest,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
创建敏感信息记录
- 需登录认证
- 插入新的敏感信息记录到数据库
- 返回创建成功信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查ID是否已存在
check_query = "SELECT id FROM sensitive WHERE id = %s"
cursor.execute(check_query, (sensitive.id,))
existing_sensitive = cursor.fetchone()
if existing_sensitive:
raise HTTPException(
status_code=400,
detail=f"ID为 {sensitive.id} 的敏感信息记录已存在"
)
# 2. 插入新敏感信息记录到数据库
insert_query = """
INSERT INTO sensitive (id, name)
VALUES (%s, %s)
"""
cursor.execute(insert_query, (sensitive.id, sensitive.name))
conn.commit()
# 3. 查询刚创建的记录并返回
select_query = "SELECT * FROM sensitive WHERE id = %s"
cursor.execute(select_query, (sensitive.id,))
created_sensitive = cursor.fetchone()
return APIResponse(
code=201, # 201 表示资源创建成功
message="敏感信息记录创建成功",
data=SensitiveResponse(**created_sensitive)
)
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"创建敏感信息记录失败:{str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 2. 获取单个敏感信息记录
# ------------------------------
@router.get("/{sensitive_id}", response_model=APIResponse, summary="获取单个敏感信息记录")
async def get_sensitive(
sensitive_id: int,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
获取单个敏感信息记录
- 需登录认证
- 根据ID查询敏感信息记录
- 返回查询到的敏感信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM sensitive WHERE id = %s"
cursor.execute(query, (sensitive_id,))
sensitive = cursor.fetchone()
if not sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
return APIResponse(
code=200,
message="敏感信息记录查询成功",
data=SensitiveResponse(**sensitive)
)
except MySQLError as e:
raise Exception(f"查询敏感信息记录失败:{str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 3. 获取所有敏感信息记录
# ------------------------------
@router.get("", response_model=APIResponse, summary="获取所有敏感信息记录")
async def get_all_sensitives(
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
获取所有敏感信息记录
- 需登录认证
- 查询所有敏感信息记录不需要分页
- 返回所有敏感信息列表
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM sensitive ORDER BY id"
cursor.execute(query)
sensitives = cursor.fetchall()
return APIResponse(
code=200,
message="所有敏感信息记录查询成功",
data=[SensitiveResponse(**sensitive) for sensitive in sensitives]
)
except MySQLError as e:
raise Exception(f"查询所有敏感信息记录失败:{str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 4. 更新敏感信息记录
# ------------------------------
@router.put("/{sensitive_id}", response_model=APIResponse, summary="更新敏感信息记录")
async def update_sensitive(
sensitive_id: int,
sensitive_update: SensitiveUpdateRequest,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
更新敏感信息记录
- 需登录认证
- 根据ID更新敏感信息记录
- 返回更新后的敏感信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查记录是否存在
check_query = "SELECT id FROM sensitive WHERE id = %s"
cursor.execute(check_query, (sensitive_id,))
existing_sensitive = cursor.fetchone()
if not existing_sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
# 2. 构建更新语句(只更新提供的字段)
update_fields = []
params = []
if sensitive_update.name is not None:
update_fields.append("name = %s")
params.append(sensitive_update.name)
if not update_fields:
raise HTTPException(
status_code=400,
detail="至少需要提供一个字段进行更新"
)
params.append(sensitive_id) # WHERE条件的参数
update_query = f"""
UPDATE sensitive
SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
cursor.execute(update_query, params)
conn.commit()
# 3. 查询更新后的记录并返回
select_query = "SELECT * FROM sensitive WHERE id = %s"
cursor.execute(select_query, (sensitive_id,))
updated_sensitive = cursor.fetchone()
return APIResponse(
code=200,
message="敏感信息记录更新成功",
data=SensitiveResponse(**updated_sensitive)
)
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"更新敏感信息记录失败:{str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 5. 删除敏感信息记录
# ------------------------------
@router.delete("/{sensitive_id}", response_model=APIResponse, summary="删除敏感信息记录")
async def delete_sensitive(
sensitive_id: int,
current_user: UserResponse = Depends(get_current_user) # 需登录认证
):
"""
删除敏感信息记录
- 需登录认证
- 根据ID删除敏感信息记录
- 返回删除成功信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查记录是否存在
check_query = "SELECT id FROM sensitive WHERE id = %s"
cursor.execute(check_query, (sensitive_id,))
existing_sensitive = cursor.fetchone()
if not existing_sensitive:
raise HTTPException(
status_code=404,
detail=f"ID为 {sensitive_id} 的敏感信息记录不存在"
)
# 2. 执行删除操作
delete_query = "DELETE FROM sensitive WHERE id = %s"
cursor.execute(delete_query, (sensitive_id,))
conn.commit()
return APIResponse(
code=200,
message=f"ID为 {sensitive_id} 的敏感信息记录删除成功",
data=None
)
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"删除敏感信息记录失败:{str(e)}") from e
finally:
db.close_connection(conn, cursor)