import cv2 import numpy as np import insightface from insightface.app import FaceAnalysis from io import BytesIO from PIL import Image class BinaryFaceFeatureHandler: """ 专门处理图片二进制数据的特征提取器,支持分批次接收二进制数据并累积计算平均特征 """ def __init__(self): self.app = self._init_insightface() self.feature_list = [] # 存储所有图片二进制数据提取的特征 def _init_insightface(self): """初始化InsightFace引擎""" try: print("正在初始化InsightFace引擎...") app = FaceAnalysis(name='buffalo_l', root='~/.insightface') app.prepare(ctx_id=0, det_size=(640, 640)) print("InsightFace引擎初始化完成") return app except Exception as e: print(f"InsightFace初始化失败: {e}") return None def add_binary_data(self, binary_data): """ 接收单张图片的二进制数据,提取特征并保存 参数: binary_data: 图片的二进制数据(bytes类型) 返回: 成功提取特征时返回 (True, 特征值numpy数组) 失败时返回 (False, None) """ if not self.app: print("引擎未初始化,无法处理") return False, None try: # 直接处理二进制数据:转换为图像格式 img = Image.open(BytesIO(binary_data)) frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) # 提取特征 faces = self.app.get(frame) if faces: # 获取当前提取的特征值 current_feature = faces[0].embedding # 添加到特征列表 self.feature_list.append(current_feature) print(f"已累计 {len(self.feature_list)} 个特征") # 返回成功标志和当前特征值 return True,current_feature else: print("二进制数据中未检测到人脸") return False, None except Exception as e: print(f"处理二进制数据出错: {e}") return False, None def get_average_feature(self, features): """ 计算多个特征向量的平均值 参数: features: 特征值列表,每个元素可以是字符串格式或numpy数组 例如: [feature1, feature2, ...] 返回: 单一平均特征向量的numpy数组,若无可计算数据则返回None """ try: # 验证输入是否为列表且不为空 if not isinstance(features, list) or len(features) == 0: print("输入必须是包含至少一个特征值的列表") return None # 处理每个特征值 processed_features = [] for i, embedding in enumerate(features): try: if isinstance(embedding, str): # 处理包含括号和逗号的字符串格式 embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip() embedding_list = [float(num) for num in embedding_str.split() if num.strip()] embedding_np = np.array(embedding_list, dtype=np.float32) else: embedding_np = np.array(embedding, dtype=np.float32) # 验证特征值格式 if len(embedding_np.shape) == 1: processed_features.append(embedding_np) print(f"已添加第 {i + 1} 个特征值用于计算平均值") else: print(f"跳过第 {i + 1} 个特征值,不是一维数组") except Exception as e: print(f"处理第 {i + 1} 个特征值时出错: {e}") # 确保有有效的特征值 if not processed_features: print("没有有效的特征值用于计算平均值") return None # 检查所有特征向量维度是否相同 dims = {feat.shape[0] for feat in processed_features} if len(dims) > 1: print(f"特征值维度不一致,无法计算平均值。检测到的维度: {dims}") return None # 计算平均值 avg_feature = np.mean(processed_features, axis=0) print(f"成功计算 {len(processed_features)} 个特征值的平均特征向量,维度: {avg_feature.shape[0]}") return avg_feature except Exception as e: print(f"计算平均特征值时出错: {e}") return None # def clear(self): # """清空已存储的特征数据""" # self.feature_list = [] # print("已清空所有特征数据") # # 使用示例 # if __name__ == "__main__": # handler = BinaryFaceFeatureHandler() # # # 模拟接收图片二进制数据 # try: # # 第一次接收 # with open(r"D:\Git\bin\video\ocr\known_faces\B\102-f.jpg_1140x855.jpg", "rb") as f: # bin_data1 = f.read() # success, feature1 = handler.add_binary_data(bin_data1) # if success: # print(f"第一次提取的特征值前5个: {feature1[:5]}") # # # 第二次接收 # with open(r"D:\Git\bin\video\ocr\known_faces\B\104-1.jpg", "rb") as f: # bin_data2 = f.read() # success, feature2 = handler.add_binary_data(bin_data2) # if success: # print(f"第二次提取的特征值前5个: {feature2[:5]}") # # # 计算平均值 # avg_feature = handler.get_average_feature() # # except Exception as e: # print(f"处理过程出错: {e}")