import cv2 import numpy as np import insightface from insightface.app import FaceAnalysis from io import BytesIO from PIL import Image # 全局变量存储InsightFace引擎和特征列表 _insightface_app = None _feature_list = [] def init_insightface(): """初始化InsightFace引擎""" global _insightface_app try: print("正在初始化InsightFace引擎...") app = FaceAnalysis(name='buffalo_l', root='~/.insightface') app.prepare(ctx_id=0, det_size=(640, 640)) print("InsightFace引擎初始化完成") _insightface_app = app return app except Exception as e: print(f"InsightFace初始化失败: {e}") return None def add_binary_data(binary_data): """ 接收单张图片的二进制数据、提取特征并保存 参数: binary_data: 图片的二进制数据(bytes类型) 返回: 成功提取特征时返回 (True, 特征值numpy数组) 失败时返回 (False, None) """ global _insightface_app, _feature_list if not _insightface_app: print("引擎未初始化、无法处理") return False, None try: # 直接处理二进制数据: 转换为图像格式 img = Image.open(BytesIO(binary_data)) frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) # 提取特征 faces = _insightface_app.get(frame) if faces: # 获取当前提取的特征值 current_feature = faces[0].embedding # 添加到特征列表 _feature_list.append(current_feature) print(f"已累计 {len(_feature_list)} 个特征") # 返回成功标志和当前特征值 return True, current_feature else: print("二进制数据中未检测到人脸") return False, None except Exception as e: print(f"处理二进制数据出错: {e}") return False, None def get_average_feature(features=None): """ 计算多个特征向量的平均值 参数: features: 可选、特征值列表。如果未提供、则使用全局存储的_feature_list 每个元素可以是字符串格式或numpy数组 返回: 单一平均特征向量的numpy数组、若无可计算数据则返回None """ global _feature_list # 如果未提供features参数、则使用全局特征列表 if features is None: features = _feature_list try: # 验证输入是否为列表且不为空 if not isinstance(features, list) or len(features) == 0: print("输入必须是包含至少一个特征值的列表") return None # 处理每个特征值 processed_features = [] for i, embedding in enumerate(features): try: if isinstance(embedding, str): # 处理包含括号和逗号的字符串格式 embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip() embedding_list = [float(num) for num in embedding_str.split() if num.strip()] embedding_np = np.array(embedding_list, dtype=np.float32) else: embedding_np = np.array(embedding, dtype=np.float32) # 验证特征值格式 if len(embedding_np.shape) == 1: processed_features.append(embedding_np) print(f"已添加第 {i + 1} 个特征值用于计算平均值") else: print(f"跳过第 {i + 1} 个特征值、不是一维数组") except Exception as e: print(f"处理第 {i + 1} 个特征值时出错: {e}") # 确保有有效的特征值 if not processed_features: print("没有有效的特征值用于计算平均值") return None # 检查所有特征向量维度是否相同 dims = {feat.shape[0] for feat in processed_features} if len(dims) > 1: print(f"特征值维度不一致、无法计算平均值。检测到的维度: {dims}") return None # 计算平均值 avg_feature = np.mean(processed_features, axis=0) print(f"成功计算 {len(processed_features)} 个特征值的平均特征向量、维度: {avg_feature.shape[0]}") return avg_feature except Exception as e: print(f"计算平均特征值时出错: {e}") return None def clear_features(): """清空已存储的特征数据""" global _feature_list _feature_list = [] print("已清空所有特征数据") def get_feature_list(): """获取当前存储的特征列表""" global _feature_list return _feature_list.copy() # 返回副本防止外部直接修改