简单构建高并发

This commit is contained in:
2025-07-29 18:15:35 +08:00
commit c74eaf8836
11 changed files with 567 additions and 0 deletions

28
Milvus基本配置.md Normal file
View File

@ -0,0 +1,28 @@
# Milvus安装方法
## 通过podman安装Milvus
### 安装podman的方法
1. 通过基本命令来安装podman
`sudo apt-get update` -- 更新Ubuntu现有环境
`sudo apt-get -y install podman` -- 安装podman
`podman info` -- 确认podman安装成功
### 安装Milvus的方法
1. 通过daocloud拉去官方的Milvus镜像
`sudo podman pull docker.m.daocloud.io/milvusdb/milvus:v2.5.15` -- 国内镜像拉取方法
2. 创建docker=podman的系统永久映射
`echo "alias docker=podman" >> ~/.bashrc` -- 创建映射
`source ~/.bashrc` -- 刷新bash
3. 启动Milvus服务
`sed 's/docker/podman/g' milvus_embed.sh > milvus_embed_podman.sh` -- 将脚本中的docker语句替换成docker/podman语句以实现兼容
`chmod +x milvus_embed_podman.sh` -- 设置权限
`mkdir -p "$(pwd)/volumes/milvus"` -- 创建宿主机侧路径
- 脚本里 -v $(pwd)/volumes/milvus:/var/lib/milvus 要求宿主机侧路径必须事先存在Podman 不会像 Docker 那样自动创建。
`./milvus_embed_podman.sh start` -- 启动服务脚本
`./milvus_embed_podman.sh stop` -- 停止服务脚本

0
app/__init__.py Normal file
View File

100
app/api.py Normal file
View File

@ -0,0 +1,100 @@
from fastapi import APIRouter, HTTPException, Body, Depends
from . import schemas
from .services import face_service, FaceRecognitionService
router = APIRouter()
# Dependency to get the service instance
def get_face_service():
return face_service
@router.post("/register", response_model=schemas.RegisterResponse, summary="注册新的人脸")
async def register(request: schemas.RegisterRequest, service: FaceRecognitionService = Depends(get_face_service)):
"""
为用户注册一张新的人脸。元数据存入PostgreSQL向量存入Milvus。
- **功能**: 将一张图片中的人脸与一个用户ID和姓名关联起来。
- **校验**:
- 检查用户ID是否已被他人注册。
- 确保图片中只包含一张清晰可识别的人脸。
- **操作**: 如果是新用户会在PostgreSQL创建用户记录然后将提取到的人脸特征向量存入Milvus。
"""
try:
if not request.url and not request.face_data:
raise ValueError("必须提供图片的url或face_data。")
user_info = await service.register_new_face(request.id, request.name, request)
return schemas.RegisterResponse(data=user_info)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}")
@router.post("/detect", response_model=schemas.DetectResponse, summary="1:N 人脸识别 (Milvus)")
async def detect(request: schemas.ImageSource = Body(...), service: FaceRecognitionService = Depends(get_face_service)):
"""
在一张图片中检测并识别所有已知的人脸。使用Milvus进行高性能向量搜索。
- **功能**: 对比图片中的人脸与Milvus中的所有人脸返回匹配结果。
- **阈值**: 内部使用相似度阈值来判断是否匹配成功。
- **返回**: 返回一个列表,包含所有被识别出的人员信息、位置和置信度。
"""
try:
if not request.url and not request.face_data:
raise ValueError("必须提供图片的url或face_data。")
detected_faces = await service.detect_faces(request)
return schemas.DetectResponse(data=detected_faces)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}")
@router.post("/verify", response_model=schemas.VerifyResponse, summary="1:1 人脸认证 (Milvus)")
async def verify(request: schemas.VerifyRequest, service: FaceRecognitionService = Depends(get_face_service)):
"""
验证一张图片中的人脸是否属于指定的用户ID。在Milvus中进行限定范围的向量搜索。
- **功能**: 精确比对,判断“这张脸是不是这个人”。
- **场景**: 用于人脸登录、刷脸支付等高安全要求的场景。
- **返回**: 返回布尔值 `match` 表示是否匹配,以及具体的相似度 `confidence`。
"""
try:
if not request.url and not request.face_data:
raise ValueError("必须提供图片的url或face_data。")
verification_result = await service.verify_face(request.id, request)
return schemas.VerifyResponse(data=verification_result)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}")
@router.delete("/users/{user_id}", response_model=schemas.StandardResponse, summary="删除用户 (同步删除PG和Milvus)")
async def delete_user(user_id: int, service: FaceRecognitionService = Depends(get_face_service)):
"""
根据用户ID从PostgreSQL和Milvus中同步删除该用户的所有数据。
"""
try:
deleted_user = await service.delete_user(user_id)
return schemas.StandardResponse(message=f"成功删除用户 {deleted_user['name']} (ID: {user_id})。")
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}")
@router.get("/users/{user_id}", response_model=schemas.RegisterResponse, summary="获取单个用户信息")
async def get_user(user_id: int, service: FaceRecognitionService = Depends(get_face_service)):
"""
根据用户ID查询并返回用户的详细信息元数据来自PG人脸数量来自Milvus
"""
user_info = await service.get_user(user_id)
if not user_info:
raise HTTPException(status_code=404, detail=f"ID为 {user_id} 的用户不存在。")
return schemas.RegisterResponse(data=user_info)
@router.get("/users", response_model=schemas.UserListResponse, summary="获取所有用户列表")
async def list_users(skip: int = 0, limit: int = 100, service: FaceRecognitionService = Depends(get_face_service)):
"""
获取数据库中所有已注册用户的列表支持分页元数据来自PG人脸数量来自Milvus
"""
users_list = await service.list_all_users(skip=skip, limit=limit)
return schemas.UserListResponse(data=users_list)

