277 lines
12 KiB
Python
277 lines
12 KiB
Python
|
import base64
|
|||
|
import hashlib
|
|||
|
import queue
|
|||
|
import time
|
|||
|
from multiprocessing import Process, Event, Queue as MpQueue
|
|||
|
from typing import Dict, Any
|
|||
|
|
|||
|
import cv2
|
|||
|
import numpy as np
|
|||
|
from fastapi import FastAPI, Request, HTTPException
|
|||
|
|
|||
|
from api_server import model_management_router, initialize_default_model_on_startup, get_active_detector, \
|
|||
|
get_active_model_identifier
|
|||
|
from data_pusher import pusher_router, initialize_data_pusher, get_data_pusher_instance
|
|||
|
from frame_transfer import yolo_frame, push_frame
|
|||
|
from result import Response
|
|||
|
from rfdetr_core import RFDETRDetector
|
|||
|
from rtc_handler import rtc_frame
|
|||
|
|
|||
|
app = FastAPI(title="Real-time Video Processing, Model Management, and Data Pusher API")
|
|||
|
|
|||
|
app.include_router(model_management_router, prefix="/api", tags=["Model Management"])
|
|||
|
app.include_router(pusher_router, prefix="/api/pusher", tags=["Data Pusher"])
|
|||
|
|
|||
|
process_map: Dict[str, Dict[str, Any]] = {}
|
|||
|
|
|||
|
|
|||
|
@app.on_event("startup")
|
|||
|
async def web_app_startup_event():
|
|||
|
print("主应用服务启动中...")
|
|||
|
await initialize_default_model_on_startup()
|
|||
|
active_detector_instance = get_active_detector()
|
|||
|
if active_detector_instance:
|
|||
|
print(f"主应用启动:检测到活动模型 '{get_active_model_identifier()}',将用于初始化 DataPusher。")
|
|||
|
else:
|
|||
|
print("主应用启动:未检测到活动模型。DataPusher 将以无检测器状态初始化。")
|
|||
|
initialize_data_pusher(active_detector_instance)
|
|||
|
print("DataPusher 服务已初始化。")
|
|||
|
print("主应用启动完成。")
|
|||
|
|
|||
|
|
|||
|
@app.on_event("shutdown")
|
|||
|
async def web_app_shutdown_event():
|
|||
|
print("主应用服务关闭中...")
|
|||
|
pusher = get_data_pusher_instance()
|
|||
|
if pusher:
|
|||
|
print("正在关闭 DataPusher 调度器...")
|
|||
|
pusher.shutdown_scheduler()
|
|||
|
else:
|
|||
|
print("DataPusher 实例未找到,跳过调度器关闭。")
|
|||
|
|
|||
|
print("正在尝试终止所有活动的视频处理子进程...")
|
|||
|
for url, task_info in list(process_map.items()):
|
|||
|
process: Process = task_info['process']
|
|||
|
stop_event: Event = task_info['stop_event']
|
|||
|
data_q: MpQueue = task_info['data_queue']
|
|||
|
|
|||
|
if process.is_alive():
|
|||
|
print(f"向进程 {url} 发送停止信号...")
|
|||
|
stop_event.set()
|
|||
|
try:
|
|||
|
process.join(timeout=15)
|
|||
|
if process.is_alive():
|
|||
|
print(f"进程 {url} 在优雅关闭超时后仍然存活,尝试 terminate。")
|
|||
|
process.terminate()
|
|||
|
process.join(timeout=5)
|
|||
|
if process.is_alive():
|
|||
|
print(f"进程 {url} 在 terminate 后仍然存活,尝试 kill。")
|
|||
|
process.kill()
|
|||
|
process.join(timeout=2)
|
|||
|
except Exception as e:
|
|||
|
print(f"关闭/终止进程 {url} 时发生错误: {e}")
|
|||
|
|
|||
|
try:
|
|||
|
if not data_q.empty():
|
|||
|
pass
|
|||
|
data_q.close()
|
|||
|
data_q.join_thread()
|
|||
|
except Exception as e_q_cleanup:
|
|||
|
print(f"清理进程 {url} 的数据队列时出错: {e_q_cleanup}")
|
|||
|
|
|||
|
del process_map[url]
|
|||
|
print("所有视频处理子进程已尝试终止和清理。")
|
|||
|
print("主应用服务关闭完成。")
|
|||
|
|
|||
|
|
|||
|
def start_video_processing(url: str, rtmp_url: str, model_config_name: str, stop_event: Event, data_queue: MpQueue,
|
|||
|
gateway: str,frequency:int, push_url:str):
|
|||
|
print(f"视频处理子进程启动 (URL: {url}, Model: {model_config_name})")
|
|||
|
detector_instance_for_stream: RFDETRDetector = None
|
|||
|
producer_thread, transfer_thread, consumer_thread = None, None, None
|
|||
|
|
|||
|
try:
|
|||
|
print(f"正在为流 {url} 初始化模型: {model_config_name}...")
|
|||
|
detector_instance_for_stream = RFDETRDetector(config_name=model_config_name)
|
|||
|
print(f"模型 {model_config_name} 为流 {url} 初始化成功。")
|
|||
|
rtc_q = queue.Queue(maxsize=10000)
|
|||
|
yolo_q = queue.Queue(maxsize=10000)
|
|||
|
import threading
|
|||
|
producer_thread = threading.Thread(target=rtc_frame, args=(url, rtc_q), name=f"RTC-{url[:20]}", daemon=True)
|
|||
|
transfer_thread = threading.Thread(target=yolo_frame, args=(rtc_q, yolo_q, detector_instance_for_stream),
|
|||
|
name=f"YOLO-{url[:20]}", daemon=True)
|
|||
|
consumer_thread = threading.Thread(target=push_frame, args=(yolo_q, rtmp_url,gateway,frequency,push_url), name=f"Push-{url[:20]}",
|
|||
|
daemon=True)
|
|||
|
|
|||
|
producer_thread.start()
|
|||
|
transfer_thread.start()
|
|||
|
consumer_thread.start()
|
|||
|
|
|||
|
stop_event.wait()
|
|||
|
print(f"子进程 {url}: 收到停止信号。准备关闭线程...")
|
|||
|
|
|||
|
except FileNotFoundError as e:
|
|||
|
print(f"错误 (视频进程 {url}): 模型配置文件 '{model_config_name}.json' 未找到。错误: {e}")
|
|||
|
except Exception as e:
|
|||
|
print(f"错误 (视频进程 {url}): 初始化或运行时错误。错误: {e}")
|
|||
|
finally:
|
|||
|
print(f"视频处理子进程 {url} 进入 finally 块。")
|
|||
|
|
|||
|
if producer_thread and producer_thread.is_alive():
|
|||
|
print(f"子进程 {url}: producer_thread is still alive (daemon).")
|
|||
|
if transfer_thread and transfer_thread.is_alive():
|
|||
|
print(f"子进程 {url}: transfer_thread is still alive (daemon).")
|
|||
|
if consumer_thread and consumer_thread.is_alive():
|
|||
|
print(f"子进程 {url}: consumer_thread is still alive (daemon).")
|
|||
|
|
|||
|
if detector_instance_for_stream:
|
|||
|
print(f"子进程 {url}: 收集最后数据...")
|
|||
|
final_counts = getattr(detector_instance_for_stream, 'category_counts', {})
|
|||
|
final_frame_np = getattr(detector_instance_for_stream, 'last_annotated_frame', None)
|
|||
|
frame_base64 = None
|
|||
|
if final_frame_np is not None and isinstance(final_frame_np, np.ndarray):
|
|||
|
try:
|
|||
|
_, buffer = cv2.imencode('.jpg', final_frame_np)
|
|||
|
frame_base64 = base64.b64encode(buffer).decode('utf-8')
|
|||
|
except Exception as e_encode:
|
|||
|
print(f"子进程 {url}: 帧编码错误: {e_encode}")
|
|||
|
|
|||
|
payload = {
|
|||
|
"timestamp": time.time(),
|
|||
|
"category_counts": final_counts,
|
|||
|
"frame_base64": frame_base64,
|
|||
|
"source_url": url,
|
|||
|
"event": "task_stopped_final_data"
|
|||
|
}
|
|||
|
try:
|
|||
|
data_queue.put(payload, timeout=5)
|
|||
|
print(f"子进程 {url}: 已将最终数据放入队列。")
|
|||
|
except queue.Full:
|
|||
|
print(f"子进程 {url}: 无法将最终数据放入队列 (队列已满或超时)。")
|
|||
|
except Exception as e_put:
|
|||
|
print(f"子进程 {url}: 将最终数据放入队列时发生错误: {e_put}")
|
|||
|
else:
|
|||
|
print(f"子进程 {url}: 检测器实例不可用,无法发送最终数据。")
|
|||
|
|
|||
|
try:
|
|||
|
data_queue.close()
|
|||
|
except Exception as e_q_close:
|
|||
|
print(f"子进程 {url}: 关闭数据队列时出错: {e_q_close}")
|
|||
|
print(f"视频处理子进程 {url} 执行完毕。")
|
|||
|
|
|||
|
|
|||
|
@app.post("/start_video", tags=["Video Processing"])
|
|||
|
async def start_video(request: Request):
|
|||
|
data = await request.json()
|
|||
|
url = data.get("url")
|
|||
|
model_identifier_to_use = data.get("model_identifier")
|
|||
|
host = data.get("host")
|
|||
|
rtmp_port = data.get("rtmp_port")
|
|||
|
rtc_port = data.get("rtc_port")
|
|||
|
gateway = data.get("gateway")
|
|||
|
frequency = data.get("frequency")
|
|||
|
push_url = data.get("push_url")
|
|||
|
|
|||
|
# 生成MD5
|
|||
|
md5_hash = hashlib.md5(url.encode()).hexdigest()
|
|||
|
rtmp_url = f"rtmp://{host}:{rtmp_port}/live/{md5_hash}"
|
|||
|
rtc_url = f"http://{host}:{rtc_port}/rtc/v1/whep/?{md5_hash}"
|
|||
|
if not url or not rtmp_url:
|
|||
|
raise HTTPException(status_code=400, detail="'url' 和 'rtmp_url' 字段是必须的。")
|
|||
|
|
|||
|
if not model_identifier_to_use:
|
|||
|
print(f"请求中未指定 model_identifier,尝试使用全局激活的模型。")
|
|||
|
model_identifier_to_use = get_active_model_identifier()
|
|||
|
if not model_identifier_to_use:
|
|||
|
raise HTTPException(status_code=400, detail="请求中未指定 'model_identifier',且当前无全局激活的默认模型。")
|
|||
|
print(f"将为流 {url} 使用当前全局激活的模型: {model_identifier_to_use}")
|
|||
|
|
|||
|
if url in process_map and process_map[url]['process'].is_alive():
|
|||
|
raise HTTPException(status_code=409, detail=f"视频处理进程已在运行: {url}")
|
|||
|
|
|||
|
print(f"请求启动视频处理: URL = {url}, RTMP = {rtmp_url}, Model = {model_identifier_to_use}")
|
|||
|
|
|||
|
stop_event = Event()
|
|||
|
data_queue = MpQueue(maxsize=1)
|
|||
|
|
|||
|
process = Process(target=start_video_processing,
|
|||
|
args=(url, rtmp_url, model_identifier_to_use, stop_event, data_queue, gateway, frequency, push_url))
|
|||
|
process.start()
|
|||
|
process_map[url] = {'process': process, 'stop_event': stop_event, 'data_queue': data_queue}
|
|||
|
return Response.success(message=f"视频处理已为 URL '{url}' 使用模型 '{model_identifier_to_use}' 启动。",
|
|||
|
data=rtc_url)
|
|||
|
|
|||
|
|
|||
|
@app.post("/stop_video", tags=["Video Processing"])
|
|||
|
async def stop_video(request: Request):
|
|||
|
data = await request.json()
|
|||
|
url = data.get("url")
|
|||
|
if not url:
|
|||
|
raise HTTPException(status_code=400, detail="'url' 字段是必须的。")
|
|||
|
|
|||
|
task_info = process_map.get(url)
|
|||
|
if not task_info:
|
|||
|
raise HTTPException(status_code=404, detail=f"没有找到与 URL '{url}' 匹配的活动视频处理进程。")
|
|||
|
|
|||
|
process: Process = task_info['process']
|
|||
|
stop_event: Event = task_info['stop_event']
|
|||
|
data_q: MpQueue = task_info['data_queue']
|
|||
|
|
|||
|
final_data_pushed = False
|
|||
|
if process.is_alive():
|
|||
|
print(f"请求停止视频处理: {url}. 发送停止信号...")
|
|||
|
stop_event.set()
|
|||
|
process.join(timeout=20)
|
|||
|
|
|||
|
if process.is_alive():
|
|||
|
print(f"警告: 视频处理进程 {url} 在超时后未能正常终止,尝试强制结束。")
|
|||
|
process.terminate()
|
|||
|
process.join(timeout=5)
|
|||
|
if process.is_alive():
|
|||
|
print(f"错误: 视频处理进程 {url} 强制结束后仍然存在。尝试 kill。")
|
|||
|
process.kill()
|
|||
|
process.join(timeout=2)
|
|||
|
else:
|
|||
|
print(f"进程 {url} 已优雅停止。尝试获取最后数据...")
|
|||
|
try:
|
|||
|
final_payload = data_q.get(timeout=10)
|
|||
|
print(f"从停止的任务 {url} 收到最终数据。")
|
|||
|
|
|||
|
pusher_instance = get_data_pusher_instance()
|
|||
|
if pusher_instance and pusher_instance.target_url:
|
|||
|
print(f"准备将任务 {url} 的最后数据推送到 {pusher_instance.target_url}")
|
|||
|
pusher_instance.push_specific_payload(final_payload)
|
|||
|
final_data_pushed = True
|
|||
|
elif pusher_instance:
|
|||
|
print(
|
|||
|
f"DataPusher 服务已配置,但未设置目标URL (pusher.target_url is None)。无法推送任务 {url} 的最后数据。")
|
|||
|
else:
|
|||
|
print(f"DataPusher 服务未初始化或不可用。无法推送任务 {url} 的最后数据。")
|
|||
|
|
|||
|
except queue.Empty:
|
|||
|
print(f"警告: 任务 {url} 优雅停止后,未从其数据队列中获取到最终数据 (队列为空或超时)。")
|
|||
|
except Exception as e_q_get:
|
|||
|
print(f"获取或处理来自任务 {url} 的最终数据时发生错误: {e_q_get}")
|
|||
|
else:
|
|||
|
print(f"视频处理进程先前已停止或已结束: {url}")
|
|||
|
|
|||
|
try:
|
|||
|
while not data_q.empty():
|
|||
|
try:
|
|||
|
data_q.get_nowait()
|
|||
|
except queue.Empty:
|
|||
|
break
|
|||
|
data_q.close()
|
|||
|
data_q.join_thread()
|
|||
|
except Exception as e_q_final_cleanup:
|
|||
|
print(f"清理任务 {url} 的数据队列的最后步骤中发生错误: {e_q_final_cleanup}")
|
|||
|
|
|||
|
del process_map[url]
|
|||
|
message = f"视频处理已为 URL '{url}' 停止。"
|
|||
|
if final_data_pushed:
|
|||
|
message += " 已尝试推送最后的数据。"
|
|||
|
elif process.exitcode == 0:
|
|||
|
message += " 进程已退出,但未确认最后数据推送 (可能未配置推送或队列问题)。"
|
|||
|
|
|||
|
return Response.success(message=message)
|