From 10851b77a5eac33c4c4badf4439f5588330fff0b Mon Sep 17 00:00:00 2001 From: ZZX9599 <536509593@qq.com> Date: Tue, 2 Sep 2025 19:46:34 +0800 Subject: [PATCH] =?UTF-8?q?RTC=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rtc/rtc.py | 114 ++++++++++++++++++++++++++++++++++++++ service/device_service.py | 2 +- ws/ws.py | 4 +- 3 files changed, 117 insertions(+), 3 deletions(-) create mode 100644 rtc/rtc.py diff --git a/rtc/rtc.py b/rtc/rtc.py new file mode 100644 index 0000000..a6fadcc --- /dev/null +++ b/rtc/rtc.py @@ -0,0 +1,114 @@ +import asyncio +import aiohttp +from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration +from aiortc.mediastreams import MediaStreamTrack + + +class VideoTrack(MediaStreamTrack): + kind = "video" + + def __init__(self, max_frames=1): + super().__init__() + self.frames = asyncio.Queue(maxsize=max_frames) + + async def recv(self): + return await super().recv() + + +async def rtc_frame_receiver(url, frame_queue): + """ + 对每帧进行检查、只要接收到 RTC 帧且队列为空、就往队列放入数据 + """ + pc = RTCPeerConnection(RTCConfiguration(iceServers=[])) + video_track = VideoTrack() + pc.addTrack(video_track) + + # 累计帧计数器 + total_frames = 0 + + @pc.on("track") + async def on_track(track): + nonlocal total_frames + if track.kind == "video": + print("接收到视频轨道、开始接收视频帧") + while True: + # 接收当前帧并累计计数 + frame = await track.recv() + frame_bgr24 = frame.to_ndarray(format='bgr24') + total_frames += 1 + + # 对每帧都检查队列状态、队列为空则放入 + if frame_queue.empty(): + # 队列为空、放入当前帧 + await frame_queue.put(frame_bgr24) + print(f"第{total_frames}帧:队列为空、已放入新帧") + else: + # 队列非空、说明上一帧还未处理、跳过当前帧 + print(f"第{total_frames}帧:队列非空、跳过该帧") + + # 创建并设置本地offer + offer = await pc.createOffer() + print("已创建本地 SDP Offer") + await pc.setLocalDescription(offer) + + # 发送offer到服务器 + async with aiohttp.ClientSession() as session: + print("开始向服务器发送 SDP Offer") + async with session.post( + url, + data=offer.sdp.encode(), + headers={ + "Content-Type": "application/sdp", + "Content-Length": str(len(offer.sdp)) + }, + ssl=False + ) as response: + print("已接收到服务器的响应、开始处理 SDP Answer") + answer_sdp = await response.text() + await pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type='answer')) + + try: + # 保持连接 + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + print("用户中断") + finally: + print("开始关闭 RTCPeerConnection") + await pc.close() + print("已关闭 RTCPeerConnection") + + +async def frame_consumer(frame_queue): + """ + 从队列中读取帧并处理(队列空时会阻塞等待) + + Args: frame_queue: 帧队列 + """ + while True: + # 从队列中获取帧(队列为空时会阻塞等待新帧) + current_frame = await frame_queue.get() + print(f"从队列获取到帧、尺寸: {current_frame.shape}、进行处理") + + # 标记任务完成 + frame_queue.task_done() + print("帧处理完成、队列已清空") + + +async def main(): + # WebRTC服务器地址 + url = "http://192.168.110.25:1985/rtc/v1/whep/?app=live&stream=677a4845aa48cb8526c811ad56fc5e60" + + # 创建队列 + frame_queue = asyncio.Queue(maxsize=1) + + # 创建任务 + receiver_task = asyncio.create_task(rtc_frame_receiver(url, frame_queue)) + consumer_task = asyncio.create_task(frame_consumer(frame_queue)) + + # 等待任务完成 + await asyncio.gather(receiver_task, consumer_task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/service/device_service.py b/service/device_service.py index 6431193..25ce499 100644 --- a/service/device_service.py +++ b/service/device_service.py @@ -37,7 +37,7 @@ async def create_device(request: Request, device_data: DeviceCreateRequest): cursor.execute("SELECT id FROM devices WHERE client_ip = %s", (device_data.ip,)) existing_device = cursor.fetchone() if existing_device: - raise Exception(f"客户端IP {device_data.ip} 已存在,无法重复添加") + raise Exception(f"客户端IP {device_data.ip} 已存在、无法重复添加") # 获取RTMP URL rtmp_url = str(LIVE_CONFIG.get("rtmp_url", "")) diff --git a/ws/ws.py b/ws/ws.py index 738c4f3..7ff84fa 100644 --- a/ws/ws.py +++ b/ws/ws.py @@ -100,8 +100,8 @@ async def send_heartbeat_ack(client_ip: str, client_timestamp: Any) -> bool: print(f"[{datetime.datetime.now():%Y-%m-%d %H:%M:%S}] 回复心跳失败:客户端 {client_ip} 不在连接列表中") return False - # 修复:将这部分代码移出if语句块,确保始终定义ack_msg - # 服务端当前格式化时间戳(字符串类型,与日志时间格式匹配) + # 修复:将这部分代码移出if语句块、确保始终定义ack_msg + # 服务端当前格式化时间戳(字符串类型、与日志时间格式匹配) server_latest_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") ack_msg = { "timestamp": server_latest_timestamp,