14
app/config.py Normal file
View File

@ -0,0 +1,14 @@
from pydantic import BaseModel
class Settings(BaseModel):
# PostgreSQL 数据库连接URL
# 格式: postgresql+asyncpg://user:password@host:port/database
DATABASE_URL: str = "postgresql+asyncpg://postgres:123456@localhost:5432/face_db"
# Milvus 连接信息 (将在第二阶段使用)
MILVUS_HOST: str = "localhost"
MILVUS_PORT: int = 19530
# 创建一个全局配置实例
settings = Settings()

32
app/database.py Normal file
View File

@ -0,0 +1,32 @@
import databases
import sqlalchemy
from .config import settings
# 使用 databases 库来提供异步连接池
database = databases.Database(settings.DATABASE_URL)
# 使用 SQLAlchemy Core 定义表结构 (元数据)
metadata = sqlalchemy.MetaData()
users = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String, nullable=False),
)
face_features = sqlalchemy.Table(
"face_features",
metadata,
sqlalchemy.Column("feature_id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("user_id", sqlalchemy.Integer, sqlalchemy.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
sqlalchemy.Column("embedding", sqlalchemy.LargeBinary, nullable=False),
sqlalchemy.Index("ix_face_features_user_id", "user_id"),
)
# 创建一个引擎,用于在启动时创建表
engine = sqlalchemy.create_engine(settings.DATABASE_URL.replace("+asyncpg", ""))
def create_tables():
"""创建所有定义的表"""
metadata.create_all(engine)

30
app/main.py Normal file
View File

@ -0,0 +1,30 @@
from fastapi import FastAPI
from .api import router as api_router
from .database import database, create_tables
app = FastAPI(
title="人脸识别服务 V2 - PostgreSQL版",
description="一个使用PostgreSQL和Milvus的高性能人脸识别服务。",
version="2.1.0"
)
@app.on_event("startup")
async def on_startup():
"""应用启动时,连接数据库并创建表"""
await database.connect()
create_tables() # SQLAlchemy的create_all是同步的所以这样调用没问题
@app.on_event("shutdown")
async def on_shutdown():
"""应用关闭时,断开数据库连接"""
await database.disconnect()
# 挂载API路由
app.include_router(api_router, prefix="/api", tags=["Face Recognition"])
@app.get("/", summary="服务健康检查")
def read_root():
return {"status": "ok", "message": "欢迎使用人脸识别服务 V2"}

68
app/milvus_helpers.py Normal file
View File

@ -0,0 +1,68 @@
from pymilvus import connections, utility, Collection, CollectionSchema, FieldSchema, DataType
from .config import settings
# 定义集合名称和相关参数
COLLECTION_NAME = "face_features_collection"
FACE_VECTOR_DIM = 512 # InsightFace 'buffalo_l' 模型的特征维度
class MilvusHelper:
def __init__(self):
try:
# 连接 Milvus 服务
connections.connect("default", host=settings.MILVUS_HOST, port=settings.MILVUS_PORT)
print("成功连接到 Milvus 服务。")
except Exception as e:
print(f"连接 Milvus 服务失败: {e}")
raise
def has_collection(self):
"""检查集合是否存在"""
return utility.has_collection(COLLECTION_NAME)
def create_collection(self):
"""创建一个新的集合来存储人脸特征"""
if self.has_collection():
print(f"集合 '{COLLECTION_NAME}' 已存在。")
return
# 定义字段
# 主键字段Milvus 会自动生成ID
pk_field = FieldSchema(name="feature_id", dtype=DataType.INT64, is_primary=True, auto_id=True)
# 对应的用户ID字段
user_id_field = FieldSchema(name="user_id", dtype=DataType.INT64)
# 人脸特征向量字段
embedding_field = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=FACE_VECTOR_DIM)
# 创建集合 Schema
schema = CollectionSchema(
fields=[pk_field, user_id_field, embedding_field],
description="人脸识别特征集合",
enable_dynamic_field=False
)
# 创建集合
self.collection = Collection(name=COLLECTION_NAME, schema=schema)
print(f"集合 '{COLLECTION_NAME}' 创建成功。")
# 为向量字段创建索引以加速搜索
index_params = {
"metric_type": "IP", # IP (Inner Product) 等价于归一化向量的余弦相似度
"index_type": "IVF_FLAT",
"params": {"nlist": 1024} # nlist 的值需要根据数据量调整
}
self.collection.create_index(field_name="embedding", index_params=index_params)
print("向量索引创建成功。")
return self.collection
def get_collection(self):
"""获取集合对象并加载到内存中以便搜索"""
if not self.has_collection():
self.create_collection()
collection = Collection(COLLECTION_NAME)
collection.load()
return collection
# 在模块加载时创建一个全局实例
milvus_client = MilvusHelper()
collection = milvus_client.get_collection()

