diff --git a/schema/sensitive_schema.py b/schema/sensitive_schema.py new file mode 100644 index 0000000..7ba5ab7 --- /dev/null +++ b/schema/sensitive_schema.py @@ -0,0 +1,30 @@ +from datetime import datetime +from pydantic import BaseModel, Field + + +# ------------------------------ +# 请求模型(前端传参校验) +# ------------------------------ +class SensitiveCreateRequest(BaseModel): + """创建敏感信息记录请求模型""" + id: int = Field(..., description="主键ID") + name: str = Field(None, max_length=255, description="名称") + + +class SensitiveUpdateRequest(BaseModel): + """更新敏感信息记录请求模型""" + name: str = Field(None, max_length=255, description="名称") + + +# ------------------------------ +# 响应模型(后端返回数据) +# ------------------------------ +class SensitiveResponse(BaseModel): + """敏感信息记录响应模型""" + id: int = Field(..., description="主键ID") + name: str = Field(None, description="名称") + created_at: datetime = Field(..., description="记录创建时间") + updated_at: datetime = Field(..., description="记录更新时间") + + # 支持从数据库查询结果转换 + model_config = {"from_attributes": True} diff --git a/service/sensitive_service.py b/service/sensitive_service.py new file mode 100644 index 0000000..5e1ad07 --- /dev/null +++ b/service/sensitive_service.py @@ -0,0 +1,266 @@ +from fastapi import APIRouter, Depends, HTTPException +from mysql.connector import Error as MySQLError + +from ds.db import db +from schema.sensitive_schema import SensitiveCreateRequest, SensitiveUpdateRequest, SensitiveResponse +from schema.response_schema import APIResponse +from middle.auth_middleware import get_current_user +from schema.user_schema import UserResponse + +# 创建敏感信息接口路由(前缀 /sensitives、标签用于 Swagger 分类) +router = APIRouter( + prefix="/sensitives", + tags=["敏感信息管理"] +) + + +# ------------------------------ +# 1. 创建敏感信息记录 +# ------------------------------ +@router.post("", response_model=APIResponse, summary="创建敏感信息记录") +async def create_sensitive( + sensitive: SensitiveCreateRequest, + current_user: UserResponse = Depends(get_current_user) # 需登录认证 +): + """ + 创建敏感信息记录: + - 需登录认证 + - 插入新的敏感信息记录到数据库 + - 返回创建成功信息 + """ + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 1. 检查ID是否已存在 + check_query = "SELECT id FROM sensitive WHERE id = %s" + cursor.execute(check_query, (sensitive.id,)) + existing_sensitive = cursor.fetchone() + if existing_sensitive: + raise HTTPException( + status_code=400, + detail=f"ID为 {sensitive.id} 的敏感信息记录已存在" + ) + + # 2. 插入新敏感信息记录到数据库 + insert_query = """ + INSERT INTO sensitive (id, name) + VALUES (%s, %s) + """ + cursor.execute(insert_query, (sensitive.id, sensitive.name)) + conn.commit() + + # 3. 查询刚创建的记录并返回 + select_query = "SELECT * FROM sensitive WHERE id = %s" + cursor.execute(select_query, (sensitive.id,)) + created_sensitive = cursor.fetchone() + + return APIResponse( + code=201, # 201 表示资源创建成功 + message="敏感信息记录创建成功", + data=SensitiveResponse(**created_sensitive) + ) + except MySQLError as e: + if conn: + conn.rollback() + raise Exception(f"创建敏感信息记录失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 2. 获取单个敏感信息记录 +# ------------------------------ +@router.get("/{sensitive_id}", response_model=APIResponse, summary="获取单个敏感信息记录") +async def get_sensitive( + sensitive_id: int, + current_user: UserResponse = Depends(get_current_user) # 需登录认证 +): + """ + 获取单个敏感信息记录: + - 需登录认证 + - 根据ID查询敏感信息记录 + - 返回查询到的敏感信息 + """ + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + query = "SELECT * FROM sensitive WHERE id = %s" + cursor.execute(query, (sensitive_id,)) + sensitive = cursor.fetchone() + + if not sensitive: + raise HTTPException( + status_code=404, + detail=f"ID为 {sensitive_id} 的敏感信息记录不存在" + ) + + return APIResponse( + code=200, + message="敏感信息记录查询成功", + data=SensitiveResponse(**sensitive) + ) + except MySQLError as e: + raise Exception(f"查询敏感信息记录失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 3. 获取所有敏感信息记录 +# ------------------------------ +@router.get("", response_model=APIResponse, summary="获取所有敏感信息记录") +async def get_all_sensitives( + current_user: UserResponse = Depends(get_current_user) # 需登录认证 +): + """ + 获取所有敏感信息记录: + - 需登录认证 + - 查询所有敏感信息记录(不需要分页) + - 返回所有敏感信息列表 + """ + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + query = "SELECT * FROM sensitive ORDER BY id" + cursor.execute(query) + sensitives = cursor.fetchall() + + return APIResponse( + code=200, + message="所有敏感信息记录查询成功", + data=[SensitiveResponse(**sensitive) for sensitive in sensitives] + ) + except MySQLError as e: + raise Exception(f"查询所有敏感信息记录失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 4. 更新敏感信息记录 +# ------------------------------ +@router.put("/{sensitive_id}", response_model=APIResponse, summary="更新敏感信息记录") +async def update_sensitive( + sensitive_id: int, + sensitive_update: SensitiveUpdateRequest, + current_user: UserResponse = Depends(get_current_user) # 需登录认证 +): + """ + 更新敏感信息记录: + - 需登录认证 + - 根据ID更新敏感信息记录 + - 返回更新后的敏感信息 + """ + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 1. 检查记录是否存在 + check_query = "SELECT id FROM sensitive WHERE id = %s" + cursor.execute(check_query, (sensitive_id,)) + existing_sensitive = cursor.fetchone() + if not existing_sensitive: + raise HTTPException( + status_code=404, + detail=f"ID为 {sensitive_id} 的敏感信息记录不存在" + ) + + # 2. 构建更新语句(只更新提供的字段) + update_fields = [] + params = [] + + if sensitive_update.name is not None: + update_fields.append("name = %s") + params.append(sensitive_update.name) + + if not update_fields: + raise HTTPException( + status_code=400, + detail="至少需要提供一个字段进行更新" + ) + + params.append(sensitive_id) # WHERE条件的参数 + + update_query = f""" + UPDATE sensitive + SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """ + cursor.execute(update_query, params) + conn.commit() + + # 3. 查询更新后的记录并返回 + select_query = "SELECT * FROM sensitive WHERE id = %s" + cursor.execute(select_query, (sensitive_id,)) + updated_sensitive = cursor.fetchone() + + return APIResponse( + code=200, + message="敏感信息记录更新成功", + data=SensitiveResponse(**updated_sensitive) + ) + except MySQLError as e: + if conn: + conn.rollback() + raise Exception(f"更新敏感信息记录失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 5. 删除敏感信息记录 +# ------------------------------ +@router.delete("/{sensitive_id}", response_model=APIResponse, summary="删除敏感信息记录") +async def delete_sensitive( + sensitive_id: int, + current_user: UserResponse = Depends(get_current_user) # 需登录认证 +): + """ + 删除敏感信息记录: + - 需登录认证 + - 根据ID删除敏感信息记录 + - 返回删除成功信息 + """ + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 1. 检查记录是否存在 + check_query = "SELECT id FROM sensitive WHERE id = %s" + cursor.execute(check_query, (sensitive_id,)) + existing_sensitive = cursor.fetchone() + if not existing_sensitive: + raise HTTPException( + status_code=404, + detail=f"ID为 {sensitive_id} 的敏感信息记录不存在" + ) + + # 2. 执行删除操作 + delete_query = "DELETE FROM sensitive WHERE id = %s" + cursor.execute(delete_query, (sensitive_id,)) + conn.commit() + + return APIResponse( + code=200, + message=f"ID为 {sensitive_id} 的敏感信息记录删除成功", + data=None + ) + except MySQLError as e: + if conn: + conn.rollback() + raise Exception(f"删除敏感信息记录失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor)