145 lines
4.8 KiB
Python
145 lines
4.8 KiB
Python
import cv2
|
||
import numpy as np
|
||
import insightface
|
||
from insightface.app import FaceAnalysis
|
||
from io import BytesIO
|
||
from PIL import Image
|
||
|
||
# 全局变量存储InsightFace引擎和特征列表
|
||
_insightface_app = None
|
||
_feature_list = []
|
||
|
||
|
||
def init_insightface():
|
||
"""初始化InsightFace引擎"""
|
||
global _insightface_app
|
||
try:
|
||
print("正在初始化InsightFace引擎...")
|
||
app = FaceAnalysis(name='buffalo_l', root='~/.insightface')
|
||
app.prepare(ctx_id=0, det_size=(640, 640))
|
||
print("InsightFace引擎初始化完成")
|
||
_insightface_app = app
|
||
return app
|
||
except Exception as e:
|
||
print(f"InsightFace初始化失败: {e}")
|
||
return None
|
||
|
||
|
||
def add_binary_data(binary_data):
|
||
"""
|
||
接收单张图片的二进制数据,提取特征并保存
|
||
|
||
参数:
|
||
binary_data: 图片的二进制数据(bytes类型)
|
||
|
||
返回:
|
||
成功提取特征时返回 (True, 特征值numpy数组)
|
||
失败时返回 (False, None)
|
||
"""
|
||
global _insightface_app, _feature_list
|
||
|
||
if not _insightface_app:
|
||
print("引擎未初始化,无法处理")
|
||
return False, None
|
||
|
||
try:
|
||
# 直接处理二进制数据:转换为图像格式
|
||
img = Image.open(BytesIO(binary_data))
|
||
frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
|
||
|
||
# 提取特征
|
||
faces = _insightface_app.get(frame)
|
||
if faces:
|
||
# 获取当前提取的特征值
|
||
current_feature = faces[0].embedding
|
||
# 添加到特征列表
|
||
_feature_list.append(current_feature)
|
||
print(f"已累计 {len(_feature_list)} 个特征")
|
||
# 返回成功标志和当前特征值
|
||
return True, current_feature
|
||
else:
|
||
print("二进制数据中未检测到人脸")
|
||
return False, None
|
||
except Exception as e:
|
||
print(f"处理二进制数据出错: {e}")
|
||
return False, None
|
||
|
||
|
||
def get_average_feature(features=None):
|
||
"""
|
||
计算多个特征向量的平均值
|
||
|
||
参数:
|
||
features: 可选,特征值列表。如果未提供,则使用全局存储的_feature_list
|
||
每个元素可以是字符串格式或numpy数组
|
||
|
||
返回:
|
||
单一平均特征向量的numpy数组,若无可计算数据则返回None
|
||
"""
|
||
global _feature_list
|
||
|
||
# 如果未提供features参数,则使用全局特征列表
|
||
if features is None:
|
||
features = _feature_list
|
||
|
||
try:
|
||
# 验证输入是否为列表且不为空
|
||
if not isinstance(features, list) or len(features) == 0:
|
||
print("输入必须是包含至少一个特征值的列表")
|
||
return None
|
||
|
||
# 处理每个特征值
|
||
processed_features = []
|
||
for i, embedding in enumerate(features):
|
||
try:
|
||
if isinstance(embedding, str):
|
||
# 处理包含括号和逗号的字符串格式
|
||
embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip()
|
||
embedding_list = [float(num) for num in embedding_str.split() if num.strip()]
|
||
embedding_np = np.array(embedding_list, dtype=np.float32)
|
||
else:
|
||
embedding_np = np.array(embedding, dtype=np.float32)
|
||
|
||
# 验证特征值格式
|
||
if len(embedding_np.shape) == 1:
|
||
processed_features.append(embedding_np)
|
||
print(f"已添加第 {i + 1} 个特征值用于计算平均值")
|
||
else:
|
||
print(f"跳过第 {i + 1} 个特征值,不是一维数组")
|
||
|
||
except Exception as e:
|
||
print(f"处理第 {i + 1} 个特征值时出错: {e}")
|
||
|
||
# 确保有有效的特征值
|
||
if not processed_features:
|
||
print("没有有效的特征值用于计算平均值")
|
||
return None
|
||
|
||
# 检查所有特征向量维度是否相同
|
||
dims = {feat.shape[0] for feat in processed_features}
|
||
if len(dims) > 1:
|
||
print(f"特征值维度不一致,无法计算平均值。检测到的维度: {dims}")
|
||
return None
|
||
|
||
# 计算平均值
|
||
avg_feature = np.mean(processed_features, axis=0)
|
||
print(f"成功计算 {len(processed_features)} 个特征值的平均特征向量,维度: {avg_feature.shape[0]}")
|
||
|
||
return avg_feature
|
||
|
||
except Exception as e:
|
||
print(f"计算平均特征值时出错: {e}")
|
||
return None
|
||
|
||
|
||
def clear_features():
|
||
"""清空已存储的特征数据"""
|
||
global _feature_list
|
||
_feature_list = []
|
||
print("已清空所有特征数据")
|
||
|
||
|
||
def get_feature_list():
|
||
"""获取当前存储的特征列表"""
|
||
global _feature_list
|
||
return _feature_list.copy() # 返回副本防止外部直接修改 |