Files
video/service/device_service.py
2025-09-02 23:06:36 +08:00

283 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import threading
from fastapi import HTTPException, Query, APIRouter, Depends, Request
from mysql.connector import Error as MySQLError
from ds.config import LIVE_CONFIG
from ds.db import db
from middle.auth_middleware import get_current_user
# 注意导入的Schema已更新字段
from schema.device_schema import (
DeviceCreateRequest,
DeviceResponse,
DeviceListResponse,
md5_encrypt
)
from schema.response_schema import APIResponse
from schema.user_schema import UserResponse
# 导入之前封装的WEBRTC处理函数
from rtc.rtc import process_webrtc_stream
# Router that collects the device endpoints declared in this module; it is
# constructed with the "/devices" path prefix and the "设备管理" tag.
router = APIRouter(
    prefix="/devices",
    tags=["设备管理"]
)
# 在后台线程中运行WEBRTC处理
def run_webrtc_processing(ip, webrtc_url):
    """Process one device's WEBRTC stream, reporting any failure on stdout.

    Intended as a thread target: every ``Exception`` raised while announcing
    or processing the stream is caught and printed rather than propagated.

    Args:
        ip: Device IP, used in the log line and passed to the stream handler.
        webrtc_url: URL of the WEBRTC stream to process.
    """
    try:
        announcement = f"开始处理来自设备 {ip} 的WEBRTC流: {webrtc_url}"
        print(announcement)
        process_webrtc_stream(ip, webrtc_url)
    except Exception as err:
        # Best-effort: report the problem and let the caller carry on.
        print(f"WEBRTC处理出错: {err!s}")
# ------------------------------
# 1. 创建设备信息
# ------------------------------
def _start_webrtc_thread(ip, webrtc_url):
    """Run ``run_webrtc_processing(ip, webrtc_url)`` on a new daemon thread."""
    threading.Thread(
        target=run_webrtc_processing,
        args=(ip, webrtc_url),
        daemon=True,  # a daemon thread does not keep the process alive at exit
    ).start()


def _detect_device_type(user_agent, params):
    """Classify a device from its User-Agent header value.

    A header equal to ``default`` (case-insensitive) defers to ``params["os"]``
    when ``params`` is a dict holding that key. Otherwise the first of
    ``windows``, ``android``, ``linux`` contained in the lower-cased header is
    returned; a header containing both ``android`` and ``linux`` is therefore
    classified as ``android``. Anything else yields ``"unknown"``.
    """
    agent = user_agent.lower()
    if agent == "default":
        if params and isinstance(params, dict) and "os" in params:
            return params["os"]
        return "unknown"
    for keyword in ("windows", "android", "linux"):
        if keyword in agent:
            return keyword
    return "unknown"


