2025-09-02 19:46:34 +08:00
|
|
|
|
import asyncio
|
|
|
|
|
import aiohttp
|
2025-09-02 20:14:40 +08:00
|
|
|
|
import cv2 # 导入OpenCV库
|
|
|
|
|
import numpy as np
|
2025-09-02 19:46:34 +08:00
|
|
|
|
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration
|
|
|
|
|
from aiortc.mediastreams import MediaStreamTrack
|
2025-09-02 21:30:28 +08:00
|
|
|
|
from ocr.ocr_violation_detector import OCRViolationDetector
|
2025-09-02 19:46:34 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VideoTrack(MediaStreamTrack):
|
|
|
|
|
kind = "video"
|
|
|
|
|
|
|
|
|
|
def __init__(self, max_frames=1):
|
|
|
|
|
super().__init__()
|
|
|
|
|
self.frames = asyncio.Queue(maxsize=max_frames)
|
|
|
|
|
|
|
|
|
|
async def recv(self):
|
|
|
|
|
return await super().recv()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def rtc_frame_receiver(url, frame_queue):
|
|
|
|
|
"""
|
2025-09-02 20:14:40 +08:00
|
|
|
|
对每帧进行检查、只要接收到 RTC 帧且队列为空、就往队列放入cv2格式的帧数据
|
2025-09-02 19:46:34 +08:00
|
|
|
|
"""
|
|
|
|
|
pc = RTCPeerConnection(RTCConfiguration(iceServers=[]))
|
|
|
|
|
video_track = VideoTrack()
|
|
|
|
|
pc.addTrack(video_track)
|
|
|
|
|
|
|
|
|
|
# 累计帧计数器
|
|
|
|
|
total_frames = 0
|
|
|
|
|
|
|
|
|
|
@pc.on("track")
|
|
|
|
|
async def on_track(track):
|
|
|
|
|
nonlocal total_frames
|
|
|
|
|
if track.kind == "video":
|
|
|
|
|
print("接收到视频轨道、开始接收视频帧")
|
|
|
|
|
while True:
|
|
|
|
|
# 接收当前帧并累计计数
|
|
|
|
|
frame = await track.recv()
|
2025-09-02 20:14:40 +08:00
|
|
|
|
# 转换为cv2兼容的BGR格式numpy数组
|
|
|
|
|
frame_cv2 = frame.to_ndarray(format='bgr24')
|
|
|
|
|
|
|
|
|
|
# 验证是否为cv2兼容格式
|
|
|
|
|
if isinstance(frame_cv2, np.ndarray) and frame_cv2.ndim == 3 and frame_cv2.shape[2] == 3:
|
|
|
|
|
total_frames += 1
|
|
|
|
|
|
|
|
|
|
# 对每帧都检查队列状态、队列为空则放入
|
|
|
|
|
if frame_queue.empty():
|
|
|
|
|
# 队列为空、放入当前cv2帧
|
|
|
|
|
await frame_queue.put(frame_cv2)
|
2025-09-02 21:30:28 +08:00
|
|
|
|
# print(f"第{total_frames}帧:队列为空、已放入新的cv2帧,尺寸: {frame_cv2.shape}")
|
2025-09-02 20:14:40 +08:00
|
|
|
|
else:
|
|
|
|
|
# 队列非空、说明上一帧还未处理、跳过当前帧
|
|
|
|
|
print(f"第{total_frames}帧:队列非空、跳过该帧")
|
2025-09-02 19:46:34 +08:00
|
|
|
|
else:
|
2025-09-02 20:14:40 +08:00
|
|
|
|
print("帧格式转换失败,不是有效的cv2格式")
|
2025-09-02 19:46:34 +08:00
|
|
|
|
|
|
|
|
|
# 创建并设置本地offer
|
|
|
|
|
offer = await pc.createOffer()
|
|
|
|
|
print("已创建本地 SDP Offer")
|
|
|
|
|
await pc.setLocalDescription(offer)
|
|
|
|
|
|
|
|
|
|
# 发送offer到服务器
|
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
|
|
print("开始向服务器发送 SDP Offer")
|
|
|
|
|
async with session.post(
|
|
|
|
|
url,
|
|
|
|
|
data=offer.sdp.encode(),
|
|
|
|
|
headers={
|
|
|
|
|
"Content-Type": "application/sdp",
|
|
|
|
|
"Content-Length": str(len(offer.sdp))
|
|
|
|
|
},
|
|
|
|
|
ssl=False
|
|
|
|
|
) as response:
|
|
|
|
|
print("已接收到服务器的响应、开始处理 SDP Answer")
|
|
|
|
|
answer_sdp = await response.text()
|
|
|
|
|
await pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type='answer'))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# 保持连接
|
|
|
|
|
while True:
|
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
print("用户中断")
|
|
|
|
|
finally:
|
|
|
|
|
print("开始关闭 RTCPeerConnection")
|
|
|
|
|
await pc.close()
|
|
|
|
|
print("已关闭 RTCPeerConnection")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def frame_consumer(frame_queue):
|
|
|
|
|
"""
|
2025-09-02 20:14:40 +08:00
|
|
|
|
从队列中读取cv2帧并处理(队列空时会阻塞等待)
|
2025-09-02 19:46:34 +08:00
|
|
|
|
|
|
|
|
|
Args: frame_queue: 帧队列
|
|
|
|
|
"""
|
2025-09-02 21:30:28 +08:00
|
|
|
|
# 创建OCR检测器实例(请替换为实际的违禁词文件路径)
|
|
|
|
|
ocr_detector = OCRViolationDetector(
|
|
|
|
|
forbidden_words_path=r"D:\Git\bin\video\ocr\forbidden_words.txt", # 替换为实际路径
|
|
|
|
|
ocr_confidence_threshold=0.5,)
|
|
|
|
|
|
2025-09-02 19:46:34 +08:00
|
|
|
|
while True:
|
2025-09-02 20:14:40 +08:00
|
|
|
|
# 从队列中获取cv2帧(队列为空时会阻塞等待新帧)
|
2025-09-02 19:46:34 +08:00
|
|
|
|
current_frame = await frame_queue.get()
|
2025-09-02 19:54:06 +08:00
|
|
|
|
|
2025-09-02 21:30:28 +08:00
|
|
|
|
ocr_detector.detect(current_frame)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-09-02 20:14:40 +08:00
|
|
|
|
# 验证这是cv2可以处理的帧
|
2025-09-02 21:30:28 +08:00
|
|
|
|
# print(f"从队列获取到cv2帧、尺寸: {current_frame.shape}、数据类型: {current_frame.dtype}")
|
2025-09-02 19:54:06 +08:00
|
|
|
|
|
2025-09-02 20:14:40 +08:00
|
|
|
|
# 这里可以添加cv2的处理代码,例如显示帧
|
|
|
|
|
# cv2.imshow('Received Frame', current_frame)
|
|
|
|
|
# if cv2.waitKey(1) & 0xFF == ord('q'):
|
|
|
|
|
# break
|
|
|
|
|
|
2025-09-02 21:30:28 +08:00
|
|
|
|
# print("cv2帧处理完成")
|
2025-09-02 19:46:34 +08:00
|
|
|
|
|
|
|
|
|
# 标记任务完成
|
|
|
|
|
frame_queue.task_done()
|
2025-09-02 21:30:28 +08:00
|
|
|
|
# print("帧处理完成、队列已清空")
|
2025-09-02 19:46:34 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
# WebRTC服务器地址
|
|
|
|
|
url = "http://192.168.110.25:1985/rtc/v1/whep/?app=live&stream=677a4845aa48cb8526c811ad56fc5e60"
|
|
|
|
|
|
|
|
|
|
# 创建队列
|
|
|
|
|
frame_queue = asyncio.Queue(maxsize=1)
|
|
|
|
|
|
|
|
|
|
# 创建任务
|
|
|
|
|
receiver_task = asyncio.create_task(rtc_frame_receiver(url, frame_queue))
|
|
|
|
|
consumer_task = asyncio.create_task(frame_consumer(frame_queue))
|
|
|
|
|
|
|
|
|
|
# 等待任务完成
|
|
|
|
|
await asyncio.gather(receiver_task, consumer_task)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2025-09-02 20:14:40 +08:00
|
|
|
|
try:
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
finally:
|
|
|
|
|
# 确保关闭所有cv2窗口
|
|
|
|
|
cv2.destroyAllWindows()
|