import cv2 from io import BytesIO from PIL import Image from mysql.connector import Error as MySQLError from ds.db import db import numpy as np import threading from insightface.app import FaceAnalysis # 全局变量定义 _insightface_app = None _known_faces_embeddings = {} # 存储已知人脸特征 {姓名: 特征向量} _known_faces_names = [] # 存储已知人脸姓名列表 def init_insightface(): """初始化InsightFace引擎""" global _insightface_app if _insightface_app is not None: print("InsightFace引擎已初始化,无需重复执行") return _insightface_app try: print("正在初始化 InsightFace 引擎(模型:buffalo_l)...") # 初始化引擎,指定模型路径和计算 providers app = FaceAnalysis( name='buffalo_l', root='~/.insightface', providers=['CPUExecutionProvider'] # 如需GPU可添加'CUDAExecutionProvider' ) app.prepare(ctx_id=0, det_size=(640, 640)) # 调整检测尺寸 print("InsightFace 引擎初始化完成") # 初始化时加载人脸特征库 init_face_data() _insightface_app = app return app except Exception as e: print(f"InsightFace 初始化失败:{str(e)}") _insightface_app = None return None def init_face_data(): """初始化或更新人脸特征库(清空旧数据,避免重复)""" global _known_faces_embeddings, _known_faces_names # 清空原有数据,防止重复加载 _known_faces_embeddings.clear() _known_faces_names.clear() try: face_data = get_all_face_name_with_eigenvalue() # 假设该函数已定义 print(f"已加载 {len(face_data)} 个人脸数据") for person_name, eigenvalue_data in face_data.items(): # 解析特征值(支持numpy数组或字符串格式) if isinstance(eigenvalue_data, np.ndarray): eigenvalue = eigenvalue_data.astype(np.float32) elif isinstance(eigenvalue_data, str): # 增强字符串解析:支持逗号/空格分隔,清理特殊字符 cleaned = (eigenvalue_data .replace("[", "").replace("]", "") .replace("\n", "").replace(",", " ") .strip()) values = [v for v in cleaned.split() if v] # 过滤空字符串 if not values: print(f"特征值解析失败(空值),跳过 {person_name}") continue eigenvalue = np.array(list(map(float, values)), dtype=np.float32) else: print(f"不支持的特征值类型({type(eigenvalue_data)}),跳过 {person_name}") continue # 特征值归一化(确保相似度计算一致性) norm = np.linalg.norm(eigenvalue) if norm == 0: print(f"特征值为零向量,跳过 {person_name}") continue eigenvalue = eigenvalue / norm # 更新全局特征库 _known_faces_embeddings[person_name] = eigenvalue _known_faces_names.append(person_name) print(f"成功加载 {len(_known_faces_names)} 个人脸的特征库") except Exception as e: print(f"加载人脸特征库失败: {e}") def update_face_data(): """更新人脸特征库(清空旧数据,加载最新数据)""" global _known_faces_embeddings, _known_faces_names print("开始更新人脸特征库...") # 清空原有数据 _known_faces_embeddings.clear() _known_faces_names.clear() try: # 获取最新人脸数据 face_data = get_all_face_name_with_eigenvalue() print(f"获取到 {len(face_data)} 条最新人脸数据") # 处理并加载新数据(复用原有解析逻辑) for person_name, eigenvalue_data in face_data.items(): # 解析特征值(支持numpy数组或字符串格式) if isinstance(eigenvalue_data, np.ndarray): eigenvalue = eigenvalue_data.astype(np.float32) elif isinstance(eigenvalue_data, str): # 增强字符串解析:支持逗号/空格分隔,清理特殊字符 cleaned = (eigenvalue_data .replace("[", "").replace("]", "") .replace("\n", "").replace(",", " ") .strip()) values = [v for v in cleaned.split() if v] # 过滤空字符串 if not values: print(f"特征值解析失败(空值),跳过 {person_name}") continue eigenvalue = np.array(list(map(float, values)), dtype=np.float32) else: print(f"不支持的特征值类型({type(eigenvalue_data)}),跳过 {person_name}") continue # 特征值归一化(确保相似度计算一致性) norm = np.linalg.norm(eigenvalue) if norm == 0: print(f"特征值为零向量,跳过 {person_name}") continue eigenvalue = eigenvalue / norm # 更新全局特征库 _known_faces_embeddings[person_name] = eigenvalue _known_faces_names.append(person_name) print(f"人脸特征库更新完成,共加载 {len(_known_faces_names)} 个人脸数据") return True # 更新成功 except Exception as e: print(f"人脸特征库更新失败: {e}") return False # 更新失败 def detect(frame, similarity_threshold=0.4): global _insightface_app, _known_faces_embeddings # 校验输入有效性 if frame is None or frame.size == 0: return (False, "无效的输入帧数据") # 校验引擎和特征库状态 if not _insightface_app: return (False, "人脸引擎未初始化") if not _known_faces_embeddings: return (False, "人脸特征库为空") try: # 执行人脸检测与特征提取 faces = _insightface_app.get(frame) except Exception as e: return (False, f"检测错误: {str(e)}") result_parts = [] has_matched_known_face = False # 是否有匹配到已知人脸 for face in faces: # 归一化当前人脸特征 face_embedding = face.embedding.astype(np.float32) norm = np.linalg.norm(face_embedding) if norm == 0: result_parts.append("检测到人脸但特征值为零向量(忽略)") continue face_embedding = face_embedding / norm # 与已知特征库比对 max_similarity, best_match_name = -1.0, "Unknown" for name, known_emb in _known_faces_embeddings.items(): similarity = np.dot(face_embedding, known_emb) # 余弦相似度 if similarity > max_similarity: max_similarity = similarity best_match_name = name # 判断是否匹配成功 is_matched = max_similarity >= similarity_threshold if is_matched: has_matched_known_face = True # 记录结果(边界框转为整数列表) bbox = face.bbox.astype(int).tolist() result_parts.append( f"{'匹配' if is_matched else '未匹配'}: {best_match_name} " f"(相似度: {max_similarity:.2f}, 边界框: {bbox})" ) # 构建最终结果 result_str = "未检测到人脸" if not result_parts else "; ".join(result_parts) return (has_matched_known_face, result_str) # 上传图片并提取特征 def add_binary_data(binary_data): """ 接收单张图片的二进制数据、提取特征并保存 返回:(True, 特征值numpy数组) 或 (False, 错误信息字符串) """ global _insightface_app, _feature_list # 1. 先检查引擎是否初始化成功 if not _insightface_app: init_result = init_insightface() # 尝试重新初始化 if not init_result: error_msg = "InsightFace引擎未初始化、无法检测人脸" print(error_msg) return False, error_msg try: # 2. 验证二进制数据有效性 if len(binary_data) < 1024: # 过滤过小的无效图片(小于1KB) error_msg = f"图片过小({len(binary_data)}字节)、可能不是有效图片" print(error_msg) return False, error_msg # 3. 二进制数据转CV2格式(关键步骤、避免通道错误) try: img = Image.open(BytesIO(binary_data)).convert("RGB") # 强制转RGB frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) # InsightFace需要BGR格式 except Exception as e: error_msg = f"图片格式转换失败:{str(e)}" print(error_msg) return False, error_msg # 4. 检查图片尺寸(避免极端尺寸导致检测失败) height, width = frame.shape[:2] if height < 64 or width < 64: # 人脸检测最小建议尺寸 error_msg = f"图片尺寸过小({width}x{height})、需至少64x64像素" print(error_msg) return False, error_msg # 5. 调用InsightFace检测人脸 print(f"开始检测人脸(图片尺寸:{width}x{height}、格式:BGR)") faces = _insightface_app.get(frame) if not faces: error_msg = "未检测到人脸(请确保图片包含清晰正面人脸、无遮挡、不模糊)" print(error_msg) return False, error_msg # 6. 提取特征并保存 current_feature = faces[0].embedding _feature_list.append(current_feature) print(f"人脸检测成功、提取特征值(维度:{current_feature.shape[0]})、累计特征数:{len(_feature_list)}") return True, current_feature except Exception as e: error_msg = f"处理图片时发生异常:{str(e)}" print(error_msg) return False, error_msg # 获取数据库最新的人脸及其特征 def get_all_face_name_with_eigenvalue() -> dict: conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) query = "SELECT name, eigenvalue FROM face WHERE name IS NOT NULL" cursor.execute(query) faces = cursor.fetchall() name_to_eigenvalues = {} for face in faces: name = face["name"] eigenvalue = face["eigenvalue"] if name in name_to_eigenvalues: name_to_eigenvalues[name].append(eigenvalue) else: name_to_eigenvalues[name] = [eigenvalue] face_dict = {} for name, eigenvalues in name_to_eigenvalues.items(): if len(eigenvalues) > 1: face_dict[name] = get_average_feature(eigenvalues) else: face_dict[name] = eigenvalues[0] return face_dict except MySQLError as e: raise Exception(f"获取人脸特征失败: {str(e)}") from e finally: db.close_connection(conn, cursor) # 获取平均特征值 def get_average_feature(features=None): global _feature_list try: if features is None: features = _feature_list if not isinstance(features, list) or len(features) == 0: print("输入必须是包含至少一个特征值的列表") return None processed_features = [] for i, embedding in enumerate(features): try: if isinstance(embedding, str): embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip() embedding_list = [float(num) for num in embedding_str.split() if num.strip()] embedding_np = np.array(embedding_list, dtype=np.float32) else: embedding_np = np.array(embedding, dtype=np.float32) if len(embedding_np.shape) == 1: processed_features.append(embedding_np) print(f"已添加第 {i + 1} 个特征值用于计算平均值") else: print(f"跳过第 {i + 1} 个特征值:不是一维数组") except Exception as e: print(f"处理第 {i + 1} 个特征值时出错:{str(e)}") if not processed_features: print("没有有效的特征值用于计算平均值") return None dims = {feat.shape[0] for feat in processed_features} if len(dims) > 1: print(f"特征值维度不一致:{dims}、无法计算平均值") return None avg_feature = np.mean(processed_features, axis=0) print(f"计算成功:{len(processed_features)} 个特征值的平均向量(维度:{avg_feature.shape[0]})") return avg_feature except Exception as e: print(f"计算平均特征值出错:{str(e)}") return None