340 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			340 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import cv2
 | ||
| from io import BytesIO
 | ||
| from PIL import Image
 | ||
| from mysql.connector import Error as MySQLError
 | ||
| 
 | ||
| from ds.db import db
 | ||
| 
 | ||
| import numpy as np
 | ||
| import threading
 | ||
| from insightface.app import FaceAnalysis
 | ||
| 
 | ||
| # 全局变量定义
 | ||
| _insightface_app = None
 | ||
| _known_faces_embeddings = {}  # 存储已知人脸特征 {姓名: 特征向量}
 | ||
| _known_faces_names = []  # 存储已知人脸姓名列表
 | ||
| 
 | ||
| 
 | ||
| def init_insightface():
 | ||
|     """初始化InsightFace引擎"""
 | ||
|     global _insightface_app
 | ||
|     if _insightface_app is not None:
 | ||
|         print("InsightFace引擎已初始化,无需重复执行")
 | ||
|         return _insightface_app
 | ||
| 
 | ||
|     try:
 | ||
|         print("正在初始化 InsightFace 引擎(模型:buffalo_l)...")
 | ||
|         # 初始化引擎,指定模型路径和计算 providers
 | ||
|         app = FaceAnalysis(
 | ||
|             name='buffalo_l',
 | ||
|             root='~/.insightface',
 | ||
|             providers=['CPUExecutionProvider']  # 如需GPU可添加'CUDAExecutionProvider'
 | ||
|         )
 | ||
|         app.prepare(ctx_id=0, det_size=(640, 640))  # 调整检测尺寸
 | ||
|         print("InsightFace 引擎初始化完成")
 | ||
| 
 | ||
|         # 初始化时加载人脸特征库
 | ||
|         init_face_data()
 | ||
| 
 | ||
|         _insightface_app = app
 | ||
|         return app
 | ||
|     except Exception as e:
 | ||
|         print(f"InsightFace 初始化失败:{str(e)}")
 | ||
|         _insightface_app = None
 | ||
|         return None
 | ||
| 
 | ||
| 
 | ||
| def init_face_data():
 | ||
|     """初始化或更新人脸特征库(清空旧数据,避免重复)"""
 | ||
|     global _known_faces_embeddings, _known_faces_names
 | ||
|     # 清空原有数据,防止重复加载
 | ||
|     _known_faces_embeddings.clear()
 | ||
|     _known_faces_names.clear()
 | ||
| 
 | ||
|     try:
 | ||
|         face_data = get_all_face_name_with_eigenvalue()  # 假设该函数已定义
 | ||
|         print(f"已加载 {len(face_data)} 个人脸数据")
 | ||
|         for person_name, eigenvalue_data in face_data.items():
 | ||
|             # 解析特征值(支持numpy数组或字符串格式)
 | ||
|             if isinstance(eigenvalue_data, np.ndarray):
 | ||
|                 eigenvalue = eigenvalue_data.astype(np.float32)
 | ||
|             elif isinstance(eigenvalue_data, str):
 | ||
|                 # 增强字符串解析:支持逗号/空格分隔,清理特殊字符
 | ||
|                 cleaned = (eigenvalue_data
 | ||
|                            .replace("[", "").replace("]", "")
 | ||
|                            .replace("\n", "").replace(",", " ")
 | ||
|                            .strip())
 | ||
|                 values = [v for v in cleaned.split() if v]  # 过滤空字符串
 | ||
|                 if not values:
 | ||
|                     print(f"特征值解析失败(空值),跳过 {person_name}")
 | ||
|                     continue
 | ||
|                 eigenvalue = np.array(list(map(float, values)), dtype=np.float32)
 | ||
|             else:
 | ||
|                 print(f"不支持的特征值类型({type(eigenvalue_data)}),跳过 {person_name}")
 | ||
|                 continue
 | ||
| 
 | ||
|             # 特征值归一化(确保相似度计算一致性)
 | ||
|             norm = np.linalg.norm(eigenvalue)
 | ||
|             if norm == 0:
 | ||
|                 print(f"特征值为零向量,跳过 {person_name}")
 | ||
|                 continue
 | ||
|             eigenvalue = eigenvalue / norm
 | ||
| 
 | ||
|             # 更新全局特征库
 | ||
|             _known_faces_embeddings[person_name] = eigenvalue
 | ||
|             _known_faces_names.append(person_name)
 | ||
