@(>y-mOcd;DStRW{*y28;sCZ-0-`1 z0Ul%(hVqTI3>U^VFoS4XYW>w09`;Q6!jr!6NN{v%!T0pE&p#1p8{Rz?XqybQjT{)= zUkI!%_|_F4iVPNfYl<~tv9YE?p#2de{>bpADSu?r9~s#>nl1P@6q_T(=3sF}>%8CX zcbs~m6he;1c|nmo@q!4lWXVzxcmu%C`Jykp_9Vb!j@1A9AWkoDc?Bn!A#y93R~~ zMwUx8bdWZ3kvKAvMobgL3L3O(t-?h=8U;z=6#ndw2&u5Th!Gc&WcyF9{ZS-8ojvj> zMF^0eE^vD@v$He1v-A3g$KymWMn3-Q&;buZUvR`6_6qRe5don|gb@~0ltNLI5~4!N z6151NW>LkI6qO8ERIMpn)RwYG?I}4b8#GCEq?}Qw0b5m9${ls5JW-E<+tiwrH|jND zyXs5%qkaRH)!Nji=q3YpsDV^nv`#=4M7-EJC}7u&`>LgUwrD-^W6u^qqMHG#F`x$E z+ya=FV^`tqd_=bb)royG{wgJiYe^&C1YdwO4cKtq%Tjb3t~c<;0SRvgd^_HTw-5zy zB@z+1Jm75v-r&pD=njx*BSBN%fPfo~y>+m+3D%oO8$#RfapfVA$vCbPJ((iR!DmcV zJ %QkB}@$V~u$_`}&@aoJ$a6joCCkuIDu14xBoi(}xZ| z9oMvx48=bpaY|^&!sKJ8`y!NPC=-WAbYOIc>1b9DXVN-3$8jex#KNb#k9Iu;m<6BZ zJpCG>Jw(f9(i&m5vkAV1N%5SX0g-ksF+@^CV@^!6lqBFDm^n3)phS;7Pez#|t`25s zQXfjOU_6l^nikVD&y#eFoXaLDiKUY1oKCd!?tMAXqwP~OgUK|gl?|?bs 4$Vo29>4-(vT3D6#j`opFoa=@J%D_>F-)H| zOwj^cmal!Z-VS%KzjOEfSCz6SC}oqtO`Cv|N&Im_0&NCF<7ZGPKR5Faf c*3Etb9<9qMCd1YmM zLV;wU^zm4|$3;U-+AG1zM0gE&FVn|8M4xyz(sd)R)?O6Rk&^C*JrLWj$@F% zG_FSBqwHauAmxCX*!m oryx}Tz{o5Y+%)e!_@p9w4Ls!(d z)p`H!=~T)8_(j{&2J%euQ_Dg}xa1Fy+ipvqn^K@81tv)GnTgnf)V5SxH`#u hq5&{Hc)mimw{`w}T!~(Wpc<^;kCOD98 zmSOZwu9`=>QNTeX(z*GKF(}MftIuu$=c+o6$pMATW;)gwxLE;$<|WXE?PC&rR?y31 z))>Q8XEG`4f(@K%c=H +i2zxb{WiLb*cSEBxl}^;ecBF5SKU=JGr5E&uI}<%_SCIdcsl$B1BMWpZle z*W)YWZ!LfL7hqp`_sV(S>C>n8@ U?|R&AK*@ubS^LnMx& zMnfiWR;#+kL@jr=yN?rbGGTNnboBhO-pGkaxKByqJqj=E>3E9lQOY5TV{sj*Mpodk za %Zw$N?v8&+dM8U`s*fxmxI&NL~!1} zcigt ymQ@kJuO7A{cO9qh2I{nbPHz8- zFYNvb#BGr9rM4d|H1$Z>jsD(vcuP2lZn)jy2JuGF5#A%-Xcfb|#2dS;9Nr^__n{m6 z8jje-kkB1!F-n+Gv&tpRrjKTm=|RdfHnR`JAts<@ WRcoc6ImnMS7Vh!<)>5v2wEx5vx-soy}xa zs(@Of6N5erqSq*xhM& Q3!YV{Mfkx~coo5Kw#E2;+WLTFtFlGd zIVEz^&S{a8cHXaX3!9606~XV%kIr_LzI)Vw=X&}daN7N5)Y3k^cfR@2`>0K5FM?S3 M&7OJ4f5xT%2Tn5GLI3~& literal 0 HcmV?d00001 diff --git a/middle/__pycache__/error_handler.cpython-312.pyc b/middle/__pycache__/error_handler.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7728b6ce389654b25a461a0d744bf9aeff9369e6 GIT binary patch literal 2482 zcmb_e+i%lW7(b2^JJ+UZlg@1?7O2=11Z@XQ8yyS<8b%8gNCKTACd-RWo6Jsv?TEH& zrK;T8aT#bpS11CcJ!ES)(mkxzVB!zhORIGvvWj>Z3GLG=5owpFofF4N8AwB7N6PuG z=R3dio$q|k&o-MGz;XQU58VM10KXwY8~P&il!eSFU;&HqK)^znpbP0RB 6Q{Zxl=+bAdPT7Qq^_7I=cU2^FCV$deq#8aew>J!?9t4^{S>J(eZ3 zrAMcbe(qS3lVpXEWJXR(9!#;e2ljXPIVlm3NgQ;s?G6SH_ `mv}%^;tPj6opps@oE4D@P1vCTxnPNew@W>A ybj)aX_I z20l^T9RueY^gxDJrVhDqjfjt0i>ehwwrc`wU )?itHBOo);F-~g zR;zU7tes`jtergYY-XIAw`Lc8WwbCqHJZIl%P3!Lhgz!at#Pl6v`&juLi%1A>59C* zuKAT|88K^NtuKp&wXqcuY|^gf_g|S?rNuAd@xBm`Tk|W^6?iIHhuol9`mtBO+ZR>Z ze<7&)KM{P3AICh^$vyB)*;A?P)L{1BP |cZf>S`cw1?|@8BU{ zASj|%qv)gYNK7#cFbO=(CWVAZpk)ZELa?07M0gR^jN&TBc$+@d;XM@Gz2Dy!^0g}F zeZ7H$dkgJYSyga(2X52W5%l>xynE?@&wtqGr+t3^e!qfCvgpx^7PJBCZi=mdm{zxu z6dZ~Y0_=w4Z?~`I$l(af#g7OPmgTu4s&h9JgM!$Y=vAmLKK=>A)3DpCUk|!y6yJnF z(D9@%VSX3EpMxMz0Atm~hWXbv%#)27s&3epsjeAqxzuuX>+sHuwf2G4J!f^_+LF#x)g6@10%k{nff-83#`HV|)Dqwdw)2X^z^o;Cv(FqB3<&Y5lZm z$~3d#m*(BGO>MJOd&X6pC-tt1Ka9X$I}$z1=WS%Ac^Qx )(wioF zQqfuGmW;y%GvP49Ojuka>&~u+i>(CAgn{@=LX|eOYum2vSck` `jVPcSPN>=?POaw}DcsEo zOer+2%Id2ymHv $6+V_E&wU^^LlM>&kk-Lz8OJc}5omY_);$8P d4?)vIP@4sf%R~!Cq&{2*kj!}Uh=@(l{U7#SWcUC8 literal 0 HcmV?d00001 diff --git a/middle/auth_middleware.py b/middle/auth_middleware.py new file mode 100644 index 0000000..9cac02d --- /dev/null +++ b/middle/auth_middleware.py @@ -0,0 +1,96 @@ +from datetime import datetime, timedelta, timezone +from typing import Optional + +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer +from jose import JWTError, jwt +from passlib.context import CryptContext + +from ds.config import JWT_CONFIG +from ds.db import db +from service.user_service import UserResponse + +# ------------------------------ +# 密码加密配置 +# ------------------------------ +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +# ------------------------------ +# JWT 配置 +# ------------------------------ +SECRET_KEY = JWT_CONFIG["secret_key"] +ALGORITHM = JWT_CONFIG["algorithm"] +ACCESS_TOKEN_EXPIRE_MINUTES = int(JWT_CONFIG["access_token_expire_minutes"]) + +# OAuth2 依赖(从请求头获取 Token、格式:Bearer ) +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login") + +# ------------------------------ +# 密码工具函数 +# ------------------------------ +def verify_password(plain_password: str, hashed_password: str) -> bool: + """验证明文密码与加密密码是否匹配""" + return pwd_context.verify(plain_password, hashed_password) + +def get_password_hash(password: str) -> str: + """对明文密码进行 bcrypt 加密""" + return pwd_context.hash(password) + +# ------------------------------ +# JWT 工具函数 +# ------------------------------ +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: + """生成 JWT Token""" + to_encode = data.copy() + # 设置过期时间 + if expires_delta: + expire = datetime.now(timezone.utc) + expires_delta + else: + expire = datetime.now(timezone.utc) + timedelta(minutes=15) + # 添加过期时间到 Token 数据 + to_encode.update({"exp": expire}) + # 生成 Token + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +# ------------------------------ +# 认证依赖(获取当前登录用户) +# ------------------------------ +def get_current_user(token: str = Depends(oauth2_scheme)) -> UserResponse: + """从 Token 中解析用户信息、验证通过后返回当前用户""" + # 认证失败异常 + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Token 无效或已过期", + headers={"WWW-Authenticate": "Bearer"}, + ) + + try: + # 解码 Token + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + # 获取 Token 中的用户名 + username: str = payload.get("sub") + if username is None: + raise credentials_exception + except JWTError: + raise credentials_exception + + # 从数据库查询用户(验证用户是否存在) + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) # 返回字典格式结果 + query = "SELECT id, username, created_at, updated_at FROM users WHERE username = %s" + cursor.execute(query, (username,)) + user = cursor.fetchone() + + if user is None: + raise credentials_exception # 用户不存在 + + # 转换为 UserResponse 模型(自动校验字段) + return UserResponse(** user) + except Exception as e: + raise credentials_exception from e + finally: + db.close_connection(conn, cursor) \ No newline at end of file diff --git a/middle/error_handler.py b/middle/error_handler.py new file mode 100644 index 0000000..11521eb --- /dev/null +++ b/middle/error_handler.py @@ -0,0 +1,68 @@ +from fastapi import Request, status +from fastapi.responses import JSONResponse +from fastapi.exceptions import HTTPException, RequestValidationError +from mysql.connector import Error as MySQLError +from jose import JWTError + +from schema.response_schema import APIResponse + + +async def global_exception_handler(request: Request, exc: Exception): + """全局异常处理器:所有未捕获的异常都会在这里统一处理""" + # 1. 请求参数验证错误(Pydantic 校验失败) + if isinstance(exc, RequestValidationError): + error_details = [] + for err in exc.errors(): + error_details.append(f"{err['loc'][1]}: {err['msg']}") + return JSONResponse( + status_code=status.HTTP_400_BAD_REQUEST, + content=APIResponse( + code=400, + message=f"请求参数错误:{'; '.join(error_details)}", + data=None + ).model_dump() + ) + + # 2. HTTP 异常(主动抛出的业务错误、如 401/404) + if isinstance(exc, HTTPException): + return JSONResponse( + status_code=exc.status_code, + content=APIResponse( + code=exc.status_code, + message=exc.detail, + data=None + ).model_dump() + ) + + # 3. JWT 相关错误(Token 无效/过期) + if isinstance(exc, JWTError): + return JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content=APIResponse( + code=401, + message="Token 无效或已过期", + data=None + ).model_dump(), + headers={"WWW-Authenticate": "Bearer"}, + ) + + # 4. MySQL 数据库错误 + if isinstance(exc, MySQLError): + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=APIResponse( + code=500, + message=f"数据库错误:{str(exc)}", + data=None + ).model_dump() + ) + + # 5. 其他未知错误(兜底处理) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=APIResponse( + code=500, + message=f"服务器内部错误:{str(exc)}", + data=None + ).model_dump() + ) diff --git a/schema/__pycache__/device_schema.cpython-312.pyc b/schema/__pycache__/device_schema.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..01a7f7e65ec6a505544d84d70752b3b1a216c313 GIT binary patch literal 3109 zcmbVOU2GKB6`t9h{r|(PF(%l?jv--5?2rJ0rg0*H1Gb561ST{qs?jp{!VcQM%umWT zL|R8Au^}N#Q);)+XssmHA{*>Pfl!>3zV~I(u2lB6RaJ%AwVyz;dFfN{nXxw$QeHX_ z-@S9rH|PG$Ip4j1_4(WkeNN5Yh+cFu%=hrJ^4OL;i|1))j6n=yV@w!YON@=PVU~ry zHD-xh!`3(#=Hj-nO}B9|d)yIr#GPR$%UGCw46>bJkR3awtVYbR3pubGIkEdVhg@g4 zut#UPDa&JGd3CFoT74#~FU^Mh3lL}^#1bcpC|D8WIAkTx!IUB<6OkC qm_rt1MclNlHhUYU zYgtFx*oo{f(N%;eMPgI6yw2@2N+^zYUZz^BDTggvopPuVnK$NEXYuw%g z+T~g8kC`hL;*Lj7@iCk@p+t!XVOfwweY=Q7ObtBr);`fjrnKxw#2HP>N+J@+4_DV* z^D~pxx%Z91udosfL+q)D6p70rFLClb@OhrNc|M**Y7F{bo_|w~#Fjk{o<~W6=OvnL zj1&OiAc|F1Bog5HNFtF`BJdYwN~!~YJ%F9U6=snl^PkpNUp)G{h;Z_#EJSfUaun&? z#T##(sk8)1t%HYl hO2vv;H?J2W~_;`*efmEeR^@F|7ZC9J4YLSU(V z<$;`nTwlym7-LQ|GMi?V8sp^h|6id)JGK1s4tttCxssD6!fEED!}Pk`MV6`Ms4FeY z(C=M8Wv;|O#DuKsdg|9^{-j+Ut4>eeyZP1p?Mw3$ANF_e(9Y(xnfD3vYn!SZ@7NWx z5l1v4M`PkKVvVC6#IB~G;D{Y31X@tU6~(8Jc!DNq$Rf4SsKiNWJbWX=B3oOfX0Up| zO23iPDMi{$Es#|5P6`?3-=4tTK;1tA4IjTf`u4eZGCg-W?+1g!gW2w}{TKG<9L4(0 zB`$Q=6a1T}`HrW#=-E) z;y3i{AeK|KuCe+GB|Iu+pM0d<`mG^i_rD#XiCMXJ{oc*<^Aq{%+$WV!&uG^^)ZWX} zoL%@Xd;i pT+Yl-4p+|S7c#@uKb$!@ z)W58Z#3w276rWP%D6dK}bqys~FMVD4%~eV}f{#gxU`Qwl#KgBS|0~^8qgFo4RIZ0KNj0HJP<6VX zs)ne~YTsQnPLaykjfJtViCfSQ9mLb2exOw7e*A8BgpWQXS=R#MJJ5t`XCBMDUHB{PkI5U|2#kg?cmvByOd2W^~?0%!PC0yVd z$2aG;{l25fh0Cp*azbGzDz#!Tw--5FZhI=H+-xbe^&Esp dVEWEI{w0>WvFY7FF`+jP9U}x6Q5emU) zhhH=H0=D5tDhV47&*Bh;|3f7$$Zo25vqn0g?$geYi>x;49qwSYns1}dM{9m$jFz`% zs{Bq71Rx8HoZw6QO y={wBRRc6zYmt*&{ c Y46q&9H T0MvU3k9NvnE(I) literal 0 HcmV?d00001 diff --git a/schema/__pycache__/response_schema.cpython-312.pyc b/schema/__pycache__/response_schema.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..fae4d8b4931c76ad3b139528dbc35b5736229cab GIT binary patch literal 967 zcmYjQOHUI~6uvW^KA^3T5EOAC!T6{VB&dlYU;;sjQNj3HOh|6V-b wr_((J;WrYNq^ar>hfeo{90W*aaB IoZv-8Nh-bfLt#C;xISs za Nl29j>DUXNN! n1GXFHp>@|m4{x;fTG)gudbB3aBPck=IBTUt8o)R;9gy^%;> z9vbSf=04h~pT*hS_V~N4T#jU~kgPp9YK_bsWJ8SUM_3PQV(CUrE=(>l)=0dab~Xz$ zb}~oo9qWsZmz~LM73NK^jCEaz;NDp4fxvz X-tsHj2A+rh9T5E z oc(P!!b==#1!u_m?Q$(6j)pq$8!#nJ@#y8A~v%Dg$EV8`vW`@UQWD zTmK^wVYOcm4`Ny9*UBFZI=DF+*L>81ZuDsV7TJcuI@h&cT{{+DIs0U-T1s@MPOWm% zdQIKinw9gtYc+j|p49zSu5Z2mWR_cL9ayW6keYL2m(oqETx8b+YR_bim6{95+uE^x z_ui1#^ceA|s6-BR?Tf~dpcrCUqkk;t?Mn?ErLZKfBa!KAcP{Rlc7i~LJUS(JT@1tg V1`S1U`j5xUG>wJ+0`gN5{s+W3NCN-> literal 0 HcmV?d00001 diff --git a/schema/__pycache__/user_schema.cpython-312.pyc b/schema/__pycache__/user_schema.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..3d6f4878c5f65b1dc300258df1a9dc601edb0d49 GIT binary patch literal 1674 zcmb_c%WoS+7@vLEUORSDY8NP_RUhJEi9{(hltP8nmPd e zX8g_fm|s(=1c7n=yBm{pDj~n)q&p%VvN{f=PAH*Lo)m~qB;;~lD#*HAP;^BiGI^I! z Yb3PBz^e6Fqm_6z(Z-Eg$=9gqGS@D!tn9}RnhrZwqAc%6 z-mzJpTH ~vib^t@^3$UZ E>{A^IY8T>TcJpI|d#hby89|yCS zo8Npo)W#UJUk30tKU&wUoxOjz`pV$QaPvyNwebD@yJxbBpD5TxBhQKx?j&&Yqb~2G z%(1v#cI{Hp*F5l`Xckx-Z``JM@KXTI#nY{sX*l)O!QI1uAM0yn({Vm5ahjWp` SZUhEXU{FONKF7$-e5-%( =%pd}P`ScmuqcE$5N9{QCuBw4xsus@ao6mQTk_IjtC68o zhpX1IO1oBN>Bx8^W1+R7S{%a{p7B`TJGmsQIgtM=?+fa@3^I&P&LcfJkM^h{-(f{m zv~1kpfcZXJny`!iOm^$5JI$XjJeKT-cs)@Hkm5OsCt)7O(8Fwr9EE|)@h4Ei(h(PX z2HCS+7y#nD0MHYrHyW9}B0_sbgkEc8_K4u@5oh4`e-F;*p!^TPiLY;$p3o&&AE@!3 zz$SXsWT=LXM3XdSOM0r(pB?a%qM@8J)D-jfftw _^PY%X!7uz|z!#HH(e^|k%tF?o}}2&x3Xu*V-Lk|g~`wzkM~Ye_|- d(#7l=!7t86Y89M}tZme!!P>ru1U{iNe*xLT*uDS& literal 0 HcmV?d00001 diff --git a/schema/device_schema.py b/schema/device_schema.py new file mode 100644 index 0000000..bf80632 --- /dev/null +++ b/schema/device_schema.py @@ -0,0 +1,51 @@ +import hashlib +from datetime import datetime +from typing import Optional, List, Dict + +from pydantic import BaseModel, Field + + +# ------------------------------ +# 请求模型(前端传参校验) +# ------------------------------ +class DeviceCreateRequest(BaseModel): + """设备流信息创建请求模型""" + ip: Optional[str] = Field(..., max_length=100, description="设备IP地址") + hostname: Optional[str] = Field(None, max_length=100, description="设备别名") + params: Optional[Dict] = Field(None, description="设备详细信息") + + +def md5_encrypt(text: str) -> str: + """对字符串进行MD5加密""" + if not text: + return "" + md5_hash = hashlib.md5() + md5_hash.update(text.encode('utf-8')) + return md5_hash.hexdigest() + + +# ------------------------------ +# 响应模型(后端返回设备数据) +# ------------------------------ +class DeviceResponse(BaseModel): + """设备流信息响应模型(字段与表结构完全对齐)""" + id: int = Field(..., description="设备ID") + hostname: Optional[str] = Field(None, max_length=100, description="设备别名") + rtmp_push_url: Optional[str] = Field(None, description="需要推送的RTMP地址") + live_webrtc_url: Optional[str] = Field(None, description="直播的Webrtc地址") + detection_webrtc_url: Optional[str] = Field(None, description="检测的Webrtc地址") + device_online_status: int = Field(..., description="设备在线状态(1-在线、0-离线)") + device_type: Optional[str] = Field(None, description="设备类型") + alarm_count: int = Field(..., description="报警次数") + params: Optional[str] = Field(None, description="设备详细信息") + created_at: datetime = Field(..., description="记录创建时间") + updated_at: datetime = Field(..., description="记录更新时间") + + # 支持从数据库查询结果转换 + model_config = {"from_attributes": True} + + +class DeviceListResponse(BaseModel): + """设备流信息列表响应模型""" + total: int = Field(..., description="设备总数") + devices: List[DeviceResponse] = Field(..., description="设备列表") diff --git a/schema/response_schema.py b/schema/response_schema.py new file mode 100644 index 0000000..0461a3b --- /dev/null +++ b/schema/response_schema.py @@ -0,0 +1,13 @@ +from typing import Optional, Any + +from pydantic import BaseModel, Field + + +class APIResponse(BaseModel): + """统一 API 响应模型(所有接口必返此格式)""" + code: int = Field(..., description="状态码:200=成功、4xx=客户端错误、5xx=服务端错误") + message: str = Field(..., description="响应信息:成功/错误描述") + data: Optional[Any] = Field(None, description="响应数据:成功时返回、错误时为 None") + + # Pydantic V2 配置(支持从 ORM 对象转换) + model_config = {"from_attributes": True} diff --git a/schema/user_schema.py b/schema/user_schema.py new file mode 100644 index 0000000..6d8d9b1 --- /dev/null +++ b/schema/user_schema.py @@ -0,0 +1,32 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + + +# ------------------------------ +# 请求模型(前端传参校验) +# ------------------------------ +class UserRegisterRequest(BaseModel): + """用户注册请求模型""" + username: str = Field(..., min_length=3, max_length=50, description="用户名(3-50字符)") + password: str = Field(..., min_length=6, max_length=100, description="密码(6-100字符)") + + +class UserLoginRequest(BaseModel): + """用户登录请求模型""" + username: str = Field(..., description="用户名") + password: str = Field(..., description="密码") + + +# ------------------------------ +# 响应模型(后端返回用户数据) +# ------------------------------ +class UserResponse(BaseModel): + """用户信息响应模型(隐藏密码等敏感字段)""" + id: int = Field(..., description="用户ID") + username: str = Field(..., description="用户名") + created_at: datetime = Field(..., description="创建时间") + updated_at: datetime = Field(..., description="更新时间") + + # Pydantic V2 配置(支持从数据库查询结果转换) + model_config = {"from_attributes": True} diff --git a/service/__pycache__/device_service.cpython-312.pyc b/service/__pycache__/device_service.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..dba856b53db1229c04709ac4aa766599652bf53a GIT binary patch literal 9367 zcmcIqd2kbFmhV=pb#+U!Y*~^mY|976GKX`JgCl_P0mfhla3DC@bt&q$k)y+DwPJAE z1Txu;$dUo7GJ`WoHe!mMaSD?+T6BvUr zco6GFPfZXHX?iswqL&D1d$l35mkjB8bvWuHg8Go5*C>^>K~sq8rKB<$q(kOjvsBgv zEg@^KRVwR)MIl?SEmYiF94hH83E6w?QoA8o8gleHq_Q#S47qw;IHti^En{M+0sIVo z8l*z0w~Q?tyh|?U=Zu+kFqSjc(}dj06tOzS#@hO|O!3p&(+X*?8%D3nAE!{4z-W7J zG|WserSQ*j#;J0Y+Fa09#+09NpVrDFrB<9-bf!X;P-(${vNA^pa#XPuj0b+zY$a2} zR`eT~+Na5^WL4iGSsh5Wc%Eb=Ydww2v-jgnJ#bvYx`2N}9ybqgYbmf!#Lat4+#2(^ z)$GT;ORpk!U0zZ&Z|>>2zipqN9pVF#ut;u?vD|*q*nEG-j!2AWIZ@xr4zXb-D(ZKz zkHy$1@5M#3jpHI56sWBoyW09%y1UvtHbGUxJOX*q0L%CJV;skZ`My|`<-A(4q?LU< z;AdMn*2l97Bhex^?qH)sk#Lk1i{<*(K$Oq6Q6c88J~r&<_79<%DI}Oe4^{dBV6WCk z3{8wQOEgBq>aM*0yXjNUTz>uJ (!4M z1Z;4T+LwC{mgLH6i81En9;{MZbI 2gm&BM`Y&?Zj#^{x-F@ z4&$0eErJHlOEY+<+M0XjR-5 U4wc*BO0|@G z!u%DZmOS3BAH*B!MJWNFvRhjR$cElQNw#5ZAF&4;#$iWtI`pPHsE&G#7{ lFL^2o%$M*Taq+ljbUh3%D*|wv{)6vz_on65^f`;r$mU;F@qI}pFVwZU&$%bOl zy-1*Co?zf{wl62)GGN8C(oW55oyS<2t}hY>(b&Ey@8e@pG!ohRK7RiYOydjsxKN)z z5)1RoJVQRt7m9l4G1=9;^?|mXo`%IR^80_eaaYkD;E)OZYprsIjfIX74i=r=)$>3p zS$o;e%Qk-|er)>LS6(GVtsjiMs1LExsBeH3waBK&N?}MoHkdy3t1EB)$5-$DEQYje zK3>?ev%72houkw76Vpe2nk9P`^SU{N!$pgPha%yyyq%&Buu(Ju$co*^`T=8!n!u1~ z=x2HV-bk1gjX5+%M0t)gA|3?r00<*(pQs&-BB;j1LPJpwSzS@Tm-R6$7lk_#83xd1 z3Pb~8IF7I%Ee9^VXiyIxrjLX|0bVo)eZW349ALO2;K3n-FBbWOkthqtm6g`CZU4^g zTP45;w>c6FKH~E~Dq7K^TUoSmG8VLQNO^F`-lF{shdHDMMJu}x&|Dxq&?jFC4w+J} z0(BKD*F;`apR^{E(gvVUG$H8N2L}SLh>Et*l@e)K8q4v0fJPBiRffO|qLYgv&PLI$ z %O_}7}Y)er9qFl^*u7>;h}!^%*lx?*U*X!c78Rh4!dut5es`g6EzSFmpnVwWiE z!Pbn~cCdX`vmGZJFIsI=R!_?6Nn2~;+KjFAg!{OATzkx&wk?nAW_6g+dem~*l61Dd z|HOYC_``u@`|e4@gPC&AD@`vnrOTJbtrw{Bi^cA-;P{$1*1fte*|<8ndQWmuZ@T!Q zB>m8w4zoL-f8xZG$Dd4$oV_FMygOZTPuy^^)b*14m8uu2#uuIC(q*gDrEB6Qm}~>y ziBGNH{@MENFZI0A`$F$HHPx^>)v$W9Voh@QLrFvV*E(!fC;t3H$?7{NOIIfE-ysci zRGjEL-Zxd+kScAEIF_!SD&3GO-H fx#T#NF+0W@Q|7u%dF|oO z1r__4DP^w7lvf|_yi{Hr@0`_ %n(#qx?6Ed{(XxHH`cudir6z$2-_G2R{x;g`b zm8a<13~kBKX6Tmk%GnZAy=e|J8L3$ZW;7qA4pT?Wusx;jDSKVYUKclJ3>NuV)=uo7 z2u>P0GdAams^e8@TTNW|rL{a`vuErL7j0E3n>SGp*_>WmWSXnNjMS9Doiez`S`+nu zGI+n8E5m5VHH;ve@e7nSLl+(0c6i&7?u!;@vb^an;%xJqy77HU*S%@Ws)@&vh7H#* zQr0P|Dn(UY!*H?-mk8IsxAT45ySpYfCtYo6OZ)leq+uHn+OJ_+vgnIEwi$~xuD?EO zgy!pWus!pI`CAm-*Qdx%Egs*DZ}wp4jZP?^_ZYX-6X(5+9VNskB@QTm;&pAoiBEAF z%AcC;TdIjqt4UO@C$@O8Pra^AocJS7z*rG)lJ(|~-=04DQ&oGOxN>rwLtrhYgY4?b z-&{@{0 sX#stZx9qin z-*i*2jU2#6b@0;zj;j%Lg*~{_3c>;0sxL$#WYHT!wuiopkM^c~vfdknYDn;AbevN# zApdL-8Jz`5(5d_o9v=a70VBvlo}^#^KM5IUNFX6O2($~6idGqe G!}k?&Bh&Lm-XE0651l$atu>w^l}0MtCcz$*C#)&tgITt(6Iu=h=6 zQAbJGySS@|LkB%lq}W_!$t7$yQl_-bdpYH<#R|4->3*QAr=ij7@kKp+g!cvKVZ9N( z0@z1LPjgqRM ohd=q(bda%4fkc3da4 zL=l2qfVVu{Rzzz>=r^C<53R#!%x^%=NtkNZg3(1;RJGz3K$WXJPG@W-aorb`BSX8U z=&BT5mGs;(N#B{Fi;)Q)TRKVCU!q(|sxFUU%_O~6C0IU5HvlGZu8UI_C;$%5u~muT zUkWFMvs=?knp5uP6x|%}yd(pKV^1X7Ck;z8MUE5J bstoN_sN@cD0KOpAH5Tn|{F(W}MU<+bK1#D>6YLEgMweb|lEhR?F zkOBq~Ro% !6dYgYJOtJFLS^C75R&%!+m37K7ARpNB*I3j&NX9_(oSa^fA&Uyx1pT>jb9 ziswD?oE&y}RU=Yz+yR_c@Z!M- $5c9_4&eDda<}Fv#2Ulwm1<;Je0hx1zguTk|?pvVuaZOPDhbj za*jaTEO3r)P&ePs>HmLv27MZQifnP>@#d8qORztzbwc@EiE-m9;@l$F#+Agml_V;! zA~vqY&aHK|*ohD91kgx&WICR>`obyYzDQqo<|z%d<&x5{095@)DviPl13uLJ0EKZY z%7QW6vU1XVA0)@IJVj$v6qD&?g2B#Ee-~xbG)gLA>?{<9vrv+%B0-Am#~X&`5qOjm zz+KP}D9uwyq@<9@1LWUTA&u01Z-iJ0FLM1L4L5+Yy(kMnHd5URvE`n&c}P{ksp>EZ zUZREN^csYdW%VRqURg<@UCO`AS@q;K-$XO5MdC>6DF+faptc-R)D==sA`M?XA^xkN zK?&j^S6^09vW7A%B(nA9BG7#Seo;LfPXX=Zz6I^1#>(z{-#8&m8n$MvrAMDU{N$w7 z^Sw%Gmd0Fb01zrrO#sF9@TB36jDeOTKMFu{)Ff6VmL`|CC+(XcP$y#~`$BZ{?Hu(3 zBRaEs6#cnAXIjYWZ&6u+wUT&zW6{PY?3~L9<#SEOmL fg(Td@B@SCf7J* zl4Ua|xgE&8799#@_ItTPi|(9QtaM6&T%`lK(xQ%&|F%Fo1YaY#ZHO=Wpd}K4FfaKp z9C8vkWblwdM>kQrUy|qa4YXqN6*R1c&-1Y$yN(-$Hl!A!!;k}V#qs~fsLNQ}pRl&i zv8HLP`g6?wId *L*?<71eLt2<6wCj z-|~gMY^*wgkMEnb-+Q5~a*Rv(##<)-d9rNl1-B<*Pu{s{(!KdYMNMMmc=OrnWOI13 zBJ#EIZoDyLvCR_jyj0?tCE*FKhNEm&k4gs2<-Ue!(Da=!J!;$se`6!h-Zj1^xuFNm z{s4Fm^~;mX?@KRU_cd7wOo15z&r6P)Yp9MG>;eW*oZT!js5n#+KQR7ivSs(diZuS< ZJO
g=MjoeQ^(IW)0m=4XIEg!@rGND8olcV7 zxXkpw?nt}ucHi55yZ7Gvy}jR?Oa=s{_mhvhnu`(ok~}Cyogr>EQwY6|Sj1966rwy7 zMQA!mhZqkNQh8J%wMQ-WF+oj8>(NQHDyR<`JO+tY2Ma<*k1=HOm_lZcS?bdSEg`GN zD$&|tVW`Mc6e{) @r$z;&d&}#>!L+X59T@oeWFV6b@KiCMx5DG9&h?rqp4?40IwNmRQHA= zaCoAYkA^}%d_>e@`2_r#7eG!KY2p7A{BOPh2J`@;h$&M86#QvD60)4#8S^3{N>N_g z02QOU6?aeup=IfDMtRu-G^-k5VstlIZ&`^kG1RSF)(MXtlw#Br0?C5^ DQf_Q%>@su%S!KSI3}teCT+@6=F{dXMR)T$boD zSkK^uBKw1E<6mEXL+Yz{rbo`t|Kfx^mKr*l9-c@Iy_355%hY?rsk5UQrquHzsflOd zH#>4PHS|LI)Jv&n-cG;ze){Bv)W 4|sl-|cR7=SH?WYxuwroIsS=Z&p?3 z`sg}%Uhm9m`{m%#xrvMEGasf;{w(#(M_+yNT9z4{x_}paLX`LV!KFnF%L%?fu>XLh z9XTFS_O!IQTic;&e=wuU?TUVNP9=P~mWVvEvw5Gp&3StbUk7UD#^!$qx#;P2x6hrNIk5H3wzAK*qH z+$lw8(Lw@0$L2!5F#u?Rg$y^v@nnnAn}F({PWRmh9|^Er a z-1Mgc`?aFV >(felVwoTP-o37X%w=8HF zyD@HD&>_7=S|m2oIkRr_)Vj@=EQy-kS9N=mI^(H=;{_*7iyCBgBnwNDWow3eXDS<~ zDjO4PzW2U4QF0GxzNl4M4U0ucUofL{Oz9lMO(X06sB?X@s7J=K8;DkKB@>yBn@(7+ znQg<3<7-EKW9vrk3Hu$><~uJHCUkdyyPyWPZx=zArIY-NCuey~?N(FqT}8V#qAL|P zps#GyH?3!`+ a>{`KYKOHh=Ur-Y&}{lBAVc@9MZfylm=m6(cEmqVX|YrIt`R9ILI zt0f6(fFz`uL=zA;i+Vzd!Z}_rmO+I9wLsiZDy$J^H<8%`R7^c?R@NBM2&7XgtcA74 zG;E=eMJ }VJplAMz)k1<``cv?ai)I#kqoL5LrXg*uc-fN=}C!_85j+)by>j7K~ {*u`LCeRvgAqPC0kv5>dJtwS4v&>?U<;7jxnCa>5< pS_g;0hWT4cxF USChP2O74x}@0iJ%Q zF~mv!B?D*b)0b1nPs+X~=hmVz|N6TGPswfaDfr(cCMe%ZbJ3TjSUQH_Zb@trN*j`H z3##Z#@E|$)k`*Nn%U?Hlf0#cz2=@e%c7Jdz_11^!p^sCS-cb-fo*ucJx_k<_Wv6#; zlbmV?V866Ty>sruTXPdf=Lg@I`|LTmS;*loe F}e1;D7+gQ z9?TgzWqU7O{++-Z2zNyA{lIa8uoB0XOdql-qb+H&CatweQ#sh6Q3E!p*8GDfXm-p} zNVT|Di#ifU$#)e@PEXW!_~2#%p^5Nda_o|ui IOz!QHXNzi WM%KPx3q|2;e3{ku!N0-Gmy+XQ$f+pFpBPZU=q@`pBX3e}HO% zp(yGLv}P7 @@LdKg<8Kr4S%OK6#X{@)%7A|C{7j>Ck^(5z8pN&Ko4pb ziV&@P&T-Tce{|GwDLm+yruHt-byRKQw!3a1Xm2uzZlR>E1s1;1$;<~v4kxxgO1KX$ z&|83eO~VZYE#ZC)xPe^gm$)mAR>a*S8xmU{9;}$ATK`&8m#E)9U9;mlW27|6vZ@=T PC1P#Vbs9*#4|eo#Ll&Ub literal 0 HcmV?d00001 diff --git a/service/device_service.py b/service/device_service.py new file mode 100644 index 0000000..6431193 --- /dev/null +++ b/service/device_service.py @@ -0,0 +1,251 @@ +import json + +from fastapi import HTTPException, Query, APIRouter, Depends, Request +from mysql.connector import Error as MySQLError + +from ds.config import LIVE_CONFIG +from ds.db import db +from middle.auth_middleware import get_current_user +# 注意:导入的Schema已更新字段 +from schema.device_schema import ( + DeviceCreateRequest, + DeviceResponse, + DeviceListResponse, + md5_encrypt +) +from schema.response_schema import APIResponse +from schema.user_schema import UserResponse + +router = APIRouter( + prefix="/devices", + tags=["设备管理"] +) + + +# ------------------------------ +# 1. 创建设备信息 +# ------------------------------ +@router.post("/add", response_model=APIResponse, summary="创建设备信息") +async def create_device(request: Request, device_data: DeviceCreateRequest): + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 新增:检查client_ip是否已存在 + cursor.execute("SELECT id FROM devices WHERE client_ip = %s", (device_data.ip,)) + existing_device = cursor.fetchone() + if existing_device: + raise Exception(f"客户端IP {device_data.ip} 已存在,无法重复添加") + + # 获取RTMP URL + rtmp_url = str(LIVE_CONFIG.get("rtmp_url", "")) + webrtc_url = str(LIVE_CONFIG.get("webrtc_url", "")) + + # 将设备详细信息(params)转换为JSON字符串(对应表中params字段) + device_params_json = json.dumps(device_data.params) if device_data.params else None + + # 对JSON字符串进行MD5加密(用于生成唯一RTMP地址) + device_md5 = md5_encrypt(device_params_json) if device_params_json else "" + + # 解析User-Agent获取设备类型 + user_agent = request.headers.get("User-Agent", "").lower() + + # 优先处理User-Agent为default的情况 + if user_agent == "default": + # 检查params中是否存在os键 + if device_data.params and isinstance(device_data.params, dict) and "os" in device_data.params: + device_type = device_data.params["os"] + else: + device_type = "unknown" + elif "windows" in user_agent: + device_type = "windows" + elif "android" in user_agent: + device_type = "android" + elif "linux" in user_agent: + device_type = "linux" + else: + device_type = "unknown" + + # SQL字段对齐表结构 + insert_query = """ + INSERT INTO devices + (client_ip, hostname, rtmp_push_url, live_webrtc_url, detection_webrtc_url, + device_online_status, device_type, alarm_count, params) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + cursor.execute(insert_query, ( + device_data.ip, + device_data.hostname, + rtmp_url + device_md5, + webrtc_url + device_md5, + "", + 1, + device_type, + 0, + device_params_json + )) + conn.commit() + + # 获取刚创建的设备信息 + device_id = cursor.lastrowid + cursor.execute("SELECT * FROM devices WHERE id = %s", (device_id,)) + device = cursor.fetchone() + + return APIResponse( + code=200, + message="设备创建成功", + data=DeviceResponse(**device) + ) + except MySQLError as e: + if conn: + conn.rollback() + raise Exception(f"创建设备失败:{str(e)}") from e + except json.JSONDecodeError as e: + raise Exception(f"设备信息JSON序列化失败:{str(e)}") from e + except Exception as e: + # 捕获IP已存在的自定义异常 + if conn: + conn.rollback() + raise e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 2. 获取设备列表 +# ------------------------------ +@router.get("/", response_model=APIResponse, summary="获取设备列表") +async def get_device_list( + page: int = Query(1, ge=1, description="页码"), + page_size: int = Query(10, ge=1, le=100, description="每页条数"), + device_type: str = Query(None, description="设备类型筛选"), + online_status: int = Query(None, ge=0, le=1, description="在线状态筛选(1-在线、0-离线)") +): + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 构建查询条件 + where_clause = [] + params = [] + + if device_type: + where_clause.append("device_type = %s") + params.append(device_type) + + if online_status is not None: + where_clause.append("device_online_status = %s") + params.append(online_status) + + # 总条数查询 + count_query = "SELECT COUNT(*) as total FROM devices" + if where_clause: + count_query += " WHERE " + " AND ".join(where_clause) + + cursor.execute(count_query, params) + total = cursor.fetchone()["total"] + + # 分页查询(SELECT * 会自动匹配表字段、响应模型已对齐) + offset = (page - 1) * page_size + query = "SELECT * FROM devices" + if where_clause: + query += " WHERE " + " AND ".join(where_clause) + query += " ORDER BY id DESC LIMIT %s OFFSET %s" + params.extend([page_size, offset]) + + cursor.execute(query, params) + devices = cursor.fetchall() + + # 响应模型已更新为params字段、直接转换即可 + device_list = [DeviceResponse(**device) for device in devices] + + return APIResponse( + code=200, + message="获取设备列表成功", + data=DeviceListResponse(total=total, devices=device_list) + ) + except MySQLError as e: + raise Exception(f"获取设备列表失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 3. 获取单个设备详情 +# ------------------------------ +@router.get("/{device_id}", response_model=APIResponse, summary="获取设备详情") +async def get_device_detail( + device_id: int, + current_user: UserResponse = Depends(get_current_user) +): + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 查询设备信息(SELECT * 匹配表字段) + query = "SELECT * FROM devices WHERE id = %s" + cursor.execute(query, (device_id,)) + device = cursor.fetchone() + + if not device: + raise HTTPException( + status_code=404, + detail=f"设备ID为 {device_id} 的设备不存在" + ) + + # 响应模型已更新为params字段 + return APIResponse( + code=200, + message="获取设备详情成功", + data=DeviceResponse(**device) + ) + except MySQLError as e: + raise Exception(f"获取设备详情失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 4. 删除设备信息 +# ------------------------------ +@router.delete("/{device_id}", response_model=APIResponse, summary="删除设备信息") +async def delete_device( + device_id: int, + current_user: UserResponse = Depends(get_current_user) +): + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 检查设备是否存在 + cursor.execute("SELECT id FROM devices WHERE id = %s", (device_id,)) + if not cursor.fetchone(): + raise HTTPException( + status_code=404, + detail=f"设备ID为 {device_id} 的设备不存在" + ) + + # 执行删除 + delete_query = "DELETE FROM devices WHERE id = %s" + cursor.execute(delete_query, (device_id,)) + conn.commit() + + return APIResponse( + code=200, + message=f"设备ID为 {device_id} 的设备已成功删除", + data=None + ) + except MySQLError as e: + if conn: + conn.rollback() + raise Exception(f"删除设备失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) diff --git a/service/user_service.py b/service/user_service.py new file mode 100644 index 0000000..4ab04ec --- /dev/null +++ b/service/user_service.py @@ -0,0 +1,154 @@ +from datetime import timedelta + +from fastapi import APIRouter, Depends, HTTPException +from mysql.connector import Error as MySQLError + +from ds.db import db +from schema.user_schema import UserRegisterRequest, UserLoginRequest, UserResponse +from schema.response_schema import APIResponse +from middle.auth_middleware import ( + get_password_hash, + verify_password, + create_access_token, + ACCESS_TOKEN_EXPIRE_MINUTES, + get_current_user +) + +# 创建用户接口路由(前缀 /users、标签用于 Swagger 分类) +router = APIRouter( + prefix="/users", + tags=["用户管理"] +) + + +# ------------------------------ +# 1. 用户注册接口 +# ------------------------------ +@router.post("/register", response_model=APIResponse, summary="用户注册") +async def user_register(request: UserRegisterRequest): + """ + 用户注册: + - 校验用户名是否已存在 + - 加密密码后插入数据库 + - 返回注册成功信息 + """ + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 1. 检查用户名是否已存在(唯一索引) + check_query = "SELECT username FROM users WHERE username = %s" + cursor.execute(check_query, (request.username,)) + existing_user = cursor.fetchone() + if existing_user: + raise HTTPException( + status_code=400, + detail=f"用户名 '{request.username}' 已存在、请更换其他用户名" + ) + + # 2. 加密密码 + hashed_password = get_password_hash(request.password) + + # 3. 插入新用户到数据库 + insert_query = """ + INSERT INTO users (username, password) + VALUES (%s, %s) + """ + cursor.execute(insert_query, (request.username, hashed_password)) + conn.commit() # 提交事务 + + # 4. 返回注册成功响应 + return APIResponse( + code=201, # 201 表示资源创建成功 + message=f"用户 '{request.username}' 注册成功", + data=None + ) + except MySQLError as e: + conn.rollback() # 数据库错误时回滚事务 + raise Exception(f"注册失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 2. 用户登录接口 +# ------------------------------ +@router.post("/login", response_model=APIResponse, summary="用户登录(获取 Token)") +async def user_login(request: UserLoginRequest): + """ + 用户登录: + - 校验用户名是否存在 + - 校验密码是否正确 + - 生成 JWT Token 并返回 + """ + conn = None + cursor = None + try: + conn = db.get_connection() + cursor = conn.cursor(dictionary=True) + + # 修复:SQL查询添加 created_at 和 updated_at 字段 + query = """ + SELECT id, username, password, created_at, updated_at + FROM users + WHERE username = %s + """ + cursor.execute(query, (request.username,)) + user = cursor.fetchone() + + # 2. 校验用户名和密码 + if not user or not verify_password(request.password, user["password"]): + raise HTTPException( + status_code=401, + detail="用户名或密码错误", + headers={"WWW-Authenticate": "Bearer"}, + ) + + # 3. 生成 Token(过期时间从配置读取) + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user["username"]}, + expires_delta=access_token_expires + ) + + # 4. 返回 Token 和用户基本信息 + return APIResponse( + code=200, + message="登录成功", + data={ + "access_token": access_token, + "token_type": "bearer", + "user": UserResponse( + id=user["id"], + username=user["username"], + created_at=user.get("created_at"), + updated_at=user.get("updated_at") + ) + } + ) + except MySQLError as e: + raise Exception(f"登录失败:{str(e)}") from e + finally: + db.close_connection(conn, cursor) + + +# ------------------------------ +# 3. 获取当前登录用户信息(需认证) +# ------------------------------ +@router.get("/me", response_model=APIResponse, summary="获取当前用户信息") +async def get_current_user_info( + current_user: UserResponse = Depends(get_current_user) # 依赖认证中间件 +): + """ + 获取当前登录用户信息: + - 需在请求头携带 Token(格式:Bearer ) + - 认证通过后返回用户信息 + """ + return APIResponse( + code=200, + message="获取用户信息成功", + data=current_user + ) + diff --git a/ws.html b/ws.html new file mode 100644 index 0000000..d81ceb2 --- /dev/null +++ b/ws.html @@ -0,0 +1,482 @@ + + + + + + WebSocket 测试工具 + + + +++ + + + \ No newline at end of file diff --git a/ws/__pycache__/ws.cpython-312.pyc b/ws/__pycache__/ws.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c5b72cf4c6ac903710b8a29dae318e4f77ae7fcb GIT binary patch literal 12940 zcmd5@dvsIBnZNhyZOf7@$q#I7%P)+;7zmhzH^yMhBM<@!-~@!xz1Wgv*}al*3^F(g z={gAwW|syMl0cfIahvc+nWebSocket 测试工具
+ + + + + ++ + + + ++ + + + + + ++ + + + + ++7Y*G$!N+D(#)nzNR=La zdsb5xO)(V5ypkBC$-q!6C|0$fV%4uOVquffMKzf?HJ8m9cQO;8kTbKI1(33eW(%j~ zGTf6Bucsu5c3PpGZa>wO!)37 8GGuVYghtO1m3ZZYLQEq zxqQwJua|TBqnd_hpC8)T6;b6f7ZfP!yxmdlx=z2V!`tjxYp0`@W?#402`~7$2mI~L z-sTpLcj8D=P$e?gXG7))#ZgT(OEobp%_&$0W~^itFe??S1TLsq9jk)bsyQu`Yxb*} zblqBeM$}aAadBRMy+{^}9o2PlJA8>rIE6}wtJA5F289&Y2cNI+gy;wrq=S?La$*QF z@RZ;eMfsIdxm(UjB^2e>B=b^ES|R8~exsYR(`)T2J_{Q028g0sw%O15UF}>{;qB;( z8a%*FM=RIN`*(27eq^eT^XyE%;6r+CHrtok%jeyHuZ!h6?(g!!Uri@4-QjR~U4DmS z_k?t2)JRo0sILm5Uh3k6vOeVjLn0Y2o)jsb9x9$5DV`N7o)s>x?NffHExZm*DW}W^ zc)+O&OgjgeBa|OvF}S5nO6pE!ccMInF^LZZ*ef%TrN4WREOQLDN|17EQ%Fc;kHH7~ ztV;R-XsTOfR|iU=a`f2iqfefR{pR7zZ~k)h$QxIVygoAcR_xdR5_|MV;$xJ)@5}7U zEz>I7r&Y4H%Eoz>ch0Na@Mk@|yVVvu^^?)QH^*K%Z42Z=fi1P4*qgr)JI5tAK`oz+ zkr|?>s*`tl{lXrw)4T )khX9{ zo7GPro*Xfkhs@;>b5+P(6*k++l=&ok!Qh$;*54I>R7@H-lg(aY==eWToM% KqekcKDCv%2JVg42R-1CDHJhPe)%r zd-=_W|8%ZT+$NXbcm%iD(Y{#kxj&tIdd`fom;d=s=N{$r;e8&@f4%^sKsiLx#u9pk zwj+a&jlTMPqMJ9L9Xb1$ZMULk=FR}_Gn>#)pWJZ(AyDkwu1?>HS^f@xvqxH8z7U#< z+fpHHOTGvSqMF1ekd7a5n`&J?N3+MZiyP;_k*aVSzFvr6TUrVumWq(2B4U{wvP=$J zrVMP1Oj{CyKg*Jcd0oi7E^J=kr}>uqG)~c5=!%OKQwQo#EF?3QoG&3u)|2&{;tJZh zg}y=2x+eOPG52Um$T*p3Ckwk7;OGhhDgXwYmfq%e&f$n=I2`RAY`}wgy~FW?K(j}z z;c-hx6+S=D=Rl61C_KV<8&&S;= eVXyfFe#oa%STVOvJ9RmuNP8YzVL%>kz@q7d6PZ;P5y5+MLi# z4tTCV48QA70L`l@8a_hZ-*GngP<{!NL0AfcV1y^@vx?+~PpMZX>Q%{8+%KQN(i_L9 zXO&W(rdp^TI@!jJ$46R_P+}Ri72xWA__TVM9z{^$M%JbUsVz?*$P3bLV_G#l2ASm8 z2kc6Kdj7yTGe~^c$$126={@-1wa;p#CmMPH%36>r;Ilm2pdj7}bmE7Y7TkD7rv(sa zGG5J;M8$_GR=)>U@JZzn#Sc{~SjTaw%nC9 vEq)+V2W%)(puVoB}g zfJ;evP2-Gcy=mp~L=b|zSjr_+Hb>%-JQqfqOFw;J)$P;)PdVjBm=i(;#V_trjx(lF zn5D^|C$$X9`v+#9?Z<;f2+49c9+znW+L}`j>Mg+G9_3!;fv3f^i|(QxP;8;P=yKR4 zNv`dp_`>AVx94FOCG`h0-E&gWMcZ=&_X1I|Q%{dRb36?e!~+}vVf6IDS$O=94EDmg z4j}Ny-Umf^7z6p_iI-xJ9=`I_i_man@JF$OhpzNKJ$C#hX!a$Oai0Jcfl3%-t8L-J zg|^ta-q_2JgHQvVUH;+ZvY`I7384VA_?Dq#zd9Q4wgenqS@Es zT`R(zC_laVi`XN-ymI6uydaJUT?SBO0y=~WnF3n?QQ8)J@wLmZ11p~XGq!@)t{i^q z^6ArKubquO_c#^ 6Fa5ey1BMdQm m!?gwJ^>)2>gLW)&dUlid{pK1bojWa zZutXFPEe6}INhkI(%;?5@fDcW2-=dXBdYRwIIh#Kit3#Ko(J8=A?OLRJeGh3QIE(- zzy&HkH0Q&^8?*#lLY~Y{B8Rh;1HF#tmqH7ivu_a`t(U34_EMKJ48wUPk-W*FyvdQg z>QG*FIIpHpKMYc%qJIr3y^UCAky)FGVat%V2}|kzYEm+TWX~is*ALx&&(OUs;k#SO zdKWRchqP^RHC0&DXB@7a6RBJfs$B5<%0(f4Y2V8J-4`uI5zC~IWzuJs$rnu}5!0lQ zY0`=A&rCDpDr(Z=s~MEOa9E$+U)S#;<+DjnEva2c?sAZI+sXEwLoII7wmaOihwKc- z;rONN8AvS4m>U$8wVe69sJdSlXY|>AdbsqqGjl_w3;S1HDyWDQ)PxFZA_cQU1+xb$ z$ckoCFgskZBVye_tUHFS6OPs$^^mCxNYTQ<`!Cop6q1c@QqVSJ^~Cj5e(6v}?WcLQ z!xN_sxX8pihVzR~&?l U}#mN}LDZN%&t z(r>?^ri{4<+aGN|x-@JkiD*lRw&ZFZO=TM66;zIs{$eE0c8y}PH!?uc#e%}4YlwXw zDV$H{-zg5VW5~Iaw6unuF0#W-+(DT0UV14qpq^RAfUdTeUe5dlleNrMF%fvQ4x6lF z?#5)o2IemcI%gv^!1$}U0tfsmo*~XN^zG&eE0sUz%eaBk*}n7v3{nr(&0nFS-(EBq z;&<+_FE=someR`&>UYcXFh5rdw!xzM zz@Ta 4*_yw{ikD(4S;GF>pK=Z`<93wBWKUX9(x|) zM1V1@J^1A48+~GVQs)aGfB@Ku1_=SR#U=n Idh&(X z;Gj5Kz$W%C44q0C((_Mm0d_){_ADNiq-drS02<5?sBH*HX^V!mC2>Zj zYomwFwus&q(%a%1s$_QWUO=&|(l3UMxe;S!$XI!eqIElI0h|uEMslk{xmA(esiEAd z;oNDYW--ZGLY6p*nf<+<8#d-+Yv>Z8?3S&!4>hxd+Zk?dA=_Jt*%i^dLwYxy%J}Nv ze8`H2mS!!RLBC_tEiI?snLsZsRlZZ8#C)*=%S&m@moHv6P4R9S110ZPXfa=X`?AT3 z_sUgJ^4??>l)N_$+l%soNt7N0k;3P1Ztvt#q`AqagNoL}@VjmW2}(RzgDD5YE}Ea= zN7+gUV0BSS`SC0`-( >^l~g4`;Z{p<1LjUW9PF#% zU`HWY2yn1I)uZ|zY0MI6H2Bd}B7~r9ZiSUKrYLf9xf|7lv;b{QsS4XodyeAgB%vut zfnxjp?Z9k-esiiq3_zgmQSAj-6w@>yumsRu5bHlA>m;#b&&6K+k$A3-J%8%*DN$v6 zY4pUoD-ZrE_HRG8?I}YkUijq7<~@L++E#|rf7!e;SxPVKvB4X$a|c0hNkN8E7!A(& zlrd!GN2VZQhekE=nSoS2XJqhXLsTaZVP~Q`nGzl?mTM(73vnhnC7WncOdS;=f{yAr zuP?xJj%J_J<=QB{EE1QrisL>_fSF7zKMM*4+!8gv5~0np6C&RdBMEBV<)6GF>Sq8+ zh>PjiqXpaCPMn6QUFm$FXgS=VQ%03StYQQ0oxUUh2@@6cEHH$k&5U0SQ8drT@w+(Q z;Q_P*|BK8QP{y2Xj&@%QzXaO~u!RzoO$ee9)GUlC^VI=w@Tl)M!_W7&1Y(yorlEX$ zNMjcir&}1|$%3t9TMOCRN?P|4!@e67rK@8u8m%|9R7q8ucP3K2C{( mS}5% zn2QGEVQs`z8ZwneOqC&1W!O|jCeJ0td1T(23tKL%CYxG@T0N0gKGez+-|jd?tzy=p zw&kMlLVco@UXMxE-OLTZtPRX1qXn>2H|GnWTAy=`(f}#nc9=HeX%Dx0NlORx*a?~q z3{%gn!2TPVm8js t >oHn>H!LPYRyIiSVk2a|7wT?qwo-pV9)Kx$5J_vmX zuxUYB2hexRr7CdtX{CK3e@`k&kM@?0Gvb6>&??Uf#GZmwdcanmbvp&|r$XEU;@khg zZzhOvve4v4gJD_#vCiJ38;1gMXBPTV4--OAc3;BS6H|Q~AokEby1lvs^ BaqE zS4zn8XWtro-oYHyVBLH25$^55F-zQ6rd@)O^SNl)8Q?_6BO$|03M~4cY$XqF(haT z0zEu_B%)slwLj2NBsWfqUV;uP+t}0lM~}QLz7B(ay({$8AlcZ(=r0FiKRgiYlZF%g zHE2N-Zs&+ztAwXY^bi3zpmoZaC|U_)3j6|I1`uk&&m%lve))yb=lZ3YKHYEyu^wzx z>mZ4$U0xQPE~3V8CzSAOAqt4|yL|Geu|tpH+9Wj{JXq0JcBS`0m}No-Joi@Y%%Lh- zp{R Y?cB0y4mF zHTvVeRU6i=g-+yBLB*03Ghq{@kc(1|)LDw{PteT*f<`tn2o}qM(I-!WLs4F_k+Uz1 zop=yAiJulO#uLY5$4+2@342WJjB^ix$q@^swH(*$Htt&53zB{{aGp8QQ3j?(;Rat+ z+1ddrpKwV90Ywm|`MH=wa`-tAMfE)A v~YgmDu GuA`_iqE;*@i#!U{Jbper9{TnSaoj~%V1OuU= *w1VUPgvY%xL8#?q@8fo9@18bnqxXm{-MAFHz;a1y&6@*UGy4M`vUZxg7&vo z(Eh;VrKOCPOGT3+MYBRhv%*ESk-XZz#^HQh1T?~YdnA8aD1TZwe|q0aR0`{l>PXov zk~f>oZaCk4zKyKkMjS0<8=7Q?^zOL2AnRUcxS;s>l&7Yg*c>jX>DLUKaxUiPCEr{9 z@5_F->Z4U;(>Bu1lRd$qH9chYUSiz`dU19g(qk<=GXK#0qr1YENdr?qwbWj!s(E?O z^Lx(B4_7TF){2XHRU~h^=v}mv>V>3q5m~f>Y}`sVY=a3p=mwl%$qLkpCp0n_^KGEv z=9EXQvqIKcL)O~ig6e@S Zlwdd+A;8)yEsfxaco2jg3`ipo0`1t-BpG@VfqQ3wuyfr^!sR~)D zz(tw0f*CR89Ho!iNkuK0Fz1Zt{M_?X$(`LpyL-sqeXuHfXqYmtk>H+O$}GpHyarU> zvsa+*4)IsRmWkIW1#lKRyNQ>HHDHIhR^oDpxi-QArF&uGOTf_yJuPP%aP60XaR7Q; z!rX;jVghyhM2p0F(JGL05BfW^??rzn#9zf#IN(<>AAJc7u7bhQJdu41n+#lxoNytN zYyjJbKjP~S`ML>@Mvy(UY?c558btyyfw~opU=m3XkR=eH69|C#tC37Aj(g9y8y7qo zEu_^IX5GZuM%sFS1^Yyw_cGuquwWl-3!q>hy$)G`2~e c52R_4zNI%^vPj$-|kzxK`jnjPjJG0xw7{{@HQL$|HFgMOWER4acq zA98QcoVhB8dZ&zT)G6O7ud~C$dwRN2uY51-cF4c4s=K|>M18P;hKCOpmO%c)iFBhu z`C(BN `#0DB`O_o(=#f9>cl^VrIj0$Q$(&(Y&BTMb7EXBuK zEcrM~1tlMw6`0S_-Dy#LTwJGHU9I?}O0~9<`lMP9MW5VJmwl%}@mn1h{nns@qTgB+ zn9r>{3N1NNHQ19~-WFbi#dtT3 zM{kITs%WfwzmM|3EbiQ4N2f zCWWX;pHs%qDZ}Sf=6_OCu7V3L_wm)ey1!`6wEZdtQM{VEZ8o8@`)40jkfLb>&T>ml zn3@IW;gp(QL%;TD-tmb~O@wPDVQShnbs24ouY&)lkrQW(bQ^tCbB)6Ex bool: + """检查客户端是否活跃(心跳超时阈值:60秒)""" + timeout = (datetime.datetime.now() - self.last_heartbeat).total_seconds() + # 打印心跳检查明细(便于排查超时原因) + print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {self.client_ip} 心跳检查:" + f"上次心跳距今 {timeout:.1f} 秒(阈值:{timeout_seconds}秒)") + return timeout < timeout_seconds + + +# 存储所有已连接的客户端(key:客户端IP、value:ClientConnection对象) +connected_clients: Dict[str, ClientConnection] = {} + +# 心跳检查任务引用(全局变量、用于应用关闭时取消任务) +heartbeat_task: Optional[asyncio.Task] = None + + +async def heartbeat_checker(): + """定期检查客户端心跳(每30秒一次)、超时直接剔除(不发通知)""" + while True: + current_time = datetime.datetime.now() + print(f"\n[{current_time:%Y-%m-%d %H:%M:%S}] === 开始新一轮心跳检查 ===") + print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 当前在线客户端总数:{len(connected_clients)}") + + # 1. 收集超时客户端IP(避免遍历中修改字典) + timeout_clients = [] + for client_ip, connection in connected_clients.items(): + if not connection.is_alive(): + timeout_clients.append(client_ip) + + # 2. 处理超时客户端(关闭连接+移除记录) + if timeout_clients: + print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 发现超时客户端:{timeout_clients}(共{len(timeout_clients)}个)") + for client_ip in timeout_clients: + try: + connection = connected_clients[client_ip] + # 直接关闭连接(不发送任何通知) + await connection.websocket.close(code=1008, reason="心跳超时(>60秒)") + print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已关闭(超时)") + except Exception as e: + print( + f"[{current_time:%Y-%m-%d %H:%M:%S}] 关闭客户端 {client_ip} 失败:{str(e)}(错误类型:{type(e).__name__})") + finally: + # 确保从客户端列表中移除(无论关闭是否成功) + if client_ip in connected_clients: + del connected_clients[client_ip] + print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除") + else: + print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 无超时客户端、心跳检查完成") + + # 3. 等待30秒后进行下一轮检查 + await asyncio.sleep(30) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """应用生命周期管理:启动时创建心跳任务、关闭时取消任务""" + global heartbeat_task + # 启动阶段:创建心跳检查任务 + heartbeat_task = asyncio.create_task(heartbeat_checker()) + print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已启动(任务ID:{id(heartbeat_task)})") + yield # 应用运行中 + # 关闭阶段:取消心跳任务 + if heartbeat_task and not heartbeat_task.done(): + heartbeat_task.cancel() + try: + await heartbeat_task # 等待任务优雅退出 + except asyncio.CancelledError: + print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 心跳检查任务已正常取消") + except Exception as e: + print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 取消心跳任务时出错:{str(e)}") + + +async def send_heartbeat_ack(client_ip: str, client_timestamp: Any) -> bool: + """向客户端回复心跳确认(严格遵循 {"timestamp":xxxxx, "type":"heartbeat"} 格式)""" + if client_ip not in connected_clients: + print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复心跳失败:客户端 {client_ip} 不在连接列表中") + return False + + # 修复:将这部分代码移出if语句块,确保始终定义ack_msg + # 服务端当前格式化时间戳(字符串类型,与日志时间格式匹配) + server_latest_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ack_msg = { + "timestamp": server_latest_timestamp, + "type": "heartbeat" + } + + try: + connection = connected_clients[client_ip] + await connection.websocket.send_json(ack_msg) + print( + f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 已向客户端 {client_ip} 回复心跳:{json.dumps(ack_msg, ensure_ascii=False)}") + return True + except Exception as e: + print( + f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复客户端 {client_ip} 心跳失败:{str(e)}(错误类型:{type(e).__name__})") + # 发送失败时移除客户端(避免无效连接残留) + if client_ip in connected_clients: + del connected_clients[client_ip] + print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 因心跳回复失败被移除") + return False + + +@ws_router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """WebSocket核心端点:处理连接建立/消息接收/连接关闭""" + current_time = datetime.datetime.now() + # 1. 接受客户端连接请求 + await websocket.accept() + # 获取客户端IP(作为唯一标识) + client_ip = websocket.client.host if websocket.client else "unknown_ip" + print(f"\n[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接请求已接受(WebSocket握手成功)") + + try: + # 2. 处理"同一IP重复连接"场景:关闭旧连接、保留新连接 + if client_ip in connected_clients: + old_connection = connected_clients[client_ip] + await old_connection.websocket.close(code=1008, reason="同一IP新连接已建立") + del connected_clients[client_ip] + print(f"[{current_time:%Y-%m-%d %H:%M:%S}] 已关闭客户端 {client_ip} 的旧连接(新连接已建立)") + + # 3. 注册新客户端到连接列表 + new_connection = ClientConnection(websocket, client_ip) + connected_clients[client_ip] = new_connection + print( + f"[{current_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已注册到连接列表、当前在线数:{len(connected_clients)}") + + # 4. 循环接收客户端消息(持续监听) + while True: + # 接收原始文本消息(避免提前解析JSON、便于日志打印) + raw_data = await websocket.receive_text() + recv_time = datetime.datetime.now() + print(f"\n[{recv_time:%Y-%m-%d %H:%M:%S}] 收到客户端 {client_ip} 的消息:{raw_data}") + + # 尝试解析JSON消息 + try: + message = json.loads(raw_data) + print( + f"[{recv_time:%Y-%m-%d %H:%M:%S}] 消息解析成功:{json.dumps(message, ensure_ascii=False, indent=2)}") + + # 5. 区分消息类型:仅处理心跳、其他消息不回复 + if message.get("type") == "heartbeat": + # 验证心跳消息是否包含timestamp字段 + client_timestamp = message.get("timestamp") + if client_timestamp is None: + print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 警告:客户端 {client_ip} 发送的心跳缺少'timestamp'字段") + continue # 不回复无效心跳 + + # 更新心跳时间 + 回复心跳确认 + new_connection.update_heartbeat() + await send_heartbeat_ack(client_ip, client_timestamp) + else: + # 非心跳消息:仅打印日志、不回复任何内容 + print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 非心跳消息(类型:{message.get('type')})、不回复") + + except json.JSONDecodeError as e: + # JSON格式错误:仅打印日志、不回复 + print(f"[{recv_time:%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 消息格式错误:无效JSON(错误:{str(e)})") + except Exception as e: + # 其他未知错误:仅打印日志、不回复 + print( + f"[{recv_time:%Y-%m-%d %H:%M:%S}] 处理客户端 {client_ip} 消息时出错:{str(e)}(错误类型:{type(e).__name__})") + + except WebSocketDisconnect as e: + # 客户端主动断开连接(如关闭页面、网络中断) + print( + f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 主动断开连接(代码:{e.code}、原因:{e.reason})") + except Exception as e: + # 其他连接级错误(如网络异常) + print( + f"\n[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 连接异常:{str(e)}(错误类型:{type(e).__name__})") + finally: + # 无论何种退出原因、确保客户端从列表中移除 + if client_ip in connected_clients: + del connected_clients[client_ip] + print( + f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 客户端 {client_ip} 已从连接列表移除、当前在线数:{len(connected_clients)}") \ No newline at end of file