Compare commits

...

2 Commits

Author SHA1 Message Date
bae7785a97 Merge remote-tracking branch 'origin/master' 2025-09-04 10:40:06 +08:00
49d2c71fdd 1 2025-09-04 10:39:41 +08:00

156
ocr/feature_extraction.py Normal file
View File

@ -0,0 +1,156 @@
import cv2
import numpy as np
import insightface
from insightface.app import FaceAnalysis
from io import BytesIO
from PIL import Image
class BinaryFaceFeatureHandler:
"""
专门处理图片二进制数据的特征提取器,支持分批次接收二进制数据并累积计算平均特征
"""
def __init__(self):
self.app = self._init_insightface()
self.feature_list = [] # 存储所有图片二进制数据提取的特征
def _init_insightface(self):
"""初始化InsightFace引擎"""
try:
print("正在初始化InsightFace引擎...")
app = FaceAnalysis(name='buffalo_l', root='~/.insightface')
app.prepare(ctx_id=0, det_size=(640, 640))
print("InsightFace引擎初始化完成")
return app
except Exception as e:
print(f"InsightFace初始化失败: {e}")
return None
def add_binary_data(self, binary_data):
"""
接收单张图片的二进制数据,提取特征并保存
参数:
binary_data: 图片的二进制数据bytes类型
返回:
成功提取特征时返回 (True, 特征值numpy数组)
失败时返回 (False, None)
"""
if not self.app:
print("引擎未初始化,无法处理")
return False, None
try:
# 直接处理二进制数据:转换为图像格式
img = Image.open(BytesIO(binary_data))
frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
# 提取特征
faces = self.app.get(frame)
if faces:
# 获取当前提取的特征值
current_feature = faces[0].embedding
# 添加到特征列表
self.feature_list.append(current_feature)
print(f"已累计 {len(self.feature_list)} 个特征")
# 返回成功标志和当前特征值
return True,current_feature
else:
print("二进制数据中未检测到人脸")
return False, None
except Exception as e:
print(f"处理二进制数据出错: {e}")
return False, None
def get_average_feature(self, features):
"""
计算多个特征向量的平均值
参数:
features: 特征值列表每个元素可以是字符串格式或numpy数组
例如: [feature1, feature2, ...]
返回:
单一平均特征向量的numpy数组若无可计算数据则返回None
"""
try:
# 验证输入是否为列表且不为空
if not isinstance(features, list) or len(features) == 0:
print("输入必须是包含至少一个特征值的列表")
return None
# 处理每个特征值
processed_features = []
for i, embedding in enumerate(features):
try:
if isinstance(embedding, str):
# 处理包含括号和逗号的字符串格式
embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip()
embedding_list = [float(num) for num in embedding_str.split() if num.strip()]
embedding_np = np.array(embedding_list, dtype=np.float32)
else:
embedding_np = np.array(embedding, dtype=np.float32)
# 验证特征值格式
if len(embedding_np.shape) == 1:
processed_features.append(embedding_np)
print(f"已添加第 {i + 1} 个特征值用于计算平均值")
else:
print(f"跳过第 {i + 1} 个特征值,不是一维数组")
except Exception as e:
print(f"处理第 {i + 1} 个特征值时出错: {e}")
# 确保有有效的特征值
if not processed_features:
print("没有有效的特征值用于计算平均值")
return None
# 检查所有特征向量维度是否相同
dims = {feat.shape[0] for feat in processed_features}
if len(dims) > 1:
print(f"特征值维度不一致,无法计算平均值。检测到的维度: {dims}")
return None
# 计算平均值
avg_feature = np.mean(processed_features, axis=0)
print(f"成功计算 {len(processed_features)} 个特征值的平均特征向量,维度: {avg_feature.shape[0]}")
return avg_feature
except Exception as e:
print(f"计算平均特征值时出错: {e}")
return None
# def clear(self):
# """清空已存储的特征数据"""
# self.feature_list = []
# print("已清空所有特征数据")
# # 使用示例
# if __name__ == "__main__":
# handler = BinaryFaceFeatureHandler()
#
# # 模拟接收图片二进制数据
# try:
# # 第一次接收
# with open(r"D:\Git\bin\video\ocr\known_faces\B\102-f.jpg_1140x855.jpg", "rb") as f:
# bin_data1 = f.read()
# success, feature1 = handler.add_binary_data(bin_data1)
# if success:
# print(f"第一次提取的特征值前5个: {feature1[:5]}")
#
# # 第二次接收
# with open(r"D:\Git\bin\video\ocr\known_faces\B\104-1.jpg", "rb") as f:
# bin_data2 = f.read()
# success, feature2 = handler.add_binary_data(bin_data2)
# if success:
# print(f"第二次提取的特征值前5个: {feature2[:5]}")
#
# # 计算平均值
# avg_feature = handler.get_average_feature()
#
# except Exception as e:
# print(f"处理过程出错: {e}")