| 
 | ||
|         print(f"成功加载 {len(_known_faces_names)} 个人脸的特征库")
 | ||
|     except Exception as e:
 | ||
|         print(f"加载人脸特征库失败: {e}")
 | ||
| 
 | ||
| 
 | ||
| def update_face_data():
 | ||
|     """更新人脸特征库(清空旧数据,加载最新数据)"""
 | ||
|     global _known_faces_embeddings, _known_faces_names
 | ||
| 
 | ||
|     print("开始更新人脸特征库...")
 | ||
| 
 | ||
|     # 清空原有数据
 | ||
|     _known_faces_embeddings.clear()
 | ||
|     _known_faces_names.clear()
 | ||
| 
 | ||
|     try:
 | ||
|         # 获取最新人脸数据
 | ||
|         face_data = get_all_face_name_with_eigenvalue()
 | ||
|         print(f"获取到 {len(face_data)} 条最新人脸数据")
 | ||
| 
 | ||
|         # 处理并加载新数据(复用原有解析逻辑)
 | ||
|         for person_name, eigenvalue_data in face_data.items():
 | ||
|             # 解析特征值(支持numpy数组或字符串格式)
 | ||
|             if isinstance(eigenvalue_data, np.ndarray):
 | ||
|                 eigenvalue = eigenvalue_data.astype(np.float32)
 | ||
|             elif isinstance(eigenvalue_data, str):
 | ||
|                 # 增强字符串解析:支持逗号/空格分隔,清理特殊字符
 | ||
|                 cleaned = (eigenvalue_data
 | ||
|                            .replace("[", "").replace("]", "")
 | ||
|                            .replace("\n", "").replace(",", " ")
 | ||
|                            .strip())
 | ||
|                 values = [v for v in cleaned.split() if v]  # 过滤空字符串
 | ||
|                 if not values:
 | ||
|                     print(f"特征值解析失败(空值),跳过 {person_name}")
 | ||
|                     continue
 | ||
|                 eigenvalue = np.array(list(map(float, values)), dtype=np.float32)
 | ||
|             else:
 | ||
|                 print(f"不支持的特征值类型({type(eigenvalue_data)}),跳过 {person_name}")
 | ||
|                 continue
 | ||
| 
 | ||
|             # 特征值归一化(确保相似度计算一致性)
 | ||
|             norm = np.linalg.norm(eigenvalue)
 | ||
|             if norm == 0:
 | ||
|                 print(f"特征值为零向量,跳过 {person_name}")
 | ||
|                 continue
 | ||
|             eigenvalue = eigenvalue / norm
 | ||
| 
 | ||
|             # 更新全局特征库
 | ||
|             _known_faces_embeddings[person_name] = eigenvalue
 | ||
|             _known_faces_names.append(person_name)
 | ||
| 
 | ||
|         print(f"人脸特征库更新完成,共加载 {len(_known_faces_names)} 个人脸数据")
 | ||
|         return True  # 更新成功
 | ||
|     except Exception as e:
 | ||
|         print(f"人脸特征库更新失败: {e}")
 | ||
|         return False  # 更新失败
 | ||
| 
 | ||
| 
 | ||
| def detect(frame, similarity_threshold=0.4):
 | ||
|     global _insightface_app, _known_faces_embeddings
 | ||
| 
 | ||
|     # 校验输入有效性
 | ||
|     if frame is None or frame.size == 0:
 | ||
|         return (False, "无效的输入帧数据")
 | ||
| 
 | ||
|     # 校验引擎和特征库状态
 | ||
|     if not _insightface_app:
 | ||
|         return (False, "人脸引擎未初始化")
 | ||
|     if not _known_faces_embeddings:
 | ||
|         return (False, "人脸特征库为空")
 | ||
| 
 | ||
|     try:
 | ||
|         # 执行人脸检测与特征提取
 | ||
|         faces = _insightface_app.get(frame)
 | ||
|     except Exception as e:
 | ||
|         return (False, f"检测错误: {str(e)}")
 | ||
| 
 | ||
|     result_parts = []
 | ||
|     has_matched_known_face = False  # 是否有匹配到已知人脸
 | ||
| 
 | ||
|     for face in faces:
 | ||
|         # 归一化当前人脸特征
 | ||
|         face_embedding = face.embedding.astype(np.float32)
 | ||