74
app/schemas.py Normal file
View File

@ -0,0 +1,74 @@
from pydantic import BaseModel, Field
from typing import Optional, List
# ===================================================================
# 基础模型 (Base Models)
# ===================================================================
class UserInfo(BaseModel):
"""用户的基本信息"""
id: int = Field(..., description="用户的唯一ID", example=1001)
name: str = Field(..., description="用户的姓名", example="张三")
registered_faces_count: int = Field(..., description="该用户已注册的人脸数量", example=2)
class FaceLocation(BaseModel):
"""人脸在图片中的位置和尺寸"""
x: int = Field(..., description="人脸框左上角的X坐标")
y: int = Field(..., description="人脸框左上角的Y坐标")
width: int = Field(..., description="人脸框的宽度")
height: int = Field(..., description="人脸框的高度")
# ===================================================================
# API 请求模型 (Request Models)
# ===================================================================
class ImageSource(BaseModel):
"""图片来源可以是URL或Base64编码的数据"""
url: Optional[str] = Field(None, description="图片的URL地址", example="http://example.com/image.jpg")
face_data: Optional[str] = Field(None, description="图片的Base64编码字符串")
class RegisterRequest(ImageSource):
"""注册新用户的请求体"""
id: int = Field(..., description="要注册用户的唯一ID", example=1001)
name: str = Field(..., description="要注册用户的姓名", example="张三")
class VerifyRequest(ImageSource):
"""1:1人脸认证的请求体"""
id: int = Field(..., description="要验证的用户ID", example=1001)
# ===================================================================
# API 响应模型 (Response Models)
# ===================================================================
class StandardResponse(BaseModel):
"""标准API响应模型"""
code: int = Field(0, description="响应码0为成功非0为失败", example=0)
message: str = Field("success", description="响应消息", example="操作成功")
data: Optional[dict] = None
class UserListResponse(StandardResponse):
"""获取用户列表的响应"""
data: List[UserInfo]
class RegisterResponse(StandardResponse):
"""注册成功后的响应"""
data: UserInfo
class VerificationResult(BaseModel):
"""1:1认证结果"""
match: bool = Field(..., description="是否匹配")
confidence: float = Field(..., description="置信度 (0.0 to 1.0)")
class VerifyResponse(StandardResponse):
"""1:1人脸认证的响应"""
data: VerificationResult
class DetectedFace(UserInfo):
"""1:N识别结果中的单个人脸信息"""
location: FaceLocation
confidence: float = Field(..., description="识别的置信度")
class DetectResponse(StandardResponse):
"""1:N人脸识别的响应"""
data: List[DetectedFace]

