"""Face-embedding extraction from encoded image bytes using InsightFace."""
# Provenance: reconstructed from git patch
# 49d2c71fdd73708334cd1dc1e05c78968c597c10 ("[PATCH] 1", 2025-09-04),
# which creates ocr/feature_extraction.py.

from io import BytesIO
from typing import Optional, Tuple

import numpy as np

# cv2, PIL and insightface are imported inside the methods that use them.
# That keeps this module importable (and get_average_feature usable) with
# NumPy alone, and routes a missing package through the same
# print-and-return-None handling as any other initialisation failure.


class BinaryFaceFeatureHandler:
    """Extract face embeddings from encoded image bytes and average them.

    Every successful ``add_binary_data`` call appends one embedding to
    ``feature_list``. ``get_average_feature`` returns the element-wise mean
    of a list of embeddings (by default, the accumulated ones).
    """

    def __init__(self):
        # None when the engine could not be created; see _init_insightface.
        self.app = self._init_insightface()
        # Embeddings collected by add_binary_data, in call order.
        self.feature_list = []

    def _init_insightface(self):
        """Create and prepare the InsightFace ``FaceAnalysis`` engine.

        Returns:
            The prepared engine, or ``None`` when importing or preparing it
            failed. The error is printed, never raised.
        """
        try:
            print("正在初始化InsightFace引擎...")
            from insightface.app import FaceAnalysis

            app = FaceAnalysis(name='buffalo_l', root='~/.insightface')
            # NOTE(review): ctx_id=0 presumably selects the first GPU and
            # det_size the detector input size -- confirm against the
            # InsightFace documentation.
            app.prepare(ctx_id=0, det_size=(640, 640))
            print("InsightFace引擎初始化完成")
            return app
        except Exception as e:
            print(f"InsightFace初始化失败: {e}")
            return None

    def add_binary_data(self, binary_data: bytes) -> Tuple[bool, Optional[np.ndarray]]:
        """Extract a face embedding from one encoded image and store it.

        Args:
            binary_data: The encoded image file content, in any format PIL
                can open.

        Returns:
            ``(True, embedding)`` when at least one face was returned by the
            engine; the embedding of the first returned face is also
            appended to ``feature_list``.
            ``(False, None)`` when the engine is unavailable, no face was
            detected, or any error occurred (the error is printed).
        """
        if not self.app:
            print("引擎未初始化,无法处理")
            return False, None

        try:
            import cv2
            from PIL import Image

            with Image.open(BytesIO(binary_data)) as img:
                # Force 3-channel RGB first: grayscale and palette images
                # decode to 2-D arrays, which the RGB->BGR conversion below
                # rejects, so such images used to be dropped as errors.
                rgb = np.array(img.convert("RGB"))
            frame = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)

            faces = self.app.get(frame)
            if not faces:
                print("二进制数据中未检测到人脸")
                return False, None

            # Only the first face returned by the engine is used.
            current_feature = faces[0].embedding
            self.feature_list.append(current_feature)
            print(f"已累计 {len(self.feature_list)} 个特征")
            return True, current_feature
        except Exception as e:
            print(f"处理二进制数据出错: {e}")
            return False, None

    def get_average_feature(self, features: Optional[list] = None) -> Optional[np.ndarray]:
        """Return the element-wise mean of several feature vectors.

        Args:
            features: List of feature vectors. Each item is either a numeric
                sequence/array or a string such as ``"[0.1, 0.2, 0.3]"``
                (brackets optional, values separated by commas and/or
                whitespace). When omitted or ``None``, the embeddings
                accumulated in ``feature_list`` are averaged.

        Returns:
            A 1-D ``float32`` array holding the mean vector, or ``None``
            when the input is not a non-empty list, no item is a usable
            non-empty 1-D vector, the usable vectors differ in length, or
            an unexpected error occurred. Items that cannot be parsed are
            reported and skipped.
        """
        try:
            if features is None:
                # Default to what add_binary_data has collected so far.
                features = self.feature_list

            if not isinstance(features, list) or len(features) == 0:
                print("输入必须是包含至少一个特征值的列表")
                return None

            processed_features = []
            for i, embedding in enumerate(features):
                try:
                    if isinstance(embedding, str):
                        # Drop brackets, turn commas into separators, then
                        # parse the remaining whitespace-separated numbers.
                        cleaned = embedding.replace('[', '').replace(']', '').replace(',', ' ')
                        values = [float(token) for token in cleaned.split()]
                        embedding_np = np.array(values, dtype=np.float32)
                    else:
                        embedding_np = np.array(embedding, dtype=np.float32)

                    if embedding_np.ndim != 1:
                        print(f"跳过第 {i + 1} 个特征值,不是一维数组")
                    elif embedding_np.size == 0:
                        # A zero-length vector carries no data; accepting it
                        # would yield an empty "average" or a spurious
                        # dimension mismatch with the real vectors.
                        print(f"跳过第 {i + 1} 个特征值,特征为空")
                    else:
                        processed_features.append(embedding_np)
                        print(f"已添加第 {i + 1} 个特征值用于计算平均值")

                except Exception as e:
                    print(f"处理第 {i + 1} 个特征值时出错: {e}")

            if not processed_features:
                print("没有有效的特征值用于计算平均值")
                return None

            # All vectors must share one length to be averaged.
            dims = {feat.shape[0] for feat in processed_features}
            if len(dims) > 1:
                print(f"特征值维度不一致,无法计算平均值。检测到的维度: {dims}")
                return None

            avg_feature = np.mean(processed_features, axis=0)
            print(f"成功计算 {len(processed_features)} 个特征值的平均特征向量,维度: {avg_feature.shape[0]}")

            return avg_feature

        except Exception as e:
            print(f"计算平均特征值时出错: {e}")
            return None

    # Disabled in the original source; kept for reference.
    # def clear(self):
    #     """清空已存储的特征数据"""
    #     self.feature_list = []
    #     print("已清空所有特征数据")


# Usage example (disabled in the original source).
# if __name__ == "__main__":
#     handler = BinaryFaceFeatureHandler()
#
#     # Simulate receiving encoded image bytes.
#     try:
#         # First image
#         with open(r"D:\Git\bin\video\ocr\known_faces\B\102-f.jpg_1140x855.jpg", "rb") as f:
#             bin_data1 = f.read()
#         success, feature1 = handler.add_binary_data(bin_data1)
#         if success:
#             print(f"第一次提取的特征值前5个: {feature1[:5]}")
#
#         # Second image
#         with open(r"D:\Git\bin\video\ocr\known_faces\B\104-1.jpg", "rb") as f:
#             bin_data2 = f.read()
#         success, feature2 = handler.add_binary_data(bin_data2)
#         if success:
#             print(f"第二次提取的特征值前5个: {feature2[:5]}")
#
#         # Average the accumulated features.
#         avg_feature = handler.get_average_feature()
#
#     except Exception as e:
#         print(f"处理过程出错: {e}")