import cv2 import numpy as np import insightface from insightface.app import FaceAnalysis from io import BytesIO from PIL import Image import logging # 配置日志(便于排查) logging.basicConfig(level=logging.INFO, format='%(asctime)s - [FaceUtil] - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 全局变量存储InsightFace引擎和特征列表 _insightface_app = None _feature_list = [] def init_insightface(): """初始化InsightFace引擎(确保成功后再使用)""" global _insightface_app try: if _insightface_app is not None: logger.info("InsightFace引擎已初始化,无需重复执行") return _insightface_app logger.info("正在初始化InsightFace引擎(模型:buffalo_l)...") # 手动指定模型下载路径(避免权限问题,可选) app = FaceAnalysis( name='buffalo_l', root='~/.insightface', # 模型默认下载路径 providers=['CPUExecutionProvider'] # 强制用CPU(若有GPU可加'CUDAExecutionProvider') ) app.prepare(ctx_id=0, det_size=(640, 640)) # det_size越大,小人脸检测越准 logger.info("InsightFace引擎初始化完成") _insightface_app = app return app except Exception as e: logger.error(f"InsightFace初始化失败:{str(e)}", exc_info=True) # 打印详细堆栈 _insightface_app = None return None def add_binary_data(binary_data): """ 接收单张图片的二进制数据、提取特征并保存 返回:(True, 特征值numpy数组) 或 (False, 错误信息字符串) """ global _insightface_app, _feature_list # 1. 先检查引擎是否初始化成功 if not _insightface_app: init_result = init_insightface() # 尝试重新初始化 if not init_result: error_msg = "InsightFace引擎未初始化,无法检测人脸" logger.error(error_msg) return False, error_msg try: # 2. 验证二进制数据有效性 if len(binary_data) < 1024: # 过滤过小的无效图片(小于1KB) error_msg = f"图片过小({len(binary_data)}字节),可能不是有效图片" logger.warning(error_msg) return False, error_msg # 3. 二进制数据转CV2格式(关键步骤,避免通道错误) try: img = Image.open(BytesIO(binary_data)).convert("RGB") # 强制转RGB frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) # InsightFace需要BGR格式 except Exception as e: error_msg = f"图片格式转换失败:{str(e)}" logger.error(error_msg, exc_info=True) return False, error_msg # 4. 检查图片尺寸(避免极端尺寸导致检测失败) height, width = frame.shape[:2] if height < 64 or width < 64: # 人脸检测最小建议尺寸 error_msg = f"图片尺寸过小({width}x{height}),需至少64x64像素" logger.warning(error_msg) return False, error_msg # 5. 调用InsightFace检测人脸 logger.info(f"开始检测人脸(图片尺寸:{width}x{height},格式:BGR)") faces = _insightface_app.get(frame) if not faces: error_msg = "未检测到人脸(请确保图片包含清晰正面人脸,无遮挡、不模糊)" logger.warning(error_msg) return False, error_msg # 6. 提取特征并保存 current_feature = faces[0].embedding _feature_list.append(current_feature) logger.info(f"人脸检测成功,提取特征值(维度:{current_feature.shape[0]}),累计特征数:{len(_feature_list)}") return True, current_feature except Exception as e: error_msg = f"处理图片时发生异常:{str(e)}" logger.error(error_msg, exc_info=True) return False, error_msg # 以下函数保持不变(get_average_feature/clear_features/get_feature_list) def get_average_feature(features=None): global _feature_list try: if features is None: features = _feature_list if not isinstance(features, list) or len(features) == 0: logger.warning("输入必须是包含至少一个特征值的列表") return None processed_features = [] for i, embedding in enumerate(features): try: if isinstance(embedding, str): embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip() embedding_list = [float(num) for num in embedding_str.split() if num.strip()] embedding_np = np.array(embedding_list, dtype=np.float32) else: embedding_np = np.array(embedding, dtype=np.float32) if len(embedding_np.shape) == 1: processed_features.append(embedding_np) logger.info(f"已添加第 {i + 1} 个特征值用于计算平均值") else: logger.warning(f"跳过第 {i + 1} 个特征值:不是一维数组") except Exception as e: logger.error(f"处理第 {i + 1} 个特征值时出错:{str(e)}") if not processed_features: logger.warning("没有有效的特征值用于计算平均值") return None dims = {feat.shape[0] for feat in processed_features} if len(dims) > 1: logger.error(f"特征值维度不一致:{dims},无法计算平均值") return None avg_feature = np.mean(processed_features, axis=0) logger.info(f"计算成功:{len(processed_features)} 个特征值的平均向量(维度:{avg_feature.shape[0]})") return avg_feature except Exception as e: logger.error(f"计算平均特征值出错:{str(e)}", exc_info=True) return None def clear_features(): global _feature_list _feature_list = [] logger.info("已清空所有特征数据") def get_feature_list(): global _feature_list logger.info(f"当前特征列表长度:{len(_feature_list)}") return _feature_list.copy()