from ultralytics import YOLO from service.model_service import get_current_yolo_model, get_current_conf_threshold # 新增置信度获取函数 def load_model(model_path=None): """加载YOLO模型(优先使用带版本校验的默认模型)""" if model_path is None: # 调用带版本校验的模型获取函数(自动判断是否需要重新加载) return get_current_yolo_model() try: # 加载指定路径模型(用于特殊场景) return YOLO(model_path) except Exception as e: print(f"YOLO模型加载失败(指定路径):{str(e)}") return None def detect(frame): """执行目标检测(使用动态置信度,仅模型版本变化时重新加载)""" # 获取模型(内部已做版本校验,未变化则直接返回缓存) current_model = load_model() if not current_model: return (False, "未加载到最新YOLO模型") if frame is None: return (False, "无效输入帧") try: # 获取动态置信度(从全局配置中读取) conf_threshold = get_current_conf_threshold() # 用当前模型执行检测(复用缓存,无额外加载耗时) results = current_model(frame, conf=conf_threshold, verbose=False) has_results = len(results[0].boxes) > 0 if results else False if not has_results: return (False, "未检测到目标") # 构建结果字符串 result_parts = [] for box in results[0].boxes: cls = int(box.cls[0]) conf = float(box.conf[0]) bbox = [round(x, 2) for x in box.xyxy[0].tolist()] # 保留两位小数 # 从当前模型中获取类别名(确保与模型匹配) class_name = current_model.names[cls] if hasattr(current_model, 'names') else f"类别{cls}" result_parts.append(f"{class_name}(置信度:{conf:.2f},位置:{bbox})") return (True, "; ".join(result_parts)) except Exception as e: print(f"YOLO检测过程出错:{str(e)}") return (False, f"检测错误:{str(e)}")