From 805d6b60c4b8e88facce0c756614a21d31e0f07e Mon Sep 17 00:00:00 2001 From: ZZX9599 <536509593@qq.com> Date: Tue, 2 Sep 2025 21:42:09 +0800 Subject: [PATCH] =?UTF-8?q?RTC=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rtc/rtc.py | 33 ++++++++++++++---------- service/device_service.py | 53 +++++++++++++++++++++++++++++++-------- 2 files changed, 62 insertions(+), 24 deletions(-) diff --git a/rtc/rtc.py b/rtc/rtc.py index 09f913f..938d912 100644 --- a/rtc/rtc.py +++ b/rtc/rtc.py @@ -1,6 +1,6 @@ import asyncio import aiohttp -import cv2 # 导入OpenCV库 +import cv2 import numpy as np from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration from aiortc.mediastreams import MediaStreamTrack @@ -97,7 +97,7 @@ async def frame_consumer(frame_queue): # 创建OCR检测器实例(请替换为实际的违禁词文件路径) ocr_detector = OCRViolationDetector( forbidden_words_path=r"D:\Git\bin\video\ocr\forbidden_words.txt", # 替换为实际路径 - ocr_confidence_threshold=0.5,) + ocr_confidence_threshold=0.5, ) while True: # 从队列中获取cv2帧(队列为空时会阻塞等待新帧) @@ -116,24 +116,31 @@ async def frame_consumer(frame_queue): # print("帧处理完成、队列已清空") -async def main(): - # WebRTC服务器地址 - url = "http://192.168.110.25:1985/rtc/v1/whep/?app=live&stream=677a4845aa48cb8526c811ad56fc5e60" +def process_webrtc_stream(ip, webrtc_url): + """ + 处理WEBRTC流并持续打印OCR检测结果 + Args: + ip: IP地址(预留参数) + webrtc_url: WEBRTC服务器地址 + """ # 创建队列 frame_queue = asyncio.Queue(maxsize=1) - # 创建任务 - receiver_task = asyncio.create_task(rtc_frame_receiver(url, frame_queue)) - consumer_task = asyncio.create_task(frame_consumer(frame_queue)) + # 定义事件循环中的主任务 + async def main_task(): + # 创建任务 + receiver_task = asyncio.create_task(rtc_frame_receiver(webrtc_url, frame_queue)) + consumer_task = asyncio.create_task(frame_consumer(frame_queue)) - # 等待任务完成 - await asyncio.gather(receiver_task, consumer_task) + # 等待任务完成 + await asyncio.gather(receiver_task, consumer_task) - -if __name__ == "__main__": try: - asyncio.run(main()) + # 运行事件循环 + asyncio.run(main_task()) + except KeyboardInterrupt: + print("用户中断处理流程") finally: # 确保关闭所有cv2窗口 cv2.destroyAllWindows() diff --git a/service/device_service.py b/service/device_service.py index 25ce499..115aa50 100644 --- a/service/device_service.py +++ b/service/device_service.py @@ -1,5 +1,5 @@ import json - +import threading from fastapi import HTTPException, Query, APIRouter, Depends, Request from mysql.connector import Error as MySQLError @@ -16,12 +16,24 @@ from schema.device_schema import ( from schema.response_schema import APIResponse from schema.user_schema import UserResponse +# 导入之前封装的WEBRTC处理函数 +from rtc.rtc import process_webrtc_stream + router = APIRouter( prefix="/devices", tags=["设备管理"] ) +# 在后台线程中运行WEBRTC处理 +def run_webrtc_processing(ip, webrtc_url): + try: + print(f"开始处理来自设备 {ip} 的WEBRTC流: {webrtc_url}") + process_webrtc_stream(ip, webrtc_url) + except Exception as e: + print(f"WEBRTC处理出错: {str(e)}") + + # ------------------------------ # 1. 创建设备信息 # ------------------------------ @@ -33,20 +45,31 @@ async def create_device(request: Request, device_data: DeviceCreateRequest): conn = db.get_connection() cursor = conn.cursor(dictionary=True) - # 新增:检查client_ip是否已存在 - cursor.execute("SELECT id FROM devices WHERE client_ip = %s", (device_data.ip,)) + # 检查client_ip是否已存在 + cursor.execute("SELECT * FROM devices WHERE client_ip = %s", (device_data.ip,)) existing_device = cursor.fetchone() if existing_device: - raise Exception(f"客户端IP {device_data.ip} 已存在、无法重复添加") + # 设备创建成功后,在后台线程启动WEBRTC流处理 + threading.Thread( + target=run_webrtc_processing, + args=(device_data.ip, full_webrtc_url), + daemon=True # 设为守护线程,主程序退出时自动结束 + ).start() + # IP已存在时返回该设备信息 + return APIResponse( + code=200, + message=f"客户端IP {device_data.ip} 已存在", + data=DeviceResponse(**existing_device) + ) - # 获取RTMP URL + # 获取RTMP URL和WEBRTC URL配置 rtmp_url = str(LIVE_CONFIG.get("rtmp_url", "")) webrtc_url = str(LIVE_CONFIG.get("webrtc_url", "")) - # 将设备详细信息(params)转换为JSON字符串(对应表中params字段) + # 将设备详细信息(params)转换为JSON字符串 device_params_json = json.dumps(device_data.params) if device_data.params else None - # 对JSON字符串进行MD5加密(用于生成唯一RTMP地址) + # 对JSON字符串进行MD5加密 device_md5 = md5_encrypt(device_params_json) if device_params_json else "" # 解析User-Agent获取设备类型 @@ -68,7 +91,10 @@ async def create_device(request: Request, device_data: DeviceCreateRequest): else: device_type = "unknown" - # SQL字段对齐表结构 + # 构建完整的WEBRTC URL + full_webrtc_url = webrtc_url + device_md5 + + # SQL插入语句 insert_query = """ INSERT INTO devices (client_ip, hostname, rtmp_push_url, live_webrtc_url, detection_webrtc_url, @@ -79,7 +105,7 @@ async def create_device(request: Request, device_data: DeviceCreateRequest): device_data.ip, device_data.hostname, rtmp_url + device_md5, - webrtc_url + device_md5, + full_webrtc_url, # 存储完整的WEBRTC URL "", 1, device_type, @@ -93,9 +119,15 @@ async def create_device(request: Request, device_data: DeviceCreateRequest): cursor.execute("SELECT * FROM devices WHERE id = %s", (device_id,)) device = cursor.fetchone() + # 设备创建成功后,在后台线程启动WEBRTC流处理 + threading.Thread( + target=run_webrtc_processing, + args=(device_data.ip, full_webrtc_url), + daemon=True # 设为守护线程,主程序退出时自动结束 + ).start() return APIResponse( code=200, - message="设备创建成功", + message="设备创建成功,已开始处理WEBRTC流", data=DeviceResponse(**device) ) except MySQLError as e: @@ -105,7 +137,6 @@ async def create_device(request: Request, device_data: DeviceCreateRequest): except json.JSONDecodeError as e: raise Exception(f"设备信息JSON序列化失败:{str(e)}") from e except Exception as e: - # 捕获IP已存在的自定义异常 if conn: conn.rollback() raise e