|         norm = np.linalg.norm(face_embedding)
 | ||
|         if norm == 0:
 | ||
|             result_parts.append("检测到人脸但特征值为零向量(忽略)")
 | ||
|             continue
 | ||
|         face_embedding = face_embedding / norm
 | ||
| 
 | ||
|         # 与已知特征库比对
 | ||
|         max_similarity, best_match_name = -1.0, "Unknown"
 | ||
|         for name, known_emb in _known_faces_embeddings.items():
 | ||
|             similarity = np.dot(face_embedding, known_emb)  # 余弦相似度
 | ||
|             if similarity > max_similarity:
 | ||
|                 max_similarity = similarity
 | ||
|                 best_match_name = name
 | ||
| 
 | ||
|         # 判断是否匹配成功
 | ||
|         is_matched = max_similarity >= similarity_threshold
 | ||
| 
 | ||
|         if is_matched:
 | ||
|             has_matched_known_face = True
 | ||
| 
 | ||
|         # 记录结果(边界框转为整数列表)
 | ||
|         bbox = face.bbox.astype(int).tolist()
 | ||
|         result_parts.append(
 | ||
|             f"{'匹配' if is_matched else '未匹配'}: {best_match_name} "
 | ||
|             f"(相似度: {max_similarity:.2f}, 边界框: {bbox})"
 | ||
|         )
 | ||
| 
 | ||
|     # 构建最终结果
 | ||
|     result_str = "未检测到人脸" if not result_parts else "; ".join(result_parts)
 | ||
|     return (has_matched_known_face, result_str)
 | ||
| 
 | ||
| 
 | ||
| # 上传图片并提取特征
 | ||
| def add_binary_data(binary_data):
 | ||
|     """
 | ||
|     接收单张图片的二进制数据、提取特征并保存
 | ||
|     返回:(True, 特征值numpy数组) 或 (False, 错误信息字符串)
 | ||
|     """
 | ||
|     global _insightface_app, _feature_list
 | ||
| 
 | ||
|     # 1. 先检查引擎是否初始化成功
 | ||
|     if not _insightface_app:
 | ||
|         init_result = init_insightface()  # 尝试重新初始化
 | ||
|         if not init_result:
 | ||
|             error_msg = "InsightFace引擎未初始化、无法检测人脸"
 | ||
|             print(error_msg)
 | ||
|             return False, error_msg
 | ||
| 
 | ||
|     try:
 | ||
|         # 2. 验证二进制数据有效性
 | ||
|         if len(binary_data) < 1024:  # 过滤过小的无效图片(小于1KB)
 | ||
|             error_msg = f"图片过小({len(binary_data)}字节)、可能不是有效图片"
 | ||
|             print(error_msg)
 | ||
|             return False, error_msg
 | ||
| 
 | ||
|         # 3. 二进制数据转CV2格式(关键步骤、避免通道错误)
 | ||
|         try:
 | ||
|             img = Image.open(BytesIO(binary_data)).convert("RGB")  # 强制转RGB
 | ||
|             frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)  # InsightFace需要BGR格式
 | ||
|         except Exception as e:
 | ||
|             error_msg = f"图片格式转换失败:{str(e)}"
 | ||
|             print(error_msg)
 | ||
|             return False, error_msg
 | ||
| 
 | ||
|         # 4. 检查图片尺寸(避免极端尺寸导致检测失败)
 | ||
|         height, width = frame.shape[:2]
 | ||
|         if height < 64 or width < 64:  # 人脸检测最小建议尺寸
 | ||
|             error_msg = f"图片尺寸过小({width}x{height})、需至少64x64像素"
 | ||
|             print(error_msg)
 | ||
|             return False, error_msg
 | ||
| 
 | ||
|         # 5. 调用InsightFace检测人脸
 | ||
|         print(f"开始检测人脸(图片尺寸:{width}x{height}、格式:BGR)")
 | ||
|         faces = _insightface_app.get(frame)
 | ||
| 
 | ||
|         if not faces:
 | ||
|             error_msg = "未检测到人脸(请确保图片包含清晰正面人脸、无遮挡、不模糊)"
 | ||
|             print(error_msg)
 | ||
|             return False, error_msg
 | ||
| 
 | ||
|         # 6. 提取特征并保存
 | ||
|         current_feature = faces[0].embedding
 | ||
|         _feature_list.append(current_feature)
 | ||
