Files
video/service/face_service.py

373 lines
13 KiB
Python
Raw Normal View History

from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Query, Request
from fastapi.responses import FileResponse
2025-09-03 20:07:38 +08:00
from mysql.connector import Error as MySQLError
import os
from pathlib import Path
2025-09-03 20:07:38 +08:00
from ds.db import db
from schema.face_schema import (
FaceCreateRequest,
FaceUpdateRequest,
FaceResponse,
FaceListResponse
)
2025-09-03 20:07:38 +08:00
from schema.response_schema import APIResponse
from util.face_util import add_binary_data, get_average_feature
from util.file_util import save_face_to_up_images
2025-09-03 20:07:38 +08:00
# Router for the face-management endpoints; every route below is mounted under "/faces".
router = APIRouter(prefix="/faces", tags=["人脸管理"])
2025-09-04 10:46:05 +08:00
2025-09-03 20:07:38 +08:00
# ------------------------------
# 1. 创建人脸记录(使用修复后的路径)
2025-09-03 20:07:38 +08:00
# ------------------------------
@router.post("", response_model=APIResponse, summary="创建人脸记录")
async def create_face(
    request: Request,
    name: str = Form(None, max_length=255, description="名称(可选)"),
    file: UploadFile = File(..., description="人脸文件(必传)")
):
    """Create a face record from an uploaded image.

    Steps: validate the name, run face detection on the uploaded bytes,
    store the image through ``save_face_to_up_images``, insert the row and
    return the freshly inserted record.

    Args:
        request: Incoming request; only used to read the client IP, which is
            forwarded to ``save_face_to_up_images``.
        name: Optional display name for the face.
        file: Uploaded image file (required).

    Returns:
        APIResponse with ``code=201`` and the created record as ``FaceResponse``.

    Raises:
        HTTPException: 400 when the client IP is unavailable or face detection
            fails; 500 when saving the image or a database operation fails,
            or on any other unexpected error.
    """
    conn = None
    cursor = None
    try:
        face_create = FaceCreateRequest(name=name)
        client_ip = request.client.host if request.client else ""
        if not client_ip:
            raise HTTPException(status_code=400, detail="无法获取客户端IP")

        # Acquire the connection before touching the disk so that an
        # unavailable database fails fast without leaving an image behind.
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)

        file_content = await file.read()

        # Detect the face BEFORE persisting the image: an upload that is
        # rejected here must not leave an orphaned file on disk.
        detect_success, detect_result = add_binary_data(file_content)
        if not detect_success:
            raise HTTPException(status_code=400, detail=f"人脸检测失败:{detect_result}")
        eigenvalue = detect_result

        # UploadFile.filename may be None; fall back to "jpg" when there is
        # no usable extension (missing filename, no dot, or trailing dot).
        filename = file.filename or ""
        file_ext = (filename.rsplit(".", 1)[-1].lower() if "." in filename else "") or "jpg"
        save_result = save_face_to_up_images(
            client_ip=client_ip,
            face_name=name,
            image_bytes=file_content,
            image_format=file_ext
        )
        if not save_result["success"]:
            raise HTTPException(status_code=500, detail=f"图片保存失败:{save_result['msg']}")
        db_image_path = save_result["db_path"]

        insert_query = """
            INSERT INTO face (name, eigenvalue, address)
            VALUES (%s, %s, %s)
        """
        cursor.execute(insert_query, (face_create.name, str(eigenvalue), db_image_path))
        conn.commit()

        # LAST_INSERT_ID() is per-connection, so it still refers to the row
        # inserted above.
        cursor.execute("""
            SELECT id, name, address, created_at, updated_at
            FROM face
            WHERE id = LAST_INSERT_ID()
        """)
        created_face = cursor.fetchone()
        if not created_face:
            raise HTTPException(status_code=500, detail="创建成功但无法获取记录")

        return APIResponse(
            code=201,
            message=f"人脸记录创建成功ID: {created_face['id']}",
            data=FaceResponse(**created_face)
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400s above) must keep their status
        # code instead of being rewrapped as a generic 500 below.
        raise
    except MySQLError as e:
        if conn:
            conn.rollback()
        raise HTTPException(status_code=500, detail=f"创建失败: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e
    finally:
        # Release the DB resources even if closing the upload raises.
        try:
            await file.close()
        finally:
            db.close_connection(conn, cursor)
# The remaining endpoints (get one / list, update, delete, fetch image) are defined below.
2025-09-03 20:07:38 +08:00
# ------------------------------
# 2. 获取单个人脸记录
2025-09-03 20:07:38 +08:00
# ------------------------------
@router.get("/{face_id}", response_model=APIResponse, summary="获取单个人脸记录")
async def get_face(face_id: int):
    """Return the face record with the given id.

    Args:
        face_id: Primary key of the row in the ``face`` table.

    Returns:
        APIResponse with ``code=200`` and the row wrapped in ``FaceResponse``.

    Raises:
        HTTPException: 404 when no row has this id; 500 on a MySQL error.
    """
    connection = None
    db_cursor = None
    try:
        connection = db.get_connection()
        db_cursor = connection.cursor(dictionary=True)
        db_cursor.execute(
            """
            SELECT id, name, address, created_at, updated_at
            FROM face
            WHERE id = %s
            """,
            (face_id,),
        )
        row = db_cursor.fetchone()
        if row:
            return APIResponse(code=200, message="查询成功", data=FaceResponse(**row))
        raise HTTPException(status_code=404, detail=f"ID为 {face_id} 的记录不存在")
    except MySQLError as db_err:
        raise HTTPException(status_code=500, detail=f"查询失败: {db_err!s}") from db_err
    finally:
        db.close_connection(connection, db_cursor)
# ------------------------------
# 3. 获取人脸列表
2025-09-03 20:07:38 +08:00
# ------------------------------
@router.get("", response_model=APIResponse, summary="获取人脸列表(分页+筛选)")
async def get_face_list(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    name: str = Query(None),
    has_eigenvalue: bool = Query(None)
):
    """Return one page of face records, newest id first.

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-100).
        name: Optional substring filter applied to ``name`` with ``LIKE``.
        has_eigenvalue: When given, keep only rows whose ``eigenvalue`` is
            non-NULL (True) or NULL (False).

    Returns:
        APIResponse with ``code=200`` and a ``FaceListResponse`` carrying the
        total number of matching rows and the rows of the requested page.

    Raises:
        HTTPException: 500 on a MySQL error.
    """
    connection = None
    db_cursor = None
    try:
        connection = db.get_connection()
        db_cursor = connection.cursor(dictionary=True)

        # Collect the optional filters and their bound values.
        conditions = []
        filter_args = []
        if name:
            conditions.append("name LIKE %s")
            filter_args.append(f"%{name}%")
        if has_eigenvalue is not None:
            if has_eigenvalue:
                conditions.append("eigenvalue IS NOT NULL")
            else:
                conditions.append("eigenvalue IS NULL")
        where_sql = (" WHERE " + " AND ".join(conditions)) if conditions else ""

        # Total number of rows matching the filters.
        db_cursor.execute("SELECT COUNT(*) AS total FROM face" + where_sql, filter_args)
        total = db_cursor.fetchone()["total"]

        # Rows of the requested page.
        page_sql = (
            """
            SELECT id, name, address, created_at, updated_at
            FROM face
            """
            + where_sql
            + " ORDER BY id DESC LIMIT %s OFFSET %s"
        )
        skip = (page - 1) * page_size
        db_cursor.execute(page_sql, [*filter_args, page_size, skip])
        rows = db_cursor.fetchall()

        payload = FaceListResponse(
            total=total,
            faces=[FaceResponse(**row) for row in rows]
        )
        return APIResponse(code=200, message=f"获取成功(共{total}条)", data=payload)
    except MySQLError as db_err:
        raise HTTPException(status_code=500, detail=f"查询失败: {db_err!s}") from db_err
    finally:
        db.close_connection(connection, db_cursor)
# ------------------------------
# 4. 更新人脸记录
2025-09-03 20:07:38 +08:00
# ------------------------------
@router.put("/{face_id}", response_model=APIResponse, summary="更新人脸记录")
async def update_face(face_id: int, face_update: FaceUpdateRequest):
    """Update the name, eigenvalue and/or image address of a face record.

    Only fields that are not ``None`` in ``face_update`` are written. When the
    address changes, the previously referenced image file is removed on a
    best-effort basis after the update has been committed.

    Args:
        face_id: Primary key of the row to update.
        face_update: Request body; its ``name``, ``eigenvalue`` and
            ``address`` attributes are read.

    Returns:
        APIResponse with ``code=200`` and the updated row as ``FaceResponse``.

    Raises:
        HTTPException: 404 when the record does not exist, 400 when no field
            is provided, 500 on a MySQL error.
    """
    conn = None
    cursor = None
    try:
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT id, address FROM face WHERE id = %s", (face_id,))
        exist_face = cursor.fetchone()
        if not exist_face:
            raise HTTPException(status_code=404, detail=f"ID为 {face_id} 的记录不存在")
        old_db_path = exist_face["address"]

        # Column names come from this fixed tuple, never from user input, so
        # interpolating them into the statement below is safe.
        update_fields = []
        params = []
        for column in ("name", "eigenvalue", "address"):
            value = getattr(face_update, column)
            if value is not None:
                update_fields.append(f"{column} = %s")
                params.append(value)

        if not update_fields:
            raise HTTPException(status_code=400, detail="至少需提供一个更新字段")

        params.append(face_id)
        update_query = f"UPDATE face SET {', '.join(update_fields)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
        cursor.execute(update_query, params)
        conn.commit()

        # Remove the old image only AFTER the commit succeeded, and only when
        # the address really points somewhere else. Deleting first would lose
        # the file if the UPDATE failed, and would destroy the current image
        # when the "new" address resolves to the same file.
        if face_update.address is not None and old_db_path:
            # resolve() turns a relative stored path into an absolute one
            # (relative to the process working directory).
            old_abs_path = Path(old_db_path).resolve()
            new_abs_path = Path(face_update.address).resolve()
            if old_abs_path != new_abs_path and old_abs_path.exists():
                try:
                    old_abs_path.unlink()
                    print(f"[FaceRouter] 已删除旧图片:{old_abs_path}")
                except Exception as e:
                    # Best effort: a leftover file must not fail the update.
                    print(f"[FaceRouter] 删除旧图片失败:{str(e)}")

        cursor.execute("""
            SELECT id, name, address, created_at, updated_at
            FROM face
            WHERE id = %s
        """, (face_id,))
        updated_face = cursor.fetchone()
        if not updated_face:
            # The row vanished between the UPDATE and this SELECT (e.g. it
            # was deleted concurrently).
            raise HTTPException(status_code=404, detail=f"ID为 {face_id} 的记录不存在")
        return APIResponse(
            code=200,
            message="更新成功",
            data=FaceResponse(**updated_face)
        )
    except MySQLError as e:
        if conn:
            conn.rollback()
        raise HTTPException(status_code=500, detail=f"更新失败: {str(e)}") from e
    finally:
        db.close_connection(conn, cursor)
# ------------------------------
# 5. 删除人脸记录
2025-09-03 20:07:38 +08:00
# ------------------------------
@router.delete("/{face_id}", response_model=APIResponse, summary="删除人脸记录")
async def delete_face(face_id: int):
    """Delete a face record and, on a best-effort basis, its image file.

    The row is deleted and committed first; the image is removed afterwards,
    and the outcome of that step is only reported in the response message.

    Args:
        face_id: Primary key of the row to delete.

    Returns:
        APIResponse with ``code=200``; the message ends with a note about
        what happened to the image file.

    Raises:
        HTTPException: 404 when the record does not exist, 500 on a MySQL error.
    """
    connection = None
    db_cursor = None
    try:
        connection = db.get_connection()
        db_cursor = connection.cursor(dictionary=True)

        db_cursor.execute("SELECT id, address FROM face WHERE id = %s", (face_id,))
        record = db_cursor.fetchone()
        if not record:
            raise HTTPException(status_code=404, detail=f"ID为 {face_id} 的记录不存在")
        image_db_path = record["address"]

        db_cursor.execute("DELETE FROM face WHERE id = %s", (face_id,))
        connection.commit()

        # Remove the image file and remember the outcome for the message.
        if not image_db_path:
            image_note = "(无关联图片)"
        else:
            image_file = Path(image_db_path).resolve()
            if not image_file.exists():
                image_note = "(图片不存在)"
            else:
                try:
                    image_file.unlink()
                    print(f"[FaceRouter] 已删除图片:{image_file}")
                    image_note = "(已同步删除图片)"
                except Exception as unlink_err:
                    print(f"[FaceRouter] 删除图片失败:{unlink_err!s}")
                    image_note = "(图片删除失败)"

        return APIResponse(
            code=200,
            message=f"ID为 {face_id} 的记录删除成功 {image_note}",
            data=None
        )
    except MySQLError as db_err:
        if connection:
            connection.rollback()
        raise HTTPException(status_code=500, detail=f"删除失败: {db_err!s}") from db_err
    finally:
        db.close_connection(connection, db_cursor)
2025-09-03 22:06:55 +08:00
# ------------------------------
# 6. 获取人脸图片
# ------------------------------
@router.get("/{face_id}/image", summary="获取人脸图片")
async def get_face_image(face_id: int):
    """Stream the image file that belongs to a face record.

    Args:
        face_id: Primary key of the row whose image is requested.

    Returns:
        FileResponse serving the stored image, with a download filename built
        from the id and the record name.

    Raises:
        HTTPException: 404 when the record does not exist, has no stored
            path, or the file is missing on disk; 500 on a MySQL error.
    """
    import mimetypes  # local import: keeps this block self-contained

    conn = None
    cursor = None
    try:
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)
        query = "SELECT address, name FROM face WHERE id = %s"
        cursor.execute(query, (face_id,))
        face = cursor.fetchone()
        if not face:
            raise HTTPException(status_code=404, detail=f"ID为 {face_id} 的记录不存在")

        db_path = face["address"]
        # Check for a missing path BEFORE building a Path: Path(None) raises
        # TypeError, which would surface as an unhandled 500.
        if not db_path:
            raise HTTPException(status_code=404, detail=f"图片不存在(路径:{db_path})")

        # resolve() turns a relative stored path into an absolute one
        # (relative to the process working directory).
        abs_path = Path(db_path).resolve()
        if not abs_path.is_file():
            raise HTTPException(status_code=404, detail=f"图片不存在(路径:{db_path})")

        # Path.suffix is empty when there is no extension, unlike
        # split('.')[-1], which would return the whole path.
        suffix = abs_path.suffix
        # Map the extension to a registered MIME type ("jpg" -> image/jpeg;
        # "image/jpg" is not a valid type).
        media_type = mimetypes.guess_type(abs_path.name)[0] or "application/octet-stream"
        return FileResponse(
            path=abs_path,
            filename=f"face_{face_id}_{face['name'] or '未命名'}{suffix}",
            media_type=media_type
        )
    except MySQLError as e:
        raise HTTPException(status_code=500, detail=f"获取图片失败: {str(e)}") from e
    finally:
        db.close_connection(conn, cursor)
