Files
video/service/face_service.py
2025-09-08 17:34:23 +08:00

341 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from mysql.connector import Error as MySQLError
from ds.db import db
from schema.face_schema import FaceCreateRequest, FaceUpdateRequest, FaceResponse
from schema.response_schema import APIResponse
from middle.auth_middleware import get_current_user
from schema.user_schema import UserResponse
from util.face_util import add_binary_data,get_average_feature
#初始化实例
router = APIRouter(
prefix="/faces",
tags=["人脸管理"]
)
# ------------------------------
# 1. 创建人脸记录(核心修正: ID 数据库自增、前端无需传)
# ------------------------------
@router.post("", response_model=APIResponse, summary="创建人脸记录(传名称+文件、ID自增")
async def create_face(
# 前端仅需传: name可选、Form格式、file必传、文件
name: str = Form(None, max_length=255, description="名称(可选)"),
file: UploadFile = File(..., description="人脸文件(必传、暂不处理内容)")
):
"""
创建人脸记录:
- 需登录认证
- 前端传参: multipart/form-data 表单name 可选、file 必传)
- ID 由数据库自动生成、无需前端传入
- 暂不处理文件内容、eigenvalue 设为 None
"""
# 调用你的方法
conn = None
cursor = None
try:
# 1. 用模型校验 name仅校验长度、无需ID
face_create = FaceCreateRequest(name=name)
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 把文件转为二进制数组
file_content = await file.read()
# 计算特征值
flag, eigenvalue = add_binary_data(file_content)
if flag == False:
raise HTTPException(
status_code=500,
detail="未检测到人脸"
)
# 打印数组长度
print(f"文件大小: {len(file_content)} 字节")
# 2. 插入数据库: 无需传 ID自增、只传 name 和 eigenvalueNone
insert_query = """
INSERT INTO face (name, eigenvalue)
VALUES (%s, %s)
"""
cursor.execute(insert_query, (face_create.name, str(eigenvalue)))
conn.commit()
# 3. 获取数据库自动生成的 ID关键: 用 LAST_INSERT_ID() 查刚插入的记录)
select_new_query = "SELECT * FROM face WHERE id = LAST_INSERT_ID()"
cursor.execute(select_new_query)
created_face = cursor.fetchone()
if not created_face:
raise HTTPException(
status_code=500,
detail="创建人脸记录成功、但无法获取新创建的记录"
)
return APIResponse(
code=201,
message=f"人脸记录创建成功ID: {created_face['id']}、文件名: {file.filename}",
data=FaceResponse(** created_face)
)
except MySQLError as e:
if conn:
conn.rollback()
# 改为使用HTTPException
raise HTTPException(
status_code=500,
detail=f"创建人脸记录失败: {str(e)}"
) from e
except Exception as e:
# 捕获其他可能的异常
raise HTTPException(
status_code=500,
detail=f"服务器错误: {str(e)}"
) from e
finally:
await file.close() # 关闭文件流
db.close_connection(conn, cursor)
# 调用人脸识别得到特征值(这里可以添加你的人脸识别逻辑)
flag, eigenvalue = add_binary_data(file_content)
if flag == False:
raise HTTPException(
status_code=500,
detail="未检测到人脸"
)
# 将 eigenvalue 转为 str
eigenvalue = str(eigenvalue)
# ------------------------------
# 2. 获取单个人脸记录不变、用自增ID查询
# ------------------------------
@router.get("/{face_id}", response_model=APIResponse, summary="获取单个人脸记录")
async def get_face(
face_id: int, # 这里的 ID 是数据库自增的、前端从创建响应中获取
current_user: UserResponse = Depends(get_current_user)
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM face WHERE id = %s"
cursor.execute(query, (face_id,))
face = cursor.fetchone()
if not face:
raise HTTPException(
status_code=404,
detail=f"ID为 {face_id} 的人脸记录不存在"
)
return APIResponse(
code=200,
message="人脸记录查询成功",
data=FaceResponse(**face)
)
except MySQLError as e:
# 改为使用HTTPException
raise HTTPException(
status_code=500,
detail=f"查询人脸记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# 后续 3.获取所有、4.更新、5.删除 接口(修复异常处理)
# ------------------------------
# 3. 获取所有人脸记录(不变)
# ------------------------------
@router.get("", response_model=APIResponse, summary="获取所有人脸记录")
async def get_all_faces(
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM face ORDER BY id" # 按自增ID排序
cursor.execute(query)
faces = cursor.fetchall()
return APIResponse(
code=200,
message="所有人脸记录查询成功",
data=[FaceResponse(** face) for face in faces]
)
except MySQLError as e:
raise HTTPException(
status_code=500,
detail=f"查询所有人脸记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 4. 更新人脸记录不变、用自增ID更新
# ------------------------------
@router.put("/{face_id}", response_model=APIResponse, summary="更新人脸记录")
async def update_face(
face_id: int,
face_update: FaceUpdateRequest,
current_user: UserResponse = Depends(get_current_user)
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 检查记录是否存在
check_query = "SELECT id FROM face WHERE id = %s"
cursor.execute(check_query, (face_id,))
existing_face = cursor.fetchone()
if not existing_face:
raise HTTPException(
status_code=404,
detail=f"ID为 {face_id} 的人脸记录不存在"
)
# 构建更新语句
update_fields = []
params = []
if face_update.name is not None:
update_fields.append("name = %s")
params.append(face_update.name)
if face_update.eigenvalue is not None:
update_fields.append("eigenvalue = %s")
params.append(face_update.eigenvalue)
if not update_fields:
raise HTTPException(status_code=400, detail="至少需提供一个更新字段")
params.append(face_id)
update_query = f"UPDATE face SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
cursor.execute(update_query, params)
conn.commit()
# 查询更新后记录
select_query = "SELECT * FROM face WHERE id = %s"
cursor.execute(select_query, (face_id,))
updated_face = cursor.fetchone()
return APIResponse(
code=200,
message="人脸记录更新成功",
data=FaceResponse(**updated_face)
)
except MySQLError as e:
if conn:
conn.rollback()
raise HTTPException(
status_code=500,
detail=f"更新人脸记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 5. 删除人脸记录不变、用自增ID删除
# ------------------------------
@router.delete("/{face_id}", response_model=APIResponse, summary="删除人脸记录")
async def delete_face(
face_id: int,
current_user: UserResponse = Depends(get_current_user)
):
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
check_query = "SELECT id FROM face WHERE id = %s"
cursor.execute(check_query, (face_id,))
existing_face = cursor.fetchone()
if not existing_face:
raise HTTPException(
status_code=404,
detail=f"ID为 {face_id} 的人脸记录不存在"
)
delete_query = "DELETE FROM face WHERE id = %s"
cursor.execute(delete_query, (face_id,))
conn.commit()
return APIResponse(
code=200,
message=f"ID为 {face_id} 的人脸记录删除成功",
data=None
)
except MySQLError as e:
if conn:
conn.rollback()
raise HTTPException(
status_code=500,
detail=f"删除人脸记录失败: {str(e)}"
) from e
finally:
db.close_connection(conn, cursor)
def get_all_face_name_with_eigenvalue() -> dict:
"""
获取所有人脸的名称及其对应的特征值、组成字典返回
key: 人脸名称name
value: 人脸特征值eigenvalue、若名称重复则返回平均特征值
注: 过滤掉name为None的记录、避免字典key为None的情况
"""
conn = None
cursor = None
try:
# 1. 建立数据库连接并获取游标dictionary=True使结果以字典形式返回
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 2. 执行SQL查询: 只获取name非空的记录、减少数据传输
query = "SELECT name, eigenvalue FROM face WHERE name IS NOT NULL"
cursor.execute(query)
faces = cursor.fetchall() # 返回结果: 列表套字典、如 [{"name":"张三","eigenvalue":...}, ...]
# 3. 收集同一名称对应的所有特征值(处理名称重复场景)
name_to_eigenvalues = {}
for face in faces:
name = face["name"]
eigenvalue = face["eigenvalue"]
# 若名称已存在、追加特征值;否则新建列表存储
if name in name_to_eigenvalues:
name_to_eigenvalues[name].append(eigenvalue)
else:
name_to_eigenvalues[name] = [eigenvalue]
# 4. 构建最终字典: 重复名称取平均、唯一名称直接取特征值
face_dict = {}
for name, eigenvalues in name_to_eigenvalues.items():
# 处理特征值: 多个则求平均、单个则直接使用
if len(eigenvalues) > 1:
# 调用外部方法计算平均特征值需确保binary_face_feature_handler已正确导入
face_dict[name] = get_average_feature(eigenvalues)
else:
# 取列表中唯一的特征值避免value为列表类型
face_dict[name] = eigenvalues[0]
return face_dict
except MySQLError as e:
# 捕获数据库异常、添加上下文信息后重新抛出(便于定位问题)
raise Exception(f"获取人脸名称与特征值失败: {str(e)}") from e
finally:
# 5. 无论是否异常、均释放数据库连接和游标(避免资源泄漏)
db.close_connection(conn, cursor)