from fastapi import APIRouter, Depends, HTTPException from mysql.connector import Error as MySQLError from ds.db import db from schema.face_schema import FaceCreateRequest, FaceUpdateRequest, FaceResponse from schema.response_schema import APIResponse from middle.auth_middleware import get_current_user from schema.user_schema import UserResponse # 创建人脸接口路由(前缀 /faces、标签用于 Swagger 分类) router = APIRouter( prefix="/faces", tags=["人脸管理"] ) # ------------------------------ # 1. 创建人脸记录 # ------------------------------ @router.post("", response_model=APIResponse, summary="创建人脸记录") async def create_face( face: FaceCreateRequest ): """ 创建人脸记录: - 需登录认证 - 插入新的人脸记录到数据库 - 返回创建成功信息 """ conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 1. 检查ID是否已存在 check_query = "SELECT id FROM face WHERE id = %s" cursor.execute(check_query, (face.id,)) existing_face = cursor.fetchone() if existing_face: raise HTTPException( status_code=400, detail=f"ID为 {face.id} 的人脸记录已存在" ) # 2. 插入新人脸记录到数据库 insert_query = """ INSERT INTO face (id, name, eigenvalue) VALUES (%s, %s, %s) """ cursor.execute(insert_query, (face.id, face.name, face.eigenvalue)) conn.commit() # 3. 查询刚创建的记录并返回 select_query = "SELECT * FROM face WHERE id = %s" cursor.execute(select_query, (face.id,)) created_face = cursor.fetchone() return APIResponse( code=201, # 201 表示资源创建成功 message="人脸记录创建成功", data=FaceResponse(**created_face) ) except MySQLError as e: if conn: conn.rollback() raise Exception(f"创建人脸记录失败:{str(e)}") from e finally: db.close_connection(conn, cursor) # ------------------------------ # 2. 获取单个人脸记录 # ------------------------------ @router.get("/{face_id}", response_model=APIResponse, summary="获取单个人脸记录") async def get_face( face_id: int, current_user: UserResponse = Depends(get_current_user) # 需登录认证 ): """ 获取单个人脸记录: - 需登录认证 - 根据ID查询人脸记录 - 返回查询到的人脸信息 """ conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) query = "SELECT * FROM face WHERE id = %s" cursor.execute(query, (face_id,)) face = cursor.fetchone() if not face: raise HTTPException( status_code=404, detail=f"ID为 {face_id} 的人脸记录不存在" ) return APIResponse( code=200, message="人脸记录查询成功", data=FaceResponse(**face) ) except MySQLError as e: raise Exception(f"查询人脸记录失败:{str(e)}") from e finally: db.close_connection(conn, cursor) # ------------------------------ # 3. 获取所有人脸记录 # ------------------------------ @router.get("", response_model=APIResponse, summary="获取所有人脸记录") async def get_all_faces( ): """ 获取所有人脸记录: - 需登录认证 - 查询所有人脸记录(不需要分页) - 返回所有人脸信息列表 """ conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) query = "SELECT * FROM face ORDER BY id" cursor.execute(query) faces = cursor.fetchall() return APIResponse( code=200, message="所有人脸记录查询成功", data=[FaceResponse(**face) for face in faces] ) except MySQLError as e: raise Exception(f"查询所有人脸记录失败:{str(e)}") from e finally: db.close_connection(conn, cursor) # ------------------------------ # 4. 更新人脸记录 # ------------------------------ @router.put("/{face_id}", response_model=APIResponse, summary="更新人脸记录") async def update_face( face_id: int, face_update: FaceUpdateRequest, current_user: UserResponse = Depends(get_current_user) # 需登录认证 ): """ 更新人脸记录: - 需登录认证 - 根据ID更新人脸记录信息 - 返回更新后的人脸信息 """ conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 1. 检查记录是否存在 check_query = "SELECT id FROM face WHERE id = %s" cursor.execute(check_query, (face_id,)) existing_face = cursor.fetchone() if not existing_face: raise HTTPException( status_code=404, detail=f"ID为 {face_id} 的人脸记录不存在" ) # 2. 构建更新语句(只更新提供的字段) update_fields = [] params = [] if face_update.name is not None: update_fields.append("name = %s") params.append(face_update.name) if face_update.eigenvalue is not None: update_fields.append("eigenvalue = %s") params.append(face_update.eigenvalue) if not update_fields: raise HTTPException( status_code=400, detail="至少需要提供一个字段进行更新" ) params.append(face_id) # WHERE条件的参数 update_query = f""" UPDATE face SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s """ cursor.execute(update_query, params) conn.commit() # 3. 查询更新后的记录并返回 select_query = "SELECT * FROM face WHERE id = %s" cursor.execute(select_query, (face_id,)) updated_face = cursor.fetchone() return APIResponse( code=200, message="人脸记录更新成功", data=FaceResponse(**updated_face) ) except MySQLError as e: if conn: conn.rollback() raise Exception(f"更新人脸记录失败:{str(e)}") from e finally: db.close_connection(conn, cursor) # ------------------------------ # 5. 删除人脸记录 # ------------------------------ @router.delete("/{face_id}", response_model=APIResponse, summary="删除人脸记录") async def delete_face( face_id: int, current_user: UserResponse = Depends(get_current_user) # 需登录认证 ): """ 删除人脸记录: - 需登录认证 - 根据ID删除人脸记录 - 返回删除成功信息 """ conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 1. 检查记录是否存在 check_query = "SELECT id FROM face WHERE id = %s" cursor.execute(check_query, (face_id,)) existing_face = cursor.fetchone() if not existing_face: raise HTTPException( status_code=404, detail=f"ID为 {face_id} 的人脸记录不存在" ) # 2. 执行删除操作 delete_query = "DELETE FROM face WHERE id = %s" cursor.execute(delete_query, (face_id,)) conn.commit() return APIResponse( code=200, message=f"ID为 {face_id} 的人脸记录删除成功", data=None ) except MySQLError as e: if conn: conn.rollback() raise Exception(f"删除人脸记录失败:{str(e)}") from e finally: db.close_connection(conn, cursor)