# ------------------------------
# 内部工具方法
# ------------------------------
2025-09-03 22:06:55 +08:00
def get_all_face_name_with_eigenvalue() -> dict:
    """Build a mapping from face name to a single eigenvalue per name.

    Reads every row with a non-NULL name, groups the stored eigenvalues by
    name, and collapses each group: a single value is returned as stored,
    several values are combined with ``get_average_feature``. Rows whose
    eigenvalue is NULL are skipped, so a name that only has NULL eigenvalues
    does not appear in the result.

    Returns:
        dict keyed by name. Values are whatever the ``eigenvalue`` column or
        ``get_average_feature`` yields (format not visible here; presumably
        the string written by ``create_face`` - TODO confirm).

    Raises:
        Exception: Wraps any MySQL error raised while reading the table.
    """
    conn = None
    cursor = None
    try:
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)
        query = "SELECT name, eigenvalue FROM face WHERE name IS NOT NULL"
        cursor.execute(query)
        faces = cursor.fetchall()

        # Group eigenvalues by name, preserving row order within each group.
        name_to_eigenvalues = {}
        for face in faces:
            eigenvalue = face["eigenvalue"]
            if eigenvalue is None:
                # A NULL feature carries no information and must not be
                # averaged or handed to callers as a feature.
                continue
            name_to_eigenvalues.setdefault(face["name"], []).append(eigenvalue)

        return {
            name: get_average_feature(eigenvalues) if len(eigenvalues) > 1 else eigenvalues[0]
            for name, eigenvalues in name_to_eigenvalues.items()
        }
    except MySQLError as e:
        raise Exception(f"获取人脸特征失败: {str(e)}") from e
    finally:
        db.close_connection(conn, cursor)