From c74eaf8836c382ee85781b3401019b713fbc3fc7 Mon Sep 17 00:00:00 2001 From: Mashiro50070 <1251294066@qq.com> Date: Tue, 29 Jul 2025 18:15:35 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8D=95=E6=9E=84=E5=BB=BA=E9=AB=98?= =?UTF-8?q?=E5=B9=B6=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Milvus基本配置.md | 28 +++++++ app/__init__.py | 0 app/api.py | 100 ++++++++++++++++++++++ app/config.py | 14 ++++ app/database.py | 32 +++++++ app/main.py | 30 +++++++ app/milvus_helpers.py | 68 +++++++++++++++ app/schemas.py | 74 ++++++++++++++++ app/services.py | 191 ++++++++++++++++++++++++++++++++++++++++++ psql基本配置.md | 14 ++++ requirements.txt | 16 ++++ 11 files changed, 567 insertions(+) create mode 100644 Milvus基本配置.md create mode 100644 app/__init__.py create mode 100644 app/api.py create mode 100644 app/config.py create mode 100644 app/database.py create mode 100644 app/main.py create mode 100644 app/milvus_helpers.py create mode 100644 app/schemas.py create mode 100644 app/services.py create mode 100644 psql基本配置.md create mode 100644 requirements.txt diff --git a/Milvus基本配置.md b/Milvus基本配置.md new file mode 100644 index 0000000..d6ea2c0 --- /dev/null +++ b/Milvus基本配置.md @@ -0,0 +1,28 @@ +# Milvus安装方法 + +## 通过podman安装Milvus + +### 安装podman的方法 +1. 通过基本命令来安装podman + `sudo apt-get update` -- 更新Ubuntu现有环境 + `sudo apt-get -y install podman` -- 安装podman + `podman info` -- 确认podman安装成功 + +### 安装Milvus的方法 +1. 通过daocloud拉去官方的Milvus镜像 + `sudo podman pull docker.m.daocloud.io/milvusdb/milvus:v2.5.15` -- 国内镜像拉取方法 + +2. 创建docker=podman的系统永久映射 + `echo "alias docker=podman" >> ~/.bashrc` -- 创建映射 + `source ~/.bashrc` -- 刷新bash + +3. 启动Milvus服务 + `sed 's/docker/podman/g' milvus_embed.sh > milvus_embed_podman.sh` -- 将脚本中的docker语句替换成docker/podman语句以实现兼容 + `chmod +x milvus_embed_podman.sh` -- 设置权限 + + `mkdir -p "$(pwd)/volumes/milvus"` -- 创建宿主机侧路径 + + - 脚本里 -v $(pwd)/volumes/milvus:/var/lib/milvus 要求宿主机侧路径必须事先存在,Podman 不会像 Docker 那样自动创建。 + + `./milvus_embed_podman.sh start` -- 启动服务脚本 + `./milvus_embed_podman.sh stop` -- 停止服务脚本 diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api.py b/app/api.py new file mode 100644 index 0000000..dbfb6cf --- /dev/null +++ b/app/api.py @@ -0,0 +1,100 @@ + +from fastapi import APIRouter, HTTPException, Body, Depends +from . import schemas +from .services import face_service, FaceRecognitionService + +router = APIRouter() + +# Dependency to get the service instance +def get_face_service(): + return face_service + +@router.post("/register", response_model=schemas.RegisterResponse, summary="注册新的人脸") +async def register(request: schemas.RegisterRequest, service: FaceRecognitionService = Depends(get_face_service)): + """ + 为用户注册一张新的人脸。元数据存入PostgreSQL,向量存入Milvus。 + + - **功能**: 将一张图片中的人脸与一个用户ID和姓名关联起来。 + - **校验**: + - 检查用户ID是否已被他人注册。 + - 确保图片中只包含一张清晰可识别的人脸。 + - **操作**: 如果是新用户,会在PostgreSQL创建用户记录;然后将提取到的人脸特征向量存入Milvus。 + """ + try: + if not request.url and not request.face_data: + raise ValueError("必须提供图片的url或face_data。") + user_info = await service.register_new_face(request.id, request.name, request) + return schemas.RegisterResponse(data=user_info) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}") + +@router.post("/detect", response_model=schemas.DetectResponse, summary="1:N 人脸识别 (Milvus)") +async def detect(request: schemas.ImageSource = Body(...), service: FaceRecognitionService = Depends(get_face_service)): + """ + 在一张图片中检测并识别所有已知的人脸。使用Milvus进行高性能向量搜索。 + + - **功能**: 对比图片中的人脸与Milvus中的所有人脸,返回匹配结果。 + - **阈值**: 内部使用相似度阈值来判断是否匹配成功。 + - **返回**: 返回一个列表,包含所有被识别出的人员信息、位置和置信度。 + """ + try: + if not request.url and not request.face_data: + raise ValueError("必须提供图片的url或face_data。") + detected_faces = await service.detect_faces(request) + return schemas.DetectResponse(data=detected_faces) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}") + +@router.post("/verify", response_model=schemas.VerifyResponse, summary="1:1 人脸认证 (Milvus)") +async def verify(request: schemas.VerifyRequest, service: FaceRecognitionService = Depends(get_face_service)): + """ + 验证一张图片中的人脸是否属于指定的用户ID。在Milvus中进行限定范围的向量搜索。 + + - **功能**: 精确比对,判断“这张脸是不是这个人”。 + - **场景**: 用于人脸登录、刷脸支付等高安全要求的场景。 + - **返回**: 返回布尔值 `match` 表示是否匹配,以及具体的相似度 `confidence`。 + """ + try: + if not request.url and not request.face_data: + raise ValueError("必须提供图片的url或face_data。") + verification_result = await service.verify_face(request.id, request) + return schemas.VerifyResponse(data=verification_result) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}") + +@router.delete("/users/{user_id}", response_model=schemas.StandardResponse, summary="删除用户 (同步删除PG和Milvus)") +async def delete_user(user_id: int, service: FaceRecognitionService = Depends(get_face_service)): + """ + 根据用户ID,从PostgreSQL和Milvus中同步删除该用户的所有数据。 + """ + try: + deleted_user = await service.delete_user(user_id) + return schemas.StandardResponse(message=f"成功删除用户 {deleted_user['name']} (ID: {user_id})。") + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}") + +@router.get("/users/{user_id}", response_model=schemas.RegisterResponse, summary="获取单个用户信息") +async def get_user(user_id: int, service: FaceRecognitionService = Depends(get_face_service)): + """ + 根据用户ID,查询并返回用户的详细信息(元数据来自PG,人脸数量来自Milvus)。 + """ + user_info = await service.get_user(user_id) + if not user_info: + raise HTTPException(status_code=404, detail=f"ID为 {user_id} 的用户不存在。") + return schemas.RegisterResponse(data=user_info) + +@router.get("/users", response_model=schemas.UserListResponse, summary="获取所有用户列表") +async def list_users(skip: int = 0, limit: int = 100, service: FaceRecognitionService = Depends(get_face_service)): + """ + 获取数据库中所有已注册用户的列表,支持分页(元数据来自PG,人脸数量来自Milvus)。 + """ + users_list = await service.list_all_users(skip=skip, limit=limit) + return schemas.UserListResponse(data=users_list) diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..07e5de8 --- /dev/null +++ b/app/config.py @@ -0,0 +1,14 @@ + +from pydantic import BaseModel + +class Settings(BaseModel): + # PostgreSQL 数据库连接URL + # 格式: postgresql+asyncpg://user:password@host:port/database + DATABASE_URL: str = "postgresql+asyncpg://postgres:123456@localhost:5432/face_db" + + # Milvus 连接信息 (将在第二阶段使用) + MILVUS_HOST: str = "localhost" + MILVUS_PORT: int = 19530 + +# 创建一个全局配置实例 +settings = Settings() diff --git a/app/database.py b/app/database.py new file mode 100644 index 0000000..2d2ef2f --- /dev/null +++ b/app/database.py @@ -0,0 +1,32 @@ +import databases +import sqlalchemy +from .config import settings + +# 使用 databases 库来提供异步连接池 +database = databases.Database(settings.DATABASE_URL) + +# 使用 SQLAlchemy Core 定义表结构 (元数据) +metadata = sqlalchemy.MetaData() + +users = sqlalchemy.Table( + "users", + metadata, + sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True), + sqlalchemy.Column("name", sqlalchemy.String, nullable=False), +) + +face_features = sqlalchemy.Table( + "face_features", + metadata, + sqlalchemy.Column("feature_id", sqlalchemy.Integer, primary_key=True, autoincrement=True), + sqlalchemy.Column("user_id", sqlalchemy.Integer, sqlalchemy.ForeignKey("users.id", ondelete="CASCADE"), nullable=False), + sqlalchemy.Column("embedding", sqlalchemy.LargeBinary, nullable=False), + sqlalchemy.Index("ix_face_features_user_id", "user_id"), +) + +# 创建一个引擎,用于在启动时创建表 +engine = sqlalchemy.create_engine(settings.DATABASE_URL.replace("+asyncpg", "")) + +def create_tables(): + """创建所有定义的表""" + metadata.create_all(engine) \ No newline at end of file diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..724dda8 --- /dev/null +++ b/app/main.py @@ -0,0 +1,30 @@ + +from fastapi import FastAPI +from .api import router as api_router + +from .database import database, create_tables + +app = FastAPI( + title="人脸识别服务 V2 - PostgreSQL版", + description="一个使用PostgreSQL和Milvus的高性能人脸识别服务。", + version="2.1.0" +) + +@app.on_event("startup") +async def on_startup(): + """应用启动时,连接数据库并创建表""" + await database.connect() + create_tables() # SQLAlchemy的create_all是同步的,所以这样调用没问题 + +@app.on_event("shutdown") +async def on_shutdown(): + """应用关闭时,断开数据库连接""" + await database.disconnect() + + +# 挂载API路由 +app.include_router(api_router, prefix="/api", tags=["Face Recognition"]) + +@app.get("/", summary="服务健康检查") +def read_root(): + return {"status": "ok", "message": "欢迎使用人脸识别服务 V2"} diff --git a/app/milvus_helpers.py b/app/milvus_helpers.py new file mode 100644 index 0000000..209e0af --- /dev/null +++ b/app/milvus_helpers.py @@ -0,0 +1,68 @@ + +from pymilvus import connections, utility, Collection, CollectionSchema, FieldSchema, DataType +from .config import settings + +# 定义集合名称和相关参数 +COLLECTION_NAME = "face_features_collection" +FACE_VECTOR_DIM = 512 # InsightFace 'buffalo_l' 模型的特征维度 + +class MilvusHelper: + def __init__(self): + try: + # 连接 Milvus 服务 + connections.connect("default", host=settings.MILVUS_HOST, port=settings.MILVUS_PORT) + print("成功连接到 Milvus 服务。") + except Exception as e: + print(f"连接 Milvus 服务失败: {e}") + raise + + def has_collection(self): + """检查集合是否存在""" + return utility.has_collection(COLLECTION_NAME) + + def create_collection(self): + """创建一个新的集合来存储人脸特征""" + if self.has_collection(): + print(f"集合 '{COLLECTION_NAME}' 已存在。") + return + + # 定义字段 + # 主键字段,Milvus 会自动生成ID + pk_field = FieldSchema(name="feature_id", dtype=DataType.INT64, is_primary=True, auto_id=True) + # 对应的用户ID字段 + user_id_field = FieldSchema(name="user_id", dtype=DataType.INT64) + # 人脸特征向量字段 + embedding_field = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=FACE_VECTOR_DIM) + + # 创建集合 Schema + schema = CollectionSchema( + fields=[pk_field, user_id_field, embedding_field], + description="人脸识别特征集合", + enable_dynamic_field=False + ) + + # 创建集合 + self.collection = Collection(name=COLLECTION_NAME, schema=schema) + print(f"集合 '{COLLECTION_NAME}' 创建成功。") + + # 为向量字段创建索引以加速搜索 + index_params = { + "metric_type": "IP", # IP (Inner Product) 等价于归一化向量的余弦相似度 + "index_type": "IVF_FLAT", + "params": {"nlist": 1024} # nlist 的值需要根据数据量调整 + } + self.collection.create_index(field_name="embedding", index_params=index_params) + print("向量索引创建成功。") + return self.collection + + def get_collection(self): + """获取集合对象并加载到内存中以便搜索""" + if not self.has_collection(): + self.create_collection() + collection = Collection(COLLECTION_NAME) + collection.load() + return collection + +# 在模块加载时创建一个全局实例 +milvus_client = MilvusHelper() +collection = milvus_client.get_collection() diff --git a/app/schemas.py b/app/schemas.py new file mode 100644 index 0000000..7d5f699 --- /dev/null +++ b/app/schemas.py @@ -0,0 +1,74 @@ + +from pydantic import BaseModel, Field +from typing import Optional, List + +# =================================================================== +# 基础模型 (Base Models) +# =================================================================== + +class UserInfo(BaseModel): + """用户的基本信息""" + id: int = Field(..., description="用户的唯一ID", example=1001) + name: str = Field(..., description="用户的姓名", example="张三") + registered_faces_count: int = Field(..., description="该用户已注册的人脸数量", example=2) + +class FaceLocation(BaseModel): + """人脸在图片中的位置和尺寸""" + x: int = Field(..., description="人脸框左上角的X坐标") + y: int = Field(..., description="人脸框左上角的Y坐标") + width: int = Field(..., description="人脸框的宽度") + height: int = Field(..., description="人脸框的高度") + +# =================================================================== +# API 请求模型 (Request Models) +# =================================================================== + +class ImageSource(BaseModel): + """图片来源,可以是URL或Base64编码的数据""" + url: Optional[str] = Field(None, description="图片的URL地址", example="http://example.com/image.jpg") + face_data: Optional[str] = Field(None, description="图片的Base64编码字符串") + +class RegisterRequest(ImageSource): + """注册新用户的请求体""" + id: int = Field(..., description="要注册用户的唯一ID", example=1001) + name: str = Field(..., description="要注册用户的姓名", example="张三") + +class VerifyRequest(ImageSource): + """1:1人脸认证的请求体""" + id: int = Field(..., description="要验证的用户ID", example=1001) + +# =================================================================== +# API 响应模型 (Response Models) +# =================================================================== + +class StandardResponse(BaseModel): + """标准API响应模型""" + code: int = Field(0, description="响应码,0为成功,非0为失败", example=0) + message: str = Field("success", description="响应消息", example="操作成功") + data: Optional[dict] = None + +class UserListResponse(StandardResponse): + """获取用户列表的响应""" + data: List[UserInfo] + +class RegisterResponse(StandardResponse): + """注册成功后的响应""" + data: UserInfo + +class VerificationResult(BaseModel): + """1:1认证结果""" + match: bool = Field(..., description="是否匹配") + confidence: float = Field(..., description="置信度 (0.0 to 1.0)") + +class VerifyResponse(StandardResponse): + """1:1人脸认证的响应""" + data: VerificationResult + +class DetectedFace(UserInfo): + """1:N识别结果中的单个人脸信息""" + location: FaceLocation + confidence: float = Field(..., description="识别的置信度") + +class DetectResponse(StandardResponse): + """1:N人脸识别的响应""" + data: List[DetectedFace] diff --git a/app/services.py b/app/services.py new file mode 100644 index 0000000..ef60f9e --- /dev/null +++ b/app/services.py @@ -0,0 +1,191 @@ + +import cv2 +import numpy as np +from insightface.app import FaceAnalysis +import base64 +import requests +from .database import database, users +from .milvus_helpers import collection as milvus_collection + +class FaceRecognitionService: + """封装所有核心人脸识别和数据库操作的业务逻辑""" + def __init__(self): + self.app = FaceAnalysis(name="buffalo_l", providers=['CPUExecutionProvider']) + self.app.prepare(ctx_id=0, det_size=(640, 640)) + + def _decode_image(self, image_source): + """私有方法,从URL或Base64解码图片""" + img = None + if image_source.face_data: + try: + img_data = base64.b64decode(image_source.face_data) + nparr = np.frombuffer(img_data, np.uint8) + img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + except Exception as e: + raise ValueError(f"Base64解码失败: {e}") + elif image_source.url: + try: + response = requests.get(image_source.url, timeout=10) + response.raise_for_status() + img_array = np.asarray(bytearray(response.content), dtype=np.uint8) + img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) + except Exception as e: + raise ValueError(f"从URL获取图片失败: {e}") + + if img is None: + raise ValueError("无法加载图片") + return img + + async def register_new_face(self, user_id: int, name: str, image_source): + """注册新的人脸,元数据存入PostgreSQL,向量存入Milvus""" + query = users.select().where(users.c.id == user_id) + user = await database.fetch_one(query) + + if user and user.name != name: + raise ValueError(f"ID {user_id} 已被 '{user.name}' 注册,无法更改为 '{name}'。") + + image = self._decode_image(image_source) + faces = self.app.get(image) + + if not faces: + raise ValueError("图片中未检测到人脸。") + if len(faces) > 1: + raise ValueError("注册图片中只能包含一张人脸。") + + embedding = faces[0].embedding + + async with database.transaction(): + if not user: + query = users.insert().values(id=user_id, name=name) + await database.execute(query) + + data_to_insert = [[user_id], [embedding]] + milvus_collection.insert(data_to_insert) + milvus_collection.flush() + + count_expr = f"user_id == {user_id}" + count = milvus_collection.query(expr=count_expr, output_fields=["user_id"]) + + return {"id": user_id, "name": name, "registered_faces_count": len(count)} + + async def detect_faces(self, image_source): + """在Milvus中进行1:N人脸识别""" + image = self._decode_image(image_source) + detected_faces = self.app.get(image) + + if not detected_faces: + return [] + + search_vectors = [face.embedding for face in detected_faces] + search_params = {"metric_type": "IP", "params": {"nprobe": 10}} + + results = milvus_collection.search( + data=search_vectors, anns_field="embedding", param=search_params, + limit=1, output_fields=["user_id"] + ) + + final_results = [] + user_ids_to_fetch = {hits[0].entity.get('user_id') for hits in results if hits} + + if not user_ids_to_fetch: + return [] + + # 批量查询用户信息,提高效率 + user_query = users.select().where(users.c.id.in_(user_ids_to_fetch)) + user_records = await database.fetch_all(user_query) + user_map = {user.id: user.name for user in user_records} + + for i, hits in enumerate(results): + if not hits: + continue + best_hit = hits[0] + similarity = best_hit.distance + + RECOGNITION_THRESHOLD = 0.5 + if similarity > RECOGNITION_THRESHOLD: + matched_user_id = best_hit.entity.get('user_id') + user_name = user_map.get(matched_user_id) + + if user_name: + count_expr = f"user_id == {matched_user_id}" + count_res = milvus_collection.query(expr=count_expr, output_fields=["user_id"]) + x1, y1, x2, y2 = detected_faces[i].bbox.astype(int) + final_results.append({ + "id": matched_user_id, + "name": user_name, + "registered_faces_count": len(count_res), + "confidence": float(similarity), + "location": {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1} + }) + return final_results + + async def verify_face(self, user_id: int, image_source): + """在Milvus中进行1:1人脸认证""" + image = self._decode_image(image_source) + detected_faces = self.app.get(image) + + if not detected_faces: + raise ValueError("图片中未检测到人脸。") + if len(detected_faces) > 1: + raise ValueError("用于认证的图片中只能包含一张人脸。") + + embedding = detected_faces[0].embedding + + search_params = {"metric_type": "IP", "params": {"nprobe": 10}} + # 在指定user_id的范围内进行搜索 + expr = f"user_id == {user_id}" + results = milvus_collection.search( + data=[embedding], anns_field="embedding", param=search_params, + limit=1, expr=expr, output_fields=["user_id"] + ) + + best_similarity = 0.0 + if results and results[0]: + best_similarity = results[0][0].distance + + VERIFICATION_THRESHOLD = 0.6 + match = bool(best_similarity > VERIFICATION_THRESHOLD) + + return {"match": match, "confidence": float(best_similarity)} + + async def delete_user(self, user_id: int): + """从PostgreSQL和Milvus中删除用户""" + query = users.select().where(users.c.id == user_id) + user = await database.fetch_one(query) + if not user: + raise ValueError(f"ID为 {user_id} 的用户不存在。") + + async with database.transaction(): + delete_query = users.delete().where(users.c.id == user_id) + await database.execute(delete_query) + + # 从 Milvus 删除 + expr = f"user_id == {user_id}" + milvus_collection.delete(expr) + + return {"id": user.id, "name": user.name} + + async def get_user(self, user_id: int): + """获取单个用户的元数据和在Milvus中的人脸数量""" + query = users.select().where(users.c.id == user_id) + user = await database.fetch_one(query) + if not user: + return None + + count_expr = f"user_id == {user_id}" + count_res = milvus_collection.query(expr=count_expr, output_fields=["user_id"]) + return {"id": user.id, "name": user.name, "registered_faces_count": len(count_res)} + + async def list_all_users(self, skip: int = 0, limit: int = 100): + """列出所有用户,并从Milvus统计人脸数量""" + query = users.select().offset(skip).limit(limit) + user_records = await database.fetch_all(query) + + results = [] + for user in user_records: + count_expr = f"user_id == {user.id}" + count_res = milvus_collection.query(expr=count_expr, output_fields=["user_id"]) + results.append({"id": user.id, "name": user.name, "registered_faces_count": len(count_res)}) + return results + +face_service = FaceRecognitionService() diff --git a/psql基本配置.md b/psql基本配置.md new file mode 100644 index 0000000..ffd4834 --- /dev/null +++ b/psql基本配置.md @@ -0,0 +1,14 @@ +# Post GRE SQL 数据库命令记录 + +## 安装好后默认创建一个超级用户名叫postgre,通过以下命令显示地用postgre登录psql,没有密码 +`sudo -u postgres psql` + +## 真正的角色用户进入psql的方式如下 +`psql -U myuser -d mydb -h localhost` + +# 进入数据库后修改超级账号的密码 +`ALTER USER postgres WITH PASSWORD '123456';` + +# 通过数据库命令创建第一个数据库 +`CREATE DATABASE face_db;` + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e09109b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,16 @@ + +fastapi +uvicorn[standard] +numpy +opencv-python +insightface +databases +psycopg2-binary +onnxruntime +sqlalchemy + +requests +pydantic +asyncpg +milvus-lite +pymilvus \ No newline at end of file