import asyncio import logging import cv2 import time # 配置日志(与WHEP代码保持一致的日志风格) logging.basicConfig(level=logging.INFO) logger = logging.getLogger("rtmp_video_puller") async def rtmp_pull_video_stream(rtmp_url): """ 通过RTMP从指定URL拉取视频流并在收到每一帧时打印消息 功能与WHEP拉流函数对齐:流状态反馈、帧信息打印、帧率统计、异常处理 Args: rtmp_url: RTMP流的URL地址(如 rtmp://xxx/live/stream_key) """ cap = None # 初始化视频捕获对象 try: # 1. 异步打开RTMP流(指定FFmpeg后端确保RTMP兼容性,同步操作通过to_thread避免阻塞事件循环) cap = await asyncio.to_thread( cv2.VideoCapture, rtmp_url, cv2.CAP_FFMPEG # 必须指定FFmpeg后端,RTMP协议依赖该后端解析 ) # 2. 检查RTMP流是否成功打开 is_opened = await asyncio.to_thread(cap.isOpened) if not is_opened: raise Exception(f"RTMP流打开失败: {rtmp_url}(请检查URL有效性和FFmpeg环境)") # 3. 异步获取RTMP流基础信息(分辨率、帧率) width = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FRAME_WIDTH) height = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FRAME_HEIGHT) fps = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FPS) # 处理异常情况:部分RTMP流未返回帧率时默认30FPS fps = fps if fps > 0 else 30.0 # 分辨率转为整数(视频尺寸必然是整数) width, height = int(width), int(height) # 打印流初始化成功信息(与WHEP连接成功信息风格一致) print(f"RTMP流状态: 已成功连接") print(f"流基础信息: 分辨率 {width}x{height} | 配置帧率 {fps:.2f} FPS") print("开始接收视频帧...(按 Ctrl+C 中断)") # 4. 初始化帧统计参数 frame_count = 0 # 总接收帧数 start_time = time.time() # 统计起始时间 # 5. 循环异步读取视频帧(核心逻辑) while True: # 异步读取一帧(cv2.read是同步操作,用to_thread适配异步环境) ret, frame = await asyncio.to_thread(cap.read) # 检查帧是否读取成功(流中断/结束时ret为False) if not ret: print(f"RTMP流状态: 帧读取失败(可能流已中断或结束)") break # 帧计数累加 frame_count += 1 # 6. 打印当前帧基础信息(与WHEP帧信息打印风格对齐) print(f"收到帧 (第{frame_count}帧)") print(f" 帧尺寸: {width}x{height}") print(f" 配置帧率: {fps:.2f} FPS") # 7. 每100帧统计一次实际接收帧率(补充性能监控,与原RTMP示例逻辑一致) if frame_count % 100 == 0: elapsed_time = time.time() - start_time actual_fps = frame_count / elapsed_time # 实际接收帧率(可能低于配置帧率) print(f"---- 帧统计: 累计{frame_count}帧 | 实际平均帧率 {actual_fps:.2f} FPS ----") # (可选)帧数据处理入口:如需处理帧(如推流、分析),可在此处添加逻辑 # 示例:yield frame (若需生成器模式,可调整函数为异步生成器) # 8. 异常处理(覆盖用户中断、通用错误) except KeyboardInterrupt: print(f"\n用户操作: 已通过 Ctrl+C 中断程序") except Exception as e: # 日志记录详细错误(便于问题排查),同时打印用户可见信息 logger.error(f"RTMP流处理异常: {str(e)}", exc_info=True) print(f"错误信息: {str(e)}") finally: # 9. 资源释放(无论成功/失败都确保释放,避免内存泄漏) if cap is not None: await asyncio.to_thread(cap.release) print(f"\n资源释放: RTMP流已关闭") print(f"最终统计: 共接收 {frame_count if 'frame_count' in locals() else 0} 帧") if __name__ == "__main__": RTMP_URL = "rtmp://192.168.110.25:1935/live/473b95a47e338301cbd96809ea7ac416" # 运行RTMP拉流任务(与WHEP一致的异步执行方式) try: asyncio.run(rtmp_pull_video_stream(RTMP_URL)) except Exception as e: print(f"程序启动失败: {str(e)}")