|         print(f"人脸检测成功、提取特征值(维度:{current_feature.shape[0]})、累计特征数:{len(_feature_list)}")
 | ||
|         return True, current_feature
 | ||
| 
 | ||
|     except Exception as e:
 | ||
|         error_msg = f"处理图片时发生异常:{str(e)}"
 | ||
|         print(error_msg)
 | ||
|         return False, error_msg
 | ||
| 
 | ||
| 
 | ||
| # 获取数据库最新的人脸及其特征
 | ||
| def get_all_face_name_with_eigenvalue() -> dict:
 | ||
|     conn = None
 | ||
|     cursor = None
 | ||
|     try:
 | ||
|         conn = db.get_connection()
 | ||
|         cursor = conn.cursor(dictionary=True)
 | ||
|         query = "SELECT name, eigenvalue FROM face WHERE name IS NOT NULL"
 | ||
|         cursor.execute(query)
 | ||
|         faces = cursor.fetchall()
 | ||
| 
 | ||
|         name_to_eigenvalues = {}
 | ||
|         for face in faces:
 | ||
|             name = face["name"]
 | ||
|             eigenvalue = face["eigenvalue"]
 | ||
|             if name in name_to_eigenvalues:
 | ||
|                 name_to_eigenvalues[name].append(eigenvalue)
 | ||
|             else:
 | ||
|                 name_to_eigenvalues[name] = [eigenvalue]
 | ||
| 
 | ||
|         face_dict = {}
 | ||
|         for name, eigenvalues in name_to_eigenvalues.items():
 | ||
|             if len(eigenvalues) > 1:
 | ||
|                 face_dict[name] = get_average_feature(eigenvalues)
 | ||
|             else:
 | ||
|                 face_dict[name] = eigenvalues[0]
 | ||
| 
 | ||
|         return face_dict
 | ||
| 
 | ||
|     except MySQLError as e:
 | ||
|         raise Exception(f"获取人脸特征失败: {str(e)}") from e
 | ||
|     finally:
 | ||
|         db.close_connection(conn, cursor)
 | ||
| 
 | ||
| 
 | ||
| # 获取平均特征值
 | ||
| def get_average_feature(features=None):
 | ||
|     global _feature_list
 | ||
|     try:
 | ||
|         if features is None:
 | ||
|             features = _feature_list
 | ||
|         if not isinstance(features, list) or len(features) == 0:
 | ||
|             print("输入必须是包含至少一个特征值的列表")
 | ||
|             return None
 | ||
| 
 | ||
|         processed_features = []
 | ||
|         for i, embedding in enumerate(features):
 | ||
|             try:
 | ||
|                 if isinstance(embedding, str):
 | ||
|                     embedding_str = embedding.replace('[', '').replace(']', '').replace(',', ' ').strip()
 | ||
|                     embedding_list = [float(num) for num in embedding_str.split() if num.strip()]
 | ||
|                     embedding_np = np.array(embedding_list, dtype=np.float32)
 | ||
|                 else:
 | ||
|                     embedding_np = np.array(embedding, dtype=np.float32)
 | ||
| 
 | ||
|                 if len(embedding_np.shape) == 1:
 | ||
|                     processed_features.append(embedding_np)
 | ||
|                     print(f"已添加第 {i + 1} 个特征值用于计算平均值")
 | ||
|                 else:
 | ||
|                     print(f"跳过第 {i + 1} 个特征值:不是一维数组")
 | ||
|             except Exception as e:
 | ||
|                 print(f"处理第 {i + 1} 个特征值时出错:{str(e)}")
 | ||
| 
 | ||
|         if not processed_features:
 | ||
|             print("没有有效的特征值用于计算平均值")
 | ||
|             return None
 | ||
| 
 | ||
|         dims = {feat.shape[0] for feat in processed_features}
 | ||
|         if len(dims) > 1:
 | ||
|             print(f"特征值维度不一致:{dims}、无法计算平均值")
 | ||
|             return None
 | ||
| 
 | ||
|         avg_feature = np.mean(processed_features, axis=0)
 | ||
|         print(f"计算成功:{len(processed_features)} 个特征值的平均向量(维度:{avg_feature.shape[0]})")
 | ||
|         return avg_feature
 | ||
|     except Exception as e:
 | ||
|         print(f"计算平均特征值出错:{str(e)}")
 | ||
|         return None
 |