Files
video/service/user_service.py
2025-09-08 17:34:23 +08:00

155 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from mysql.connector import Error as MySQLError
from ds.db import db
from schema.user_schema import UserRegisterRequest, UserLoginRequest, UserResponse
from schema.response_schema import APIResponse
from middle.auth_middleware import (
get_password_hash,
verify_password,
create_access_token,
ACCESS_TOKEN_EXPIRE_MINUTES,
get_current_user
)
# 创建用户接口路由(前缀 /users、标签用于 Swagger 分类)
router = APIRouter(
prefix="/users",
tags=["用户管理"]
)
# ------------------------------
# 1. 用户注册接口
# ------------------------------
@router.post("/register", response_model=APIResponse, summary="用户注册")
async def user_register(request: UserRegisterRequest):
"""
用户注册:
- 校验用户名是否已存在
- 加密密码后插入数据库
- 返回注册成功信息
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 1. 检查用户名是否已存在(唯一索引)
check_query = "SELECT username FROM users WHERE username = %s"
cursor.execute(check_query, (request.username,))
existing_user = cursor.fetchone()
if existing_user:
raise HTTPException(
status_code=400,
detail=f"用户名 '{request.username}' 已存在、请更换其他用户名"
)
# 2. 加密密码
hashed_password = get_password_hash(request.password)
# 3. 插入新用户到数据库
insert_query = """
INSERT INTO users (username, password)
VALUES (%s, %s)
"""
cursor.execute(insert_query, (request.username, hashed_password))
conn.commit() # 提交事务
# 4. 返回注册成功响应
return APIResponse(
code=201, # 201 表示资源创建成功
message=f"用户 '{request.username}' 注册成功",
data=None
)
except MySQLError as e:
conn.rollback() # 数据库错误时回滚事务
raise Exception(f"注册失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 2. 用户登录接口
# ------------------------------
@router.post("/login", response_model=APIResponse, summary="用户登录(获取 Token")
async def user_login(request: UserLoginRequest):
"""
用户登录:
- 校验用户名是否存在
- 校验密码是否正确
- 生成 JWT Token 并返回
"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 修复: SQL查询添加 created_at 和 updated_at 字段
query = """
SELECT id, username, password, created_at, updated_at
FROM users
WHERE username = %s
"""
cursor.execute(query, (request.username,))
user = cursor.fetchone()
# 2. 校验用户名和密码
if not user or not verify_password(request.password, user["password"]):
raise HTTPException(
status_code=401,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
# 3. 生成 Token过期时间从配置读取
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user["username"]},
expires_delta=access_token_expires
)
# 4. 返回 Token 和用户基本信息
return APIResponse(
code=200,
message="登录成功",
data={
"access_token": access_token,
"token_type": "bearer",
"user": UserResponse(
id=user["id"],
username=user["username"],
created_at=user.get("created_at"),
updated_at=user.get("updated_at")
)
}
)
except MySQLError as e:
raise Exception(f"登录失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# ------------------------------
# 3. 获取当前登录用户信息(需认证)
# ------------------------------
@router.get("/me", response_model=APIResponse, summary="获取当前用户信息")
async def get_current_user_info(
current_user: UserResponse = Depends(get_current_user) # 依赖认证中间件
):
"""
获取当前登录用户信息:
- 需在请求头携带 Token格式: Bearer <token>
- 认证通过后返回用户信息
"""
return APIResponse(
code=200,
message="获取用户信息成功",
data=current_user
)