Files
video/util/face_util.py

156 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
import insightface
from insightface.app import FaceAnalysis
from io import BytesIO
from PIL import Image
import logging
# 配置日志(便于排查)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - [FaceUtil] - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 全局变量存储InsightFace引擎和特征列表
_insightface_app = None
_feature_list = []
def init_insightface():
"""初始化InsightFace引擎确保成功后再使用"""
global _insightface_app
try:
if _insightface_app is not None:
logger.info("InsightFace引擎已初始化无需重复执行")
return _insightface_app
logger.info("正在初始化InsightFace引擎模型buffalo_l...")
# 手动指定模型下载路径(避免权限问题,可选)
app = FaceAnalysis(
name='buffalo_l',
root='~/.insightface', # 模型默认下载路径
providers=['CPUExecutionProvider'] # 强制用CPU若有GPU可加'CUDAExecutionProvider'
)
app.prepare(ctx_id=0, det_size=(640, 640)) # det_size越大小人脸检测越准
logger.info("InsightFace引擎初始化完成")
_insightface_app = app
return app
except Exception as e:
logger.error(f"InsightFace初始化失败{str(e)}", exc_info=True) # 打印详细堆栈
_insightface_app = None
return None
def add_binary_data(binary_data):
"""
接收单张图片的二进制数据、提取特征并保存
返回:(True, 特征值numpy数组) 或 (False, 错误信息字符串)
"""
global _insightface_app, _feature_list
# 1. 先检查引擎是否初始化成功
if not _insightface_app:
init_result = init_insightface() # 尝试重新初始化
if not init_result:
error_msg = "InsightFace引擎未初始化无法检测人脸"
logger.error(error_msg)
return False, error_msg
try:
# 2. 验证二进制数据有效性
if len(binary_data) < 1024: # 过滤过小的无效图片小于1KB
error_msg = f"图片过小({len(binary_data)}字节),可能不是有效图片"
logger.warning(error_msg)
return False, error_msg
# 3. 二进制数据转CV2格式关键步骤避免通道错误
try:
img = Image.open(BytesIO(binary_data)).convert("RGB") # 强制转RGB
frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) # InsightFace需要BGR格式
except Exception as e:
error_msg = f"图片格式转换失败:{str(e)}"
logger.error(error_msg, exc_info=True)
return False, error_msg
# 4. 检查图片尺寸(避免极端尺寸导致检测失败)
height, width = frame.shape[:2]
if height < 64 or width < 64: # 人脸检测最小建议尺寸
error_msg = f"图片尺寸过小({width}x{height}需至少64x64像素"
logger.warning(error_msg)
return False, error_msg
# 5. 调用InsightFace检测人脸
logger.info(f"开始检测人脸(图片尺寸:{width}x{height}格式BGR")
faces = _insightface_app.get(frame)
if not faces:
error_msg = "未检测到人脸(请确保图片包含清晰正面人脸,无遮挡、不模糊)"
logger.warning(error_msg)
return False, error_msg
# 6. 提取特征并保存
current_feature = faces[0].embedding
_feature_list.append(current_feature)
logger.info(f"人脸检测成功,提取特征值(维度:{current_feature.shape[0]}),累计特征数:{len(_feature_list)}")
return True, current_feature
except Exception as e:
error_msg = f"处理图片时发生异常:{str(e)}"
logger.error(error_msg, exc_info=True)
return False, error_msg
# 以下函数保持不变get_average_feature/clear_features/get_feature_list
def get_average_feature(features=None):
global _feature_list
try:
if features is None:
features = _feature_list
if not isinstance(features, list) or len(features) == 0:
logger.warning("输入必须是包含至少一个特征值的列表")
return None
processed_features = []
for i, embedding in enumerate(features):
try:
if isinstance(embedding, str):
embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip()
embedding_list = [float(num) for num in embedding_str.split() if num.strip()]
embedding_np = np.array(embedding_list, dtype=np.float32)
else:
embedding_np = np.array(embedding, dtype=np.float32)
if len(embedding_np.shape) == 1:
processed_features.append(embedding_np)
logger.info(f"已添加第 {i + 1} 个特征值用于计算平均值")
else:
logger.warning(f"跳过第 {i + 1} 个特征值:不是一维数组")
except Exception as e:
logger.error(f"处理第 {i + 1} 个特征值时出错:{str(e)}")
if not processed_features:
logger.warning("没有有效的特征值用于计算平均值")
return None
dims = {feat.shape[0] for feat in processed_features}
if len(dims) > 1:
logger.error(f"特征值维度不一致:{dims},无法计算平均值")
return None
avg_feature = np.mean(processed_features, axis=0)
logger.info(f"计算成功:{len(processed_features)} 个特征值的平均向量(维度:{avg_feature.shape[0]}")
return avg_feature
except Exception as e:
logger.error(f"计算平均特征值出错:{str(e)}", exc_info=True)
return None
def clear_features():
global _feature_list
_feature_list = []
logger.info("已清空所有特征数据")
def get_feature_list():
global _feature_list
logger.info(f"当前特征列表长度:{len(_feature_list)}")
return _feature_list.copy()