Files
video/util/face_util.py
2025-09-08 17:34:23 +08:00

145 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
import insightface
from insightface.app import FaceAnalysis
from io import BytesIO
from PIL import Image
# 全局变量存储InsightFace引擎和特征列表
_insightface_app = None
_feature_list = []
def init_insightface():
"""初始化InsightFace引擎"""
global _insightface_app
try:
print("正在初始化InsightFace引擎...")
app = FaceAnalysis(name='buffalo_l', root='~/.insightface')
app.prepare(ctx_id=0, det_size=(640, 640))
print("InsightFace引擎初始化完成")
_insightface_app = app
return app
except Exception as e:
print(f"InsightFace初始化失败: {e}")
return None
def add_binary_data(binary_data):
"""
接收单张图片的二进制数据、提取特征并保存
参数:
binary_data: 图片的二进制数据bytes类型
返回:
成功提取特征时返回 (True, 特征值numpy数组)
失败时返回 (False, None)
"""
global _insightface_app, _feature_list
if not _insightface_app:
print("引擎未初始化、无法处理")
return False, None
try:
# 直接处理二进制数据: 转换为图像格式
img = Image.open(BytesIO(binary_data))
frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
# 提取特征
faces = _insightface_app.get(frame)
if faces:
# 获取当前提取的特征值
current_feature = faces[0].embedding
# 添加到特征列表
_feature_list.append(current_feature)
print(f"已累计 {len(_feature_list)} 个特征")
# 返回成功标志和当前特征值
return True, current_feature
else:
print("二进制数据中未检测到人脸")
return False, None
except Exception as e:
print(f"处理二进制数据出错: {e}")
return False, None
def get_average_feature(features=None):
"""
计算多个特征向量的平均值
参数:
features: 可选、特征值列表。如果未提供、则使用全局存储的_feature_list
每个元素可以是字符串格式或numpy数组
返回:
单一平均特征向量的numpy数组、若无可计算数据则返回None
"""
global _feature_list
# 如果未提供features参数、则使用全局特征列表
if features is None:
features = _feature_list
try:
# 验证输入是否为列表且不为空
if not isinstance(features, list) or len(features) == 0:
print("输入必须是包含至少一个特征值的列表")
return None
# 处理每个特征值
processed_features = []
for i, embedding in enumerate(features):
try:
if isinstance(embedding, str):
# 处理包含括号和逗号的字符串格式
embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip()
embedding_list = [float(num) for num in embedding_str.split() if num.strip()]
embedding_np = np.array(embedding_list, dtype=np.float32)
else:
embedding_np = np.array(embedding, dtype=np.float32)
# 验证特征值格式
if len(embedding_np.shape) == 1:
processed_features.append(embedding_np)
print(f"已添加第 {i + 1} 个特征值用于计算平均值")
else:
print(f"跳过第 {i + 1} 个特征值、不是一维数组")
except Exception as e:
print(f"处理第 {i + 1} 个特征值时出错: {e}")
# 确保有有效的特征值
if not processed_features:
print("没有有效的特征值用于计算平均值")
return None
# 检查所有特征向量维度是否相同
dims = {feat.shape[0] for feat in processed_features}
if len(dims) > 1:
print(f"特征值维度不一致、无法计算平均值。检测到的维度: {dims}")
return None
# 计算平均值
avg_feature = np.mean(processed_features, axis=0)
print(f"成功计算 {len(processed_features)} 个特征值的平均特征向量、维度: {avg_feature.shape[0]}")
return avg_feature
except Exception as e:
print(f"计算平均特征值时出错: {e}")
return None
def clear_features():
"""清空已存储的特征数据"""
global _feature_list
_feature_list = []
print("已清空所有特征数据")
def get_feature_list():
"""获取当前存储的特征列表"""
global _feature_list
return _feature_list.copy() # 返回副本防止外部直接修改