import asyncio import aiohttp from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration from aiortc.mediastreams import MediaStreamTrack class VideoTrack(MediaStreamTrack): kind = "video" def __init__(self, max_frames=1): super().__init__() self.frames = asyncio.Queue(maxsize=max_frames) async def recv(self): return await super().recv() async def rtc_frame_receiver(url, frame_queue): """ 对每帧进行检查、只要接收到 RTC 帧且队列为空、就往队列放入数据 """ pc = RTCPeerConnection(RTCConfiguration(iceServers=[])) video_track = VideoTrack() pc.addTrack(video_track) # 累计帧计数器 total_frames = 0 @pc.on("track") async def on_track(track): nonlocal total_frames if track.kind == "video": print("接收到视频轨道、开始接收视频帧") while True: # 接收当前帧并累计计数 frame = await track.recv() frame_bgr24 = frame.to_ndarray(format='bgr24') total_frames += 1 # 对每帧都检查队列状态、队列为空则放入 if frame_queue.empty(): # 队列为空、放入当前帧 await frame_queue.put(frame_bgr24) print(f"第{total_frames}帧:队列为空、已放入新帧") else: # 队列非空、说明上一帧还未处理、跳过当前帧 print(f"第{total_frames}帧:队列非空、跳过该帧") # 创建并设置本地offer offer = await pc.createOffer() print("已创建本地 SDP Offer") await pc.setLocalDescription(offer) # 发送offer到服务器 async with aiohttp.ClientSession() as session: print("开始向服务器发送 SDP Offer") async with session.post( url, data=offer.sdp.encode(), headers={ "Content-Type": "application/sdp", "Content-Length": str(len(offer.sdp)) }, ssl=False ) as response: print("已接收到服务器的响应、开始处理 SDP Answer") answer_sdp = await response.text() await pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type='answer')) try: # 保持连接 while True: await asyncio.sleep(1) except KeyboardInterrupt: print("用户中断") finally: print("开始关闭 RTCPeerConnection") await pc.close() print("已关闭 RTCPeerConnection") async def frame_consumer(frame_queue): """ 从队列中读取帧并处理(队列空时会阻塞等待) Args: frame_queue: 帧队列 """ while True: # 从队列中获取帧(队列为空时会阻塞等待新帧) current_frame = await frame_queue.get() print(f"从队列获取到帧、尺寸: {current_frame.shape}、进行处理") # 标记任务完成 frame_queue.task_done() print("帧处理完成、队列已清空") async def main(): # WebRTC服务器地址 url = "http://192.168.110.25:1985/rtc/v1/whep/?app=live&stream=677a4845aa48cb8526c811ad56fc5e60" # 创建队列 frame_queue = asyncio.Queue(maxsize=1) # 创建任务 receiver_task = asyncio.create_task(rtc_frame_receiver(url, frame_queue)) consumer_task = asyncio.create_task(frame_consumer(frame_queue)) # 等待任务完成 await asyncio.gather(receiver_task, consumer_task) if __name__ == "__main__": asyncio.run(main())