import asyncio import queue from fractions import Fraction from urllib.parse import urlparse import aiohttp import av import numpy as np from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration, VideoStreamTrack class DummyVideoTrack(VideoStreamTrack): async def recv(self): # 简洁初始化、返回固定颜色的帧 return np.full((480, 640, 3), (0, 0, 255), dtype=np.uint8) async def receive_video_frames(whep_url): pc = RTCPeerConnection(RTCConfiguration(iceServers=[])) frames_queue = asyncio.Queue() pc.addTrack(DummyVideoTrack()) @pc.on("track") def on_track(track): if track.kind == "video": asyncio.create_task(consume_track(track, frames_queue)) @pc.on("iceconnectionstatechange") def on_ice_connection_state_change(): print(f"ICE 连接状态: {pc.iceConnectionState}") offer = await pc.createOffer() await pc.setLocalDescription(offer) headers = {"Content-Type": "application/sdp"} async with aiohttp.ClientSession() as session: async with session.post(whep_url, data=pc.localDescription.sdp, headers=headers) as response: if response.status != 201: raise Exception(f"服务器返回错误: {response.status}") answer = RTCSessionDescription(sdp=await response.text(), type="answer") await pc.setRemoteDescription(answer) if "Location" in response.headers: base_url = f"{urlparse(whep_url).scheme}://{urlparse(whep_url).netloc}" print("ICE 协商 URL:", base_url + response.headers["Location"]) while pc.iceConnectionState not in ["connected", "completed"]: await asyncio.sleep(1) print("ICE 连接完成,开始接收视频流") try: while True: frame = await frames_queue.get() if frame is None: break yield frame except KeyboardInterrupt: pass finally: await pc.close() async def consume_track(track, frames_queue): try: while True: frame = await track.recv() if frame is None: print("没有接收到有效的帧数据") await frames_queue.put(None) break img = frame.to_ndarray(format="bgr24") await frames_queue.put(img) except Exception as e: print("处理帧错误:", e) await frames_queue.put(None) def rtc_frame(url, frame_queue): async def main(): async for frame in receive_video_frames(url): try: frame_queue.put_nowait(frame) except queue.Full: frame_queue.get_nowait() frame_queue.put_nowait(frame) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close()