@router.post("/add", response_model=APIResponse, summary="创建设备信息")
async def create_device(request: Request, device_data: DeviceCreateRequest):
    """Register a device by client IP and start WEBRTC processing for it.

    If a row with the same ``client_ip`` already exists, its stored
    ``live_webrtc_url`` is handed to a background thread and that row is
    returned. Otherwise a new row is inserted whose RTMP/WEBRTC URLs are the
    ``LIVE_CONFIG`` base URLs followed by the value ``md5_encrypt`` returns for
    the JSON-encoded ``params`` (empty when there are no params), and
    processing is started for the new WEBRTC URL.

    Args:
        request: Incoming request; only its ``User-Agent`` header is read.
        device_data: Payload providing ``ip``, ``hostname`` and ``params``.

    Returns:
        APIResponse with ``code=200`` carrying the device row as a
        ``DeviceResponse``.

    Raises:
        Exception: On a MySQL error or when ``params`` cannot be JSON-encoded.
            Any exception raised inside the handler triggers a rollback before
            it propagates.
    """
    conn = None
    cursor = None
    try:
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)

        # An already-registered IP is not inserted again.
        cursor.execute("SELECT * FROM devices WHERE client_ip = %s", (device_data.ip,))
        existing_device = cursor.fetchone()
        if existing_device:
            # Restart stream processing with the URL stored for this device.
            _start_webrtc_thread(device_data.ip, existing_device["live_webrtc_url"])
            return APIResponse(
                code=200,
                message=f"客户端IP {device_data.ip} 已存在",
                data=DeviceResponse(**existing_device)
            )

        # Base URLs come from configuration; a missing key becomes "".
        rtmp_url = str(LIVE_CONFIG.get("rtmp_url", ""))
        webrtc_url = str(LIVE_CONFIG.get("webrtc_url", ""))

        # json.dumps signals unserializable input with TypeError/ValueError
        # (never JSONDecodeError), so those are what get wrapped here. The
        # try is kept narrow so unrelated ValueErrors are not mislabelled.
        try:
            device_params_json = json.dumps(device_data.params) if device_data.params else None
        except (TypeError, ValueError) as e:
            raise Exception(f"设备信息JSON序列化失败{str(e)}") from e

        # The hash of the params JSON is the per-device URL suffix.
        device_md5 = md5_encrypt(device_params_json) if device_params_json else ""
        device_type = _detect_device_type(
            request.headers.get("User-Agent", ""),
            device_data.params,
        )
        full_webrtc_url = webrtc_url + device_md5

        insert_query = """
        INSERT INTO devices
        (client_ip, hostname, rtmp_push_url, live_webrtc_url, detection_webrtc_url,
        device_online_status, device_type, alarm_count, params)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        cursor.execute(insert_query, (
            device_data.ip,
            device_data.hostname,
            rtmp_url + device_md5,
            full_webrtc_url,  # the complete WEBRTC URL is what gets stored
            "",               # detection_webrtc_url starts out empty
            1,                # device_online_status
            device_type,
            0,                # alarm_count
            device_params_json
        ))
        conn.commit()

        # Read the freshly inserted row back so the response mirrors the table.
        device_id = cursor.lastrowid
        cursor.execute("SELECT * FROM devices WHERE id = %s", (device_id,))
        device = cursor.fetchone()

        _start_webrtc_thread(device_data.ip, full_webrtc_url)
        return APIResponse(
            code=200,
            message="设备创建成功已开始处理WEBRTC流",
            data=DeviceResponse(**device)
        )
    except MySQLError as e:
        if conn:
            conn.rollback()
        raise Exception(f"创建设备失败:{str(e)}") from e
    except Exception:
        if conn:
            conn.rollback()
        raise
    finally:
        db.close_connection(conn, cursor)
# ------------------------------
# 2. 获取设备列表
# ------------------------------
@router.get("/", response_model=APIResponse, summary="获取设备列表")
async def get_device_list(
    page: int = Query(1, ge=1, description="页码"),
    page_size: int = Query(10, ge=1, le=100, description="每页条数"),
    device_type: str = Query(None, description="设备类型筛选"),
    online_status: int = Query(None, ge=0, le=1, description="在线状态筛选1-在线、0-离线)")
):
    """Return one page of devices, optionally filtered by type and status.

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-100).
        device_type: When non-empty, keep only rows with this ``device_type``.
        online_status: When not ``None``, keep only rows with this
            ``device_online_status``.

    Returns:
        APIResponse with ``code=200`` whose data is a ``DeviceListResponse``
        holding the total number of matching rows and the requested page,
        ordered by ``id`` descending.

    Raises:
        Exception: If the database reports a MySQL error.
    """
    conn = None
    cursor = None
    try:
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)

        # Collect (condition, value) pairs for the filters actually supplied.
        active_filters = []
        if device_type:
            active_filters.append(("device_type = %s", device_type))
        if online_status is not None:
            active_filters.append(("device_online_status = %s", online_status))

        conditions = [condition for condition, _ in active_filters]
        filter_values = [value for _, value in active_filters]
        where_sql = (" WHERE " + " AND ".join(conditions)) if conditions else ""

        # Total number of rows matching the filters.
        cursor.execute("SELECT COUNT(*) as total FROM devices" + where_sql, filter_values)
        total = cursor.fetchone()["total"]

        # Same filters again, this time limited to the requested page.
        offset = (page - 1) * page_size
        page_sql = "SELECT * FROM devices" + where_sql + " ORDER BY id DESC LIMIT %s OFFSET %s"
        cursor.execute(page_sql, filter_values + [page_size, offset])
        rows = cursor.fetchall()

        device_list = [DeviceResponse(**row) for row in rows]
        payload = DeviceListResponse(total=total, devices=device_list)
        return APIResponse(
            code=200,
            message="获取设备列表成功",
            data=payload
        )
    except MySQLError as err:
        raise Exception(f"获取设备列表失败:{err!s}") from err
    finally:
        db.close_connection(conn, cursor)
# ------------------------------
# 3. 获取单个设备详情
# ------------------------------
@router.get("/{device_id}", response_model=APIResponse, summary="获取设备详情")
async def get_device_detail(
    device_id: int,
    current_user: UserResponse = Depends(get_current_user)
):
    """Return the device row whose ``id`` equals ``device_id``.

    Args:
        device_id: Primary key of the device to look up.
        current_user: Resolved through ``get_current_user``; not otherwise
            used in the body.

    Returns:
        APIResponse with ``code=200`` carrying the row as a ``DeviceResponse``.

    Raises:
        HTTPException: 404 when no row has that id.
        Exception: If the database reports a MySQL error.
    """
    conn = None
    cursor = None
    try:
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)

        cursor.execute("SELECT * FROM devices WHERE id = %s", (device_id,))
        row = cursor.fetchone()
        if row:
            return APIResponse(
                code=200,
                message="获取设备详情成功",
                data=DeviceResponse(**row)
            )

        # No matching row: report it as a missing resource.
        raise HTTPException(
            status_code=404,
            detail=f"设备ID为 {device_id} 的设备不存在"
        )
    except MySQLError as err:
        raise Exception(f"获取设备详情失败:{err!s}") from err
    finally:
        db.close_connection(conn, cursor)
# ------------------------------
# 4. 删除设备信息
# ------------------------------
@router.delete("/{device_id}", response_model=APIResponse, summary="删除设备信息")
async def delete_device(
    device_id: int,
    current_user: UserResponse = Depends(get_current_user)
):
    """Delete the device row whose ``id`` equals ``device_id``.

    Args:
        device_id: Primary key of the device to remove.
        current_user: Resolved through ``get_current_user``; not otherwise
            used in the body.

    Returns:
        APIResponse with ``code=200`` and ``data=None`` once the delete has
        been committed.

    Raises:
        HTTPException: 404 when no row has that id.
        Exception: If the database reports a MySQL error (the transaction is
            rolled back first).
    """
    conn = None
    cursor = None
    try:
        conn = db.get_connection()
        cursor = conn.cursor(dictionary=True)

        # Confirm the row exists so a missing id yields 404 instead of a no-op.
        cursor.execute("SELECT id FROM devices WHERE id = %s", (device_id,))
        found = cursor.fetchone()
        if not found:
            raise HTTPException(
                status_code=404,
                detail=f"设备ID为 {device_id} 的设备不存在"
            )

        cursor.execute("DELETE FROM devices WHERE id = %s", (device_id,))
        conn.commit()

        success_message = f"设备ID为 {device_id} 的设备已成功删除"
        return APIResponse(
            code=200,
            message=success_message,
            data=None
        )
    except MySQLError as err:
        if conn:
            conn.rollback()
        raise Exception(f"删除设备失败:{err!s}") from err
    finally:
        db.close_connection(conn, cursor)