191
app/services.py Normal file
View File

@ -0,0 +1,191 @@
import cv2
import numpy as np
from insightface.app import FaceAnalysis
import base64
import requests
from .database import database, users
from .milvus_helpers import collection as milvus_collection
class FaceRecognitionService:
"""封装所有核心人脸识别和数据库操作的业务逻辑"""
def __init__(self):
self.app = FaceAnalysis(name="buffalo_l", providers=['CPUExecutionProvider'])
self.app.prepare(ctx_id=0, det_size=(640, 640))
def _decode_image(self, image_source):
"""私有方法从URL或Base64解码图片"""
img = None
if image_source.face_data:
try:
img_data = base64.b64decode(image_source.face_data)
nparr = np.frombuffer(img_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
except Exception as e:
raise ValueError(f"Base64解码失败: {e}")
elif image_source.url:
try:
response = requests.get(image_source.url, timeout=10)
response.raise_for_status()
img_array = np.asarray(bytearray(response.content), dtype=np.uint8)
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
except Exception as e:
raise ValueError(f"从URL获取图片失败: {e}")
if img is None:
raise ValueError("无法加载图片")
return img
async def register_new_face(self, user_id: int, name: str, image_source):
"""注册新的人脸元数据存入PostgreSQL向量存入Milvus"""
query = users.select().where(users.c.id == user_id)
user = await database.fetch_one(query)
if user and user.name != name:
raise ValueError(f"ID {user_id} 已被 '{user.name}' 注册,无法更改为 '{name}'")
image = self._decode_image(image_source)
faces = self.app.get(image)
if not faces:
raise ValueError("图片中未检测到人脸。")
if len(faces) > 1:
raise ValueError("注册图片中只能包含一张人脸。")
embedding = faces[0].embedding
async with database.transaction():
if not user:
query = users.insert().values(id=user_id, name=name)
await database.execute(query)
data_to_insert = [[user_id], [embedding]]
milvus_collection.insert(data_to_insert)
milvus_collection.flush()
count_expr = f"user_id == {user_id}"
count = milvus_collection.query(expr=count_expr, output_fields=["user_id"])
return {"id": user_id, "name": name, "registered_faces_count": len(count)}
async def detect_faces(self, image_source):
"""在Milvus中进行1:N人脸识别"""
image = self._decode_image(image_source)
detected_faces = self.app.get(image)
if not detected_faces:
return []
search_vectors = [face.embedding for face in detected_faces]
search_params = {"metric_type": "IP", "params": {"nprobe": 10}}
results = milvus_collection.search(
data=search_vectors, anns_field="embedding", param=search_params,
limit=1, output_fields=["user_id"]
)
final_results = []
user_ids_to_fetch = {hits[0].entity.get('user_id') for hits in results if hits}
if not user_ids_to_fetch:
return []
# 批量查询用户信息,提高效率
user_query = users.select().where(users.c.id.in_(user_ids_to_fetch))
user_records = await database.fetch_all(user_query)
user_map = {user.id: user.name for user in user_records}
for i, hits in enumerate(results):
if not hits:
continue
best_hit = hits[0]
similarity = best_hit.distance
RECOGNITION_THRESHOLD = 0.5
if similarity > RECOGNITION_THRESHOLD:
matched_user_id = best_hit.entity.get('user_id')
user_name = user_map.get(matched_user_id)
if user_name:
count_expr = f"user_id == {matched_user_id}"
count_res = milvus_collection.query(expr=count_expr, output_fields=["user_id"])
x1, y1, x2, y2 = detected_faces[i].bbox.astype(int)
final_results.append({
"id": matched_user_id,
"name": user_name,
"registered_faces_count": len(count_res),
"confidence": float(similarity),
"location": {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1}
})
return final_results
async def verify_face(self, user_id: int, image_source):
"""在Milvus中进行1:1人脸认证"""
image = self._decode_image(image_source)
detected_faces = self.app.get(image)
if not detected_faces:
raise ValueError("图片中未检测到人脸。")
if len(detected_faces) > 1:
raise ValueError("用于认证的图片中只能包含一张人脸。")
embedding = detected_faces[0].embedding
search_params = {"metric_type": "IP", "params": {"nprobe": 10}}
# 在指定user_id的范围内进行搜索
expr = f"user_id == {user_id}"
results = milvus_collection.search(
data=[embedding], anns_field="embedding", param=search_params,
limit=1, expr=expr, output_fields=["user_id"]
)
best_similarity = 0.0
if results and results[0]:
best_similarity = results[0][0].distance
VERIFICATION_THRESHOLD = 0.6
match = bool(best_similarity > VERIFICATION_THRESHOLD)
return {"match": match, "confidence": float(best_similarity)}
async def delete_user(self, user_id: int):
"""从PostgreSQL和Milvus中删除用户"""
query = users.select().where(users.c.id == user_id)
user = await database.fetch_one(query)
if not user:
raise ValueError(f"ID为 {user_id} 的用户不存在。")
async with database.transaction():
delete_query = users.delete().where(users.c.id == user_id)
await database.execute(delete_query)
# 从 Milvus 删除
expr = f"user_id == {user_id}"
milvus_collection.delete(expr)
return {"id": user.id, "name": user.name}
async def get_user(self, user_id: int):
"""获取单个用户的元数据和在Milvus中的人脸数量"""
query = users.select().where(users.c.id == user_id)
user = await database.fetch_one(query)
if not user:
return None
count_expr = f"user_id == {user_id}"
count_res = milvus_collection.query(expr=count_expr, output_fields=["user_id"])
return {"id": user.id, "name": user.name, "registered_faces_count": len(count_res)}
async def list_all_users(self, skip: int = 0, limit: int = 100):
"""列出所有用户并从Milvus统计人脸数量"""
query = users.select().offset(skip).limit(limit)
user_records = await database.fetch_all(query)
results = []
for user in user_records:
count_expr = f"user_id == {user.id}"
count_res = milvus_collection.query(expr=count_expr, output_fields=["user_id"])
results.append({"id": user.id, "name": user.name, "registered_faces_count": len(count_res)})
return results
face_service = FaceRecognitionService()

14
psql基本配置.md Normal file
View File

@ -0,0 +1,14 @@
# Post GRE SQL 数据库命令记录
## 安装好后默认创建一个超级用户名叫postgre通过以下命令显示地用postgre登录psql没有密码
`sudo -u postgres psql`
## 真正的角色用户进入psql的方式如下
`psql -U myuser -d mydb -h localhost`
# 进入数据库后修改超级账号的密码
`ALTER USER postgres WITH PASSWORD '123456';`
# 通过数据库命令创建第一个数据库
`CREATE DATABASE face_db;`

16
requirements.txt Normal file
View File

@ -0,0 +1,16 @@
fastapi
uvicorn[standard]
numpy
opencv-python
insightface
databases
psycopg2-binary
onnxruntime
sqlalchemy
requests
pydantic
asyncpg
milvus-lite
pymilvus