Files
video/core/face.py

326 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import numpy as np
import cv2
import gc
import time
import threading
from PIL import Image
from insightface.app import FaceAnalysis
# 假设service.face_service中get_all_face_name_with_eigenvalue可获取人脸数据
from service.face_service import get_all_face_name_with_eigenvalue
# GPU状态检查支持
try:
import pynvml
pynvml.nvmlInit()
_nvml_available = True
except ImportError:
print("警告: pynvml库未安装无法检测GPU状态默认尝试使用GPU")
_nvml_available = False
# 全局人脸引擎与特征库
_face_app = None
_known_faces_embeddings = {} # 姓名 -> 归一化特征值的映射
_known_faces_names = [] # 已知人脸姓名列表
# GPU使用状态标记
_using_gpu = False # 是否使用GPU
_used_gpu_id = -1 # 使用的GPU ID-1表示CPU
# 资源管理变量
_ref_count = 0 # 引擎引用计数(记录当前使用次数)
_last_used_time = 0 # 最后一次使用引擎的时间
_lock = threading.Lock() # 线程安全锁
_release_timeout = 8 # 闲置超时时间(秒)
_is_releasing = False # 资源释放中标记
_monitor_thread_running = False # 监控线程运行标记
# 调试计数器
_debug_counter = {
"engine_created": 0, # 引擎创建次数
"engine_released": 0, # 引擎释放次数
"detection_calls": 0 # 检测函数调用次数
}
def check_gpu_availability(gpu_id, memory_threshold=0.7):
"""检查指定GPU的内存使用率是否低于阈值判定为“可用”"""
if not _nvml_available:
return True
try:
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
memory_usage = mem_info.used / mem_info.total
return memory_usage < memory_threshold
except Exception as e:
print(f"检查GPU {gpu_id} 状态失败: {e}")
return False
def select_best_gpu(preferred_gpus=[0, 1]):
"""按优先级选择可用GPU优先0号均不可用则返回-1CPU"""
for gpu_id in preferred_gpus:
try:
# 验证GPU是否存在
if _nvml_available:
pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
# 验证GPU内存是否充足
if check_gpu_availability(gpu_id):
print(f"GPU {gpu_id} 可用将使用该GPU")
return gpu_id
else:
if gpu_id == 0:
print("GPU 0 内存使用率过高尝试其他GPU")
except Exception as e:
print(f"GPU {gpu_id} 不可用或访问失败: {e}")
print("所有指定GPU均不可用将使用CPU计算")
return -1
def _release_engine_resources():
"""释放人脸引擎的所有资源模型、特征库、GPU缓存等"""
global _face_app, _is_releasing, _known_faces_embeddings, _known_faces_names
if not _face_app or _is_releasing:
return
try:
_is_releasing = True
print("开始释放人脸引擎资源...")
# 释放InsightFace模型资源
if hasattr(_face_app, "model"):
_face_app.model = None # 显式置空模型引用
_face_app = None # 释放引擎实例
# 清空人脸特征库
_known_faces_embeddings.clear()
_known_faces_names.clear()
_debug_counter["engine_released"] += 1
print(f"人脸引擎已释放,调试统计: {_debug_counter}")
# 强制垃圾回收
gc.collect()
# 清理各深度学习框架的GPU缓存
# Torch 缓存清理
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
print("Torch GPU缓存已清理")
except ImportError:
pass
# TensorFlow 缓存清理
try:
import tensorflow as tf
tf.keras.backend.clear_session()
print("TensorFlow会话已清理")
except ImportError:
pass
# MXNet 缓存清理InsightFace底层常用MXNet
try:
import mxnet as mx
mx.nd.waitall() # 等待所有计算完成并释放资源
print("MXNet资源已等待释放")
except ImportError:
pass
except Exception as e:
print(f"释放资源过程中出错: {e}")
finally:
_is_releasing = False
def _resource_monitor_thread():
"""后台监控线程:检测引擎闲置超时,触发资源释放"""
global _ref_count, _last_used_time, _face_app, _monitor_thread_running
_monitor_thread_running = True
while _monitor_thread_running:
time.sleep(2) # 缩短检查间隔,加快闲置检测响应
with _lock:
# 当“引擎存在 + 无引用 + 未在释放中”时,检查闲置时间
if _face_app and _ref_count == 0 and not _is_releasing:
idle_time = time.time() - _last_used_time
if idle_time > _release_timeout:
print(f"引擎闲置超时({idle_time:.1f}s > {_release_timeout}s释放资源")
_release_engine_resources()
def load_model(prefer_gpu=True, preferred_gpus=[0, 1]):
"""加载人脸识别引擎及已知人脸特征库默认优先用0号GPU"""
global _face_app, _known_faces_embeddings, _known_faces_names, _using_gpu, _used_gpu_id
# 启动后台监控线程(确保仅启动一次)
if not _monitor_thread_running:
threading.Thread(
target=_resource_monitor_thread,
daemon=True,
name="FaceEngineMonitor"
).start()
print("人脸引擎监控线程已启动")
# 若正在释放资源,等待释放完成
while _is_releasing:
time.sleep(0.1)
# 若引擎已初始化,直接返回
if _face_app:
return True
# 初始化InsightFace引擎
try:
print("正在初始化InsightFace人脸识别引擎...")
_face_app = FaceAnalysis(name="buffalo_l", root=os.path.expanduser("~/.insightface"))
# 选择GPU优先用0号
ctx_id = 0
if prefer_gpu:
ctx_id = select_best_gpu(preferred_gpus)
_using_gpu = ctx_id != -1
_used_gpu_id = ctx_id if _using_gpu else -1
if _using_gpu:
print(f"引擎初始化成功将使用GPU {ctx_id} 计算")
else:
print("引擎初始化成功将使用CPU计算")
# 准备模型(加载到指定设备)
_face_app.prepare(ctx_id=ctx_id, det_size=(640, 640))
print("InsightFace引擎初始化完成")
_debug_counter["engine_created"] += 1
print(f"引擎调试统计: {_debug_counter}")
except Exception as e:
print(f"引擎初始化失败: {e}")
return False
# 从服务加载已知人脸的姓名和特征值
try:
face_data = get_all_face_name_with_eigenvalue()
for person_name, eigenvalue_data in face_data.items():
# 兼容“numpy数组”和“字符串”格式的特征值
if isinstance(eigenvalue_data, np.ndarray):
eigenvalue = eigenvalue_data.astype(np.float32)
elif isinstance(eigenvalue_data, str):
# 清理字符串中的括号、换行等干扰符
cleaned = eigenvalue_data.replace("[", "").replace("]", "").replace("\n", "").strip()
# 分割并转换为浮点数数组
values = [v for v in cleaned.split() if v] # 兼容空格/逗号分隔
eigenvalue = np.array(list(map(float, values)), dtype=np.float32)
else:
print(f"不支持的特征值类型({type(eigenvalue_data)}),跳过 {person_name}")
continue
# 特征值归一化(保证后续相似度计算的一致性)
norm = np.linalg.norm(eigenvalue)
if norm != 0:
eigenvalue = eigenvalue / norm
_known_faces_embeddings[person_name] = eigenvalue
_known_faces_names.append(person_name)
print(f"成功加载 {len(_known_faces_names)} 个人脸的特征库")
except Exception as e:
print(f"加载人脸特征库失败: {e}")
return _face_app is not None
def detect(frame, similarity_threshold=0.4):
"""
检测并识别人脸
返回:(是否匹配到已知人脸, 结果描述字符串)
"""
global _face_app, _known_faces_embeddings, _known_faces_names, _ref_count, _last_used_time
# 校验输入帧有效性
if frame is None or frame.size == 0:
return (False, "无效的输入帧数据")
# 加锁并更新引用计数、最后使用时间
engine = None
with _lock:
_ref_count += 1
_last_used_time = time.time()
_debug_counter["detection_calls"] += 1
# 若引擎未初始化且未在释放中,尝试初始化
if not _face_app and not _is_releasing:
if not load_model(prefer_gpu=True):
# 初始化失败,恢复引用计数
with _lock:
_ref_count = max(0, _ref_count - 1)
return (False, "人脸引擎初始化失败")
engine = _face_app # 获取引擎引用
# 校验引擎可用性
if not engine or len(_known_faces_names) == 0:
with _lock:
_ref_count = max(0, _ref_count - 1)
return (False, "人脸引擎不可用或特征库为空")
try:
# GPU计算时确保帧数据是连续内存避免CUDA错误
if _using_gpu and engine is not None and not frame.flags.contiguous:
frame = np.ascontiguousarray(frame)
# 执行人脸检测与特征提取
faces = engine.get(frame)
except Exception as e:
print(f"人脸检测过程出错: {e}")
# 出错时尝试重新初始化引擎可能是GPU状态变化导致
print("尝试重新初始化人脸引擎...")
with _lock:
_ref_count = max(0, _ref_count - 1)
load_model(prefer_gpu=True)
return (False, f"检测错误: {str(e)}")
result_parts = []
has_matched_known_face = False # 是否有任意人脸匹配到已知库
for face in faces:
# 归一化当前检测到的人脸特征
face_embedding = face.embedding.astype(np.float32)
norm = np.linalg.norm(face_embedding)
if norm == 0:
continue
face_embedding = face_embedding / norm
# 与已知人脸特征逐一比对
max_similarity, best_match_name = -1.0, "Unknown"
for name in _known_faces_names:
known_emb = _known_faces_embeddings[name]
similarity = np.dot(face_embedding, known_emb) # 余弦相似度
if similarity > max_similarity:
max_similarity = similarity
best_match_name = name
# 判断是否匹配成功
is_matched = max_similarity >= similarity_threshold
if is_matched:
has_matched_known_face = True
# 记录该人脸的检测结果
bbox = face.bbox # 人脸边界框
result_parts.append(
f"{'匹配' if is_matched else '未匹配'}: {best_match_name} "
f"(相似度: {max_similarity:.2f}, 边界框: {bbox.astype(int).tolist()})"
)
# 构建最终结果字符串
result_str = "未检测到人脸" if not result_parts else "; ".join(result_parts)
# 释放引用计数(线程安全)
with _lock:
_ref_count = max(0, _ref_count - 1)
# 若仍有引用更新最后使用时间若引用为0也立即标记加快闲置检测
_last_used_time = time.time()
return (has_matched_known_face, result_str)