import base64 import requests as http_client import queue import time # 确保 time 被导入,如果之前被误删 import cv2 import traceback import threading # 确保导入 threading import av # 重新导入 av import numpy as np # from fastapi import requests # 从 rfdetr_core 导入 RFDETRDetector 仅用于类型提示 (可选) from rfdetr_core import RFDETRDetector # 目标检测处理函数 # 函数签名已更改:现在接受一个 detector_instance 作为参数 def yolo_frame(rtc_q: queue.Queue, yolo_q: queue.Queue, stream_detector_instance: RFDETRDetector): thread_name = threading.current_thread().name # 获取线程名称用于日志 print(f"处理线程 '{thread_name}' 已启动。") error_message_displayed_once = False no_detector_message_displayed_once = False # 用于只提示一次没有检测器 if stream_detector_instance is None: print(f"错误 (线程 '{thread_name}'): 未提供有效的检测器实例给yolo_frame。线程将无法处理视频。") # 此线程实际上无法做任何有用的工作,可以考虑直接退出或进入一个安全循环 # 为简单起见,我们允许它进入主循环,但它会在每次迭代时打印警告 while True: frame = None # current_category_counts = {} # 将在获取后转换 try: # 恢复队列长度打印 print(f"线程 '{thread_name}' - 原始队列长度: {rtc_q.qsize()}, 检测队列长度: {yolo_q.qsize()}") frame = rtc_q.get(timeout=0.1) if frame is None: print(f"处理线程 '{thread_name}' 接收到停止信号,正在退出...") # 发送包含None frame和空计数的字典作为停止信号 yolo_q.put({"frame": None, "category_counts": {}}) break category_counts_for_packet = {} if stream_detector_instance: no_detector_message_displayed_once = False # 检测器有效,重置提示 annotated_frame = stream_detector_instance.detect_and_draw_count(frame) error_message_displayed_once = False # 获取英文键的类别计数 english_counts = stream_detector_instance.category_counts.copy() if hasattr(stream_detector_instance, 'category_counts') else {} # 转换为中文键的类别计数 if hasattr(stream_detector_instance, 'VISDRONE_CLASSES_CHINESE'): chinese_map = stream_detector_instance.VISDRONE_CLASSES_CHINESE for eng_key, count_val in english_counts.items(): # 使用 get 提供一个默认值,以防某个英文类别在中文映射表中确实没有 chi_key = chinese_map.get(eng_key, eng_key) category_counts_for_packet[chi_key] = count_val else: # 如果没有中文映射表,则直接使用英文计数 (或记录警告) category_counts_for_packet = english_counts # logger.warning(f"线程 '{thread_name}': stream_detector_instance 没有 VISDRONE_CLASSES_CHINESE 属性,将使用英文类别计数。") else: # 如果没有有效的检测器实例传递进来 if not no_detector_message_displayed_once: print(f"警告 (线程 '{thread_name}'): 无有效检测器实例。将在帧上绘制提示。") no_detector_message_displayed_once = True annotated_frame = frame.copy() cv2.putText(annotated_frame, "No detector instance provided for this stream", (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA) category_counts_for_packet = {} # 无检测器,计数为空 # 将帧和类别计数一起放入队列 data_packet = {"frame": annotated_frame, "category_counts": category_counts_for_packet} try: yolo_q.put_nowait(data_packet) except queue.Full: # print(f"警告 (线程 '{thread_name}'): yolo_q 已满,丢弃帧。") # 避免刷屏 pass except queue.Empty: time.sleep(0.01) continue except Exception as e: if not error_message_displayed_once: print(f"线程 '{thread_name}' (yolo_frame) 处理时发生严重错误: {e}") traceback.print_exc() error_message_displayed_once = True time.sleep(1) if frame is not None: try: pass except queue.Full: pass continue print(f"处理线程 '{thread_name}' 已停止。") def push_frame(yolo_q: queue.Queue, rtmp_url: str, gateway: str, frequency: int, push_url: str): thread_name = threading.current_thread().name print(f"推流线程 '{thread_name}' (RTMP: {rtmp_url}) 已启动。") output_container = None stream = None first_frame_processed = False last_push_time = 0 # 记录上次推送base64的时间 try: while True: frame_to_push = None received_category_counts = {} # 初始化为空字典 try: data_packet = yolo_q.get(timeout=0.1) if data_packet: frame_to_push = data_packet.get("frame") received_category_counts = data_packet.get("category_counts", {}) else: # data_packet is None (不太可能,除非队列明确放入None) time.sleep(0.01) continue except queue.Empty: time.sleep(0.01) continue if frame_to_push is None: # 这是通过 data_packet["frame"] is None 来判断的 print(f"推流线程 '{thread_name}' 接收到停止信号,正在清理并退出...") break if not first_frame_processed: if frame_to_push is not None: try: height, width, _ = frame_to_push.shape print(f"线程 '{thread_name}': 首帧尺寸 {width}x{height},正在初始化RTMP推流器到 {rtmp_url}") output_container = av.open(rtmp_url, 'w', format='flv') stream = output_container.add_stream('libx264', rate=25) stream.pix_fmt = 'yuv420p' stream.width = width stream.height = height stream.options = {'preset': 'ultrafast', 'tune': 'zerolatency', 'crf': '25'} print(f"线程 '{thread_name}': RTMP推流器初始化成功。") first_frame_processed = True except Exception as e_init: print(f"错误 (线程 '{thread_name}'): 初始化PyAV推流容器/流失败: {e_init}") traceback.print_exc() return else: continue if not output_container or not stream: print(f"错误 (线程 '{thread_name}'): 推流器未初始化,无法推流。可能是首帧处理失败。") time.sleep(1) continue # 持续推流到RTMP try: video_frame = av.VideoFrame.from_ndarray(frame_to_push, format='bgr24') for packet in stream.encode(video_frame): output_container.mux(packet) except Exception as e_push: print(f"错误 (线程 '{thread_name}'): 推送帧到RTMP时发生错误: {e_push}") time.sleep(0.5) # 定时推送base64帧 current_time = time.time() if current_time - last_push_time >= frequency: # 将接收到的类别计数传递给 push_base64_frame push_base64_frame(frame_to_push, gateway, push_url, thread_name, received_category_counts) last_push_time = current_time except Exception as e_outer: print(f"推流线程 '{thread_name}' 发生严重外部错误: {e_outer}") traceback.print_exc() finally: print(f"推流线程 '{thread_name}': 进入finally块,准备关闭推流器。") if stream and output_container: try: print(f"推流线程 '{thread_name}': 正在编码流的剩余部分...") for packet in stream.encode(None): output_container.mux(packet) print(f"推流线程 '{thread_name}': 编码剩余部分完成。") except Exception as e_flush: print(f"错误 (线程 '{thread_name}'): 关闭推流流时发生编码/刷新错误: {e_flush}") traceback.print_exc() if output_container: try: print(f"推流线程 '{thread_name}': 正在关闭推流容器...") output_container.close() print(f"推流线程 '{thread_name}': 推流容器已关闭。") except Exception as e_close: print(f"错误 (线程 '{thread_name}'): 关闭推流容器时发生错误: {e_close}") traceback.print_exc() print(f"推流线程 '{thread_name}' 已停止并完成清理。") def push_base64_frame(frame: np.ndarray, gateway: str, push_url: str, thread_name: str, category_counts: dict): """将帧转换为base64并推送到指定URL""" try: # 转换为JPEG格式 _, buffer = cv2.imencode('.jpg', frame) # 转换为base64字符串 frame_base64 = base64.b64encode(buffer).decode('utf-8') # 构建JSON数据 data = { "gateway": gateway, "frame_base64": frame_base64, "category_counts": category_counts # 使用传入的 category_counts } print(f"DEBUG push_base64_frame: Pushing data: {data.get('category_counts')}") # 调试打印,检查发送的数据 # 发送POST请求 response = http_client.post(push_url, json=data, timeout=5) # 检查响应 if response.status_code == 200: print(f"线程 '{thread_name}': base64帧已成功推送到 {push_url}") else: print(f"错误 (线程 '{thread_name}'): 推送base64帧失败,状态码: {response.status_code}") except Exception as e: print(f"错误 (线程 '{thread_name}'): 处理或推送base64帧时发生错误: {e}")