diff --git a/core/rtc.py b/core/rtc.py new file mode 100644 index 0000000..1f5fc6f --- /dev/null +++ b/core/rtc.py @@ -0,0 +1,117 @@ +import asyncio +import logging +from aiortc import RTCPeerConnection, RTCSessionDescription +import aiohttp + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("whep_video_puller") + + +async def whep_pull_video_stream(whep_url): + """ + 通过WHEP从指定URL拉取视频流并在收到每一帧时打印消息 + + Args: + whep_url: WHEP端点的URL + """ + pc = RTCPeerConnection() + + # 添加连接状态变化监听 + @pc.on("connectionstatechange") + async def on_connectionstatechange(): + print(f"连接状态: {pc.connectionState}") + + # 添加ICE连接状态变化监听 + @pc.on("iceconnectionstatechange") + async def on_iceconnectionstatechange(): + print(f"ICE连接状态: {pc.iceConnectionState}") + + # 添加视频接收器 + pc.addTransceiver("video", direction="recvonly") + + # 处理接收到的视频轨道 + @pc.on("track") + def on_track(track): + print(f"接收到轨道: {track.kind}") + if track.kind == "video": + print(f"轨道ID: {track.id}") + print(f"轨道就绪状态: {track.readyState}") + # 创建异步任务来处理视频帧 + asyncio.ensure_future(handle_video_track(track)) + + async def handle_video_track(track): + """处理视频轨道,接收并打印每一帧""" + frame_count = 0 + print("开始处理视频轨道...") + + while True: + try: + # 尝试接收帧 + frame = await track.recv() + frame_count += 1 + print(f"收到原始帧 (第{frame_count}帧)") + + # 打印帧的基本信息 + if hasattr(frame, 'width') and hasattr(frame, 'height'): + print(f" 尺寸: {frame.width}x{frame.height}") + if hasattr(frame, 'time_base'): + print(f" 时间基准: {frame.time_base}") + if hasattr(frame, 'pts'): + print(f" 显示时间戳: {frame.pts}") + + except Exception as e: + print(f"接收帧时出错: {e}") + # 等待一段时间后重试 + await asyncio.sleep(0.1) + continue + + # 创建offer + offer = await pc.createOffer() + await pc.setLocalDescription(offer) + + print(f"本地SDP信息:\n{offer.sdp}") + + # 通过HTTP POST发送offer到WHEP端点 + async with aiohttp.ClientSession() as session: + async with session.post( + whep_url, + data=offer.sdp, + headers={"Content-Type": "application/sdp"} + ) as response: + if response.status != 201: + print(f"WHEP服务器返回错误: {response.status}") + print(f"响应内容: {await response.text()}") + raise Exception(f"WHEP服务器返回错误: {response.status}") + + # 获取answer SDP + answer_sdp = await response.text() + + # 创建RTCSessionDescription对象 + answer = RTCSessionDescription(sdp=answer_sdp, type="answer") + + print(f"收到远程SDP:\n{answer_sdp}") + + # 设置远程描述 + await pc.setRemoteDescription(answer) + + print("连接已建立,开始接收视频流...") + + # 保持连接,直到用户中断 + try: + while True: + await asyncio.sleep(1) + # 检查连接状态 + print(f"当前连接状态: {pc.connectionState}") + except KeyboardInterrupt: + print("用户中断,关闭连接...") + finally: + await pc.close() + + +if __name__ == "__main__": + # 替换为你的WHEP端点URL + WHEP_URL = "http://192.168.110.25:1985/rtc/v1/whep/?app=live&stream=473b95a47e338301cbd96809ea7ac416" + + # 运行拉流任务 + asyncio.run(whep_pull_video_stream(WHEP_URL)) diff --git a/core/rtmp.py b/core/rtmp.py new file mode 100644 index 0000000..c200c04 --- /dev/null +++ b/core/rtmp.py @@ -0,0 +1,101 @@ +import asyncio +import logging +import cv2 +import time + +# 配置日志(与WHEP代码保持一致的日志风格) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("rtmp_video_puller") + + +async def rtmp_pull_video_stream(rtmp_url): + """ + 通过RTMP从指定URL拉取视频流并在收到每一帧时打印消息 + 功能与WHEP拉流函数对齐:流状态反馈、帧信息打印、帧率统计、异常处理 + + Args: + rtmp_url: RTMP流的URL地址(如 rtmp://xxx/live/stream_key) + """ + cap = None # 初始化视频捕获对象 + try: + # 1. 异步打开RTMP流(指定FFmpeg后端确保RTMP兼容性,同步操作通过to_thread避免阻塞事件循环) + cap = await asyncio.to_thread( + cv2.VideoCapture, + rtmp_url, + cv2.CAP_FFMPEG # 必须指定FFmpeg后端,RTMP协议依赖该后端解析 + ) + + # 2. 检查RTMP流是否成功打开 + is_opened = await asyncio.to_thread(cap.isOpened) + if not is_opened: + raise Exception(f"RTMP流打开失败: {rtmp_url}(请检查URL有效性和FFmpeg环境)") + + # 3. 异步获取RTMP流基础信息(分辨率、帧率) + width = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FRAME_WIDTH) + height = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FRAME_HEIGHT) + fps = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FPS) + + # 处理异常情况:部分RTMP流未返回帧率时默认30FPS + fps = fps if fps > 0 else 30.0 + # 分辨率转为整数(视频尺寸必然是整数) + width, height = int(width), int(height) + + # 打印流初始化成功信息(与WHEP连接成功信息风格一致) + print(f"RTMP流状态: 已成功连接") + print(f"流基础信息: 分辨率 {width}x{height} | 配置帧率 {fps:.2f} FPS") + print("开始接收视频帧...(按 Ctrl+C 中断)") + + # 4. 初始化帧统计参数 + frame_count = 0 # 总接收帧数 + start_time = time.time() # 统计起始时间 + + # 5. 循环异步读取视频帧(核心逻辑) + while True: + # 异步读取一帧(cv2.read是同步操作,用to_thread适配异步环境) + ret, frame = await asyncio.to_thread(cap.read) + + # 检查帧是否读取成功(流中断/结束时ret为False) + if not ret: + print(f"RTMP流状态: 帧读取失败(可能流已中断或结束)") + break + + # 帧计数累加 + frame_count += 1 + + # 6. 打印当前帧基础信息(与WHEP帧信息打印风格对齐) + print(f"收到帧 (第{frame_count}帧)") + print(f" 帧尺寸: {width}x{height}") + print(f" 配置帧率: {fps:.2f} FPS") + + # 7. 每100帧统计一次实际接收帧率(补充性能监控,与原RTMP示例逻辑一致) + if frame_count % 100 == 0: + elapsed_time = time.time() - start_time + actual_fps = frame_count / elapsed_time # 实际接收帧率(可能低于配置帧率) + print(f"---- 帧统计: 累计{frame_count}帧 | 实际平均帧率 {actual_fps:.2f} FPS ----") + + # (可选)帧数据处理入口:如需处理帧(如推流、分析),可在此处添加逻辑 + # 示例:yield frame (若需生成器模式,可调整函数为异步生成器) + + # 8. 异常处理(覆盖用户中断、通用错误) + except KeyboardInterrupt: + print(f"\n用户操作: 已通过 Ctrl+C 中断程序") + except Exception as e: + # 日志记录详细错误(便于问题排查),同时打印用户可见信息 + logger.error(f"RTMP流处理异常: {str(e)}", exc_info=True) + print(f"错误信息: {str(e)}") + finally: + # 9. 资源释放(无论成功/失败都确保释放,避免内存泄漏) + if cap is not None: + await asyncio.to_thread(cap.release) + print(f"\n资源释放: RTMP流已关闭") + print(f"最终统计: 共接收 {frame_count if 'frame_count' in locals() else 0} 帧") + + +if __name__ == "__main__": + RTMP_URL = "rtmp://192.168.110.25:1935/live/473b95a47e338301cbd96809ea7ac416" + + # 运行RTMP拉流任务(与WHEP一致的异步执行方式) + try: + asyncio.run(rtmp_pull_video_stream(RTMP_URL)) + except Exception as e: + print(f"程序启动失败: {str(e)}") \ No newline at end of file diff --git a/rtc/rtc.py b/rtc/rtc.py index 2e7563b..1f5fc6f 100644 --- a/rtc/rtc.py +++ b/rtc/rtc.py @@ -1,155 +1,117 @@ -import queue import asyncio +import logging +from aiortc import RTCPeerConnection, RTCSessionDescription import aiohttp -import threading -import time -from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration -from aiortc.mediastreams import MediaStreamTrack -# 创建一个长度为1的队列,用于生产者和消费者之间的通信 -frame_queue = queue.Queue(maxsize=1) +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("whep_video_puller") -class VideoTrack(MediaStreamTrack): - """自定义视频轨道类,继承自MediaStreamTrack""" - kind = "video" - - def __init__(self, max_frames=100): - super().__init__() - self.frames = queue.Queue(maxsize=max_frames) - - async def recv(self): - return await super().recv() - - -def webrtc_producer(webrtc_url): +async def whep_pull_video_stream(whep_url): """ - 生产者方法:从WEBRTC读取视频帧并放入队列 - 仅当队列空时才放入新帧,否则丢弃 + 通过WHEP从指定URL拉取视频流并在收到每一帧时打印消息 + + Args: + whep_url: WHEP端点的URL """ - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + pc = RTCPeerConnection() - # 创建RTCPeerConnection对象,不使用ICE服务器 - pc = RTCPeerConnection(RTCConfiguration(iceServers=[])) - video_track = VideoTrack() - pc.addTrack(video_track) + # 添加连接状态变化监听 + @pc.on("connectionstatechange") + async def on_connectionstatechange(): + print(f"连接状态: {pc.connectionState}") + # 添加ICE连接状态变化监听 + @pc.on("iceconnectionstatechange") + async def on_iceconnectionstatechange(): + print(f"ICE连接状态: {pc.iceConnectionState}") + + # 添加视频接收器 + pc.addTransceiver("video", direction="recvonly") + + # 处理接收到的视频轨道 @pc.on("track") - async def on_track(track): + def on_track(track): + print(f"接收到轨道: {track.kind}") if track.kind == "video": - print("接收到视频轨道,开始接收视频帧") - while True: - # 从轨道接收视频帧 + print(f"轨道ID: {track.id}") + print(f"轨道就绪状态: {track.readyState}") + # 创建异步任务来处理视频帧 + asyncio.ensure_future(handle_video_track(track)) + + async def handle_video_track(track): + """处理视频轨道,接收并打印每一帧""" + frame_count = 0 + print("开始处理视频轨道...") + + while True: + try: + # 尝试接收帧 frame = await track.recv() - # 转换为BGR24格式的NumPy数组 - frame_bgr24 = frame.to_ndarray(format='bgr24') + frame_count += 1 + print(f"收到原始帧 (第{frame_count}帧)") - # 检查队列是否为空,为空则加入,否则丢弃 - if frame_queue.empty(): - try: - frame_queue.put_nowait(frame_bgr24) - print("帧已放入队列") - except queue.Full: - print("队列已满,丢弃帧") - else: - print("队列非空,丢弃帧") + # 打印帧的基本信息 + if hasattr(frame, 'width') and hasattr(frame, 'height'): + print(f" 尺寸: {frame.width}x{frame.height}") + if hasattr(frame, 'time_base'): + print(f" 时间基准: {frame.time_base}") + if hasattr(frame, 'pts'): + print(f" 显示时间戳: {frame.pts}") - async def main(): - # 创建并发送SDP Offer - offer = await pc.createOffer() - print("已创建本地SDP Offer") - await pc.setLocalDescription(offer) - - # 发送Offer到服务器并接收Answer - async with aiohttp.ClientSession() as session: - print(f"开始向服务器 {webrtc_url} 发送SDP Offer") - async with session.post( - webrtc_url, - data=offer.sdp.encode(), - headers={ - "Content-Type": "application/sdp", - "Content-Length": str(len(offer.sdp)) - }, - ssl=False - ) as response: - print("已接收到服务器的响应") - answer_sdp = await response.text() - await pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type='answer')) - - # 保持连接 - try: - while True: + except Exception as e: + print(f"接收帧时出错: {e}") + # 等待一段时间后重试 await asyncio.sleep(0.1) - except KeyboardInterrupt: - pass - finally: - print("关闭RTCPeerConnection") - await pc.close() + continue + # 创建offer + offer = await pc.createOffer() + await pc.setLocalDescription(offer) + + print(f"本地SDP信息:\n{offer.sdp}") + + # 通过HTTP POST发送offer到WHEP端点 + async with aiohttp.ClientSession() as session: + async with session.post( + whep_url, + data=offer.sdp, + headers={"Content-Type": "application/sdp"} + ) as response: + if response.status != 201: + print(f"WHEP服务器返回错误: {response.status}") + print(f"响应内容: {await response.text()}") + raise Exception(f"WHEP服务器返回错误: {response.status}") + + # 获取answer SDP + answer_sdp = await response.text() + + # 创建RTCSessionDescription对象 + answer = RTCSessionDescription(sdp=answer_sdp, type="answer") + + print(f"收到远程SDP:\n{answer_sdp}") + + # 设置远程描述 + await pc.setRemoteDescription(answer) + + print("连接已建立,开始接收视频流...") + + # 保持连接,直到用户中断 try: - loop.run_until_complete(main()) + while True: + await asyncio.sleep(1) + # 检查连接状态 + print(f"当前连接状态: {pc.connectionState}") + except KeyboardInterrupt: + print("用户中断,关闭连接...") finally: - loop.close() - - -def frame_consumer(): - """ - 消费者方法:从队列中读取帧并处理 - 每次处理后休眠200ms模拟延迟 - """ - print("消费者启动,开始等待帧...") - try: - while True: - # 阻塞等待队列中的帧 - frame = frame_queue.get() - print(f"消费帧,大小: {frame.shape}") - - # 模拟处理延迟 - time.sleep(0.2) # 200ms - - # 标记任务完成 - frame_queue.task_done() - except KeyboardInterrupt: - print("消费者退出") - - -def start_webrtc_stream(webrtc_url): - """ - 启动WebRTC视频流处理的主方法 - 参数: webrtc_url - WebRTC服务器地址 - """ - print(f"开始连接到WebRTC服务器: {webrtc_url}") - - # 启动生产者线程 - producer_thread = threading.Thread( - target=webrtc_producer, - args=(webrtc_url,), - daemon=True, - name="webrtc-producer" - ) - - # 启动消费者线程 - consumer_thread = threading.Thread( - target=frame_consumer, - daemon=True, - name="frame-consumer" - ) - - producer_thread.start() - consumer_thread.start() - print("生产者和消费者线程已启动") - - try: - # 保持主线程运行 - while True: - time.sleep(1) - except KeyboardInterrupt: - print("程序正在退出...") + await pc.close() if __name__ == "__main__": - # 示例用法 - # 实际使用时替换为真实的WebRTC服务器地址 - webrtc_server_url = "http://192.168.110.65:1985/rtc/v1/whep/?app=live&stream=677a4845aa48cb8526c811ad56fc5e60" - start_webrtc_stream(webrtc_server_url) + # 替换为你的WHEP端点URL + WHEP_URL = "http://192.168.110.25:1985/rtc/v1/whep/?app=live&stream=473b95a47e338301cbd96809ea7ac416" + + # 运行拉流任务 + asyncio.run(whep_pull_video_stream(WHEP_URL)) diff --git a/rtmp/rtmp.py b/rtmp/rtmp.py new file mode 100644 index 0000000..c200c04 --- /dev/null +++ b/rtmp/rtmp.py @@ -0,0 +1,101 @@ +import asyncio +import logging +import cv2 +import time + +# 配置日志(与WHEP代码保持一致的日志风格) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("rtmp_video_puller") + + +async def rtmp_pull_video_stream(rtmp_url): + """ + 通过RTMP从指定URL拉取视频流并在收到每一帧时打印消息 + 功能与WHEP拉流函数对齐:流状态反馈、帧信息打印、帧率统计、异常处理 + + Args: + rtmp_url: RTMP流的URL地址(如 rtmp://xxx/live/stream_key) + """ + cap = None # 初始化视频捕获对象 + try: + # 1. 异步打开RTMP流(指定FFmpeg后端确保RTMP兼容性,同步操作通过to_thread避免阻塞事件循环) + cap = await asyncio.to_thread( + cv2.VideoCapture, + rtmp_url, + cv2.CAP_FFMPEG # 必须指定FFmpeg后端,RTMP协议依赖该后端解析 + ) + + # 2. 检查RTMP流是否成功打开 + is_opened = await asyncio.to_thread(cap.isOpened) + if not is_opened: + raise Exception(f"RTMP流打开失败: {rtmp_url}(请检查URL有效性和FFmpeg环境)") + + # 3. 异步获取RTMP流基础信息(分辨率、帧率) + width = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FRAME_WIDTH) + height = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FRAME_HEIGHT) + fps = await asyncio.to_thread(cap.get, cv2.CAP_PROP_FPS) + + # 处理异常情况:部分RTMP流未返回帧率时默认30FPS + fps = fps if fps > 0 else 30.0 + # 分辨率转为整数(视频尺寸必然是整数) + width, height = int(width), int(height) + + # 打印流初始化成功信息(与WHEP连接成功信息风格一致) + print(f"RTMP流状态: 已成功连接") + print(f"流基础信息: 分辨率 {width}x{height} | 配置帧率 {fps:.2f} FPS") + print("开始接收视频帧...(按 Ctrl+C 中断)") + + # 4. 初始化帧统计参数 + frame_count = 0 # 总接收帧数 + start_time = time.time() # 统计起始时间 + + # 5. 循环异步读取视频帧(核心逻辑) + while True: + # 异步读取一帧(cv2.read是同步操作,用to_thread适配异步环境) + ret, frame = await asyncio.to_thread(cap.read) + + # 检查帧是否读取成功(流中断/结束时ret为False) + if not ret: + print(f"RTMP流状态: 帧读取失败(可能流已中断或结束)") + break + + # 帧计数累加 + frame_count += 1 + + # 6. 打印当前帧基础信息(与WHEP帧信息打印风格对齐) + print(f"收到帧 (第{frame_count}帧)") + print(f" 帧尺寸: {width}x{height}") + print(f" 配置帧率: {fps:.2f} FPS") + + # 7. 每100帧统计一次实际接收帧率(补充性能监控,与原RTMP示例逻辑一致) + if frame_count % 100 == 0: + elapsed_time = time.time() - start_time + actual_fps = frame_count / elapsed_time # 实际接收帧率(可能低于配置帧率) + print(f"---- 帧统计: 累计{frame_count}帧 | 实际平均帧率 {actual_fps:.2f} FPS ----") + + # (可选)帧数据处理入口:如需处理帧(如推流、分析),可在此处添加逻辑 + # 示例:yield frame (若需生成器模式,可调整函数为异步生成器) + + # 8. 异常处理(覆盖用户中断、通用错误) + except KeyboardInterrupt: + print(f"\n用户操作: 已通过 Ctrl+C 中断程序") + except Exception as e: + # 日志记录详细错误(便于问题排查),同时打印用户可见信息 + logger.error(f"RTMP流处理异常: {str(e)}", exc_info=True) + print(f"错误信息: {str(e)}") + finally: + # 9. 资源释放(无论成功/失败都确保释放,避免内存泄漏) + if cap is not None: + await asyncio.to_thread(cap.release) + print(f"\n资源释放: RTMP流已关闭") + print(f"最终统计: 共接收 {frame_count if 'frame_count' in locals() else 0} 帧") + + +if __name__ == "__main__": + RTMP_URL = "rtmp://192.168.110.25:1935/live/473b95a47e338301cbd96809ea7ac416" + + # 运行RTMP拉流任务(与WHEP一致的异步执行方式) + try: + asyncio.run(rtmp_pull_video_stream(RTMP_URL)) + except Exception as e: + print(f"程序启动失败: {str(e)}") \ No newline at end of file