import threading import time from typing import Optional from mysql.connector import Error as MySQLError from ds.db import db from service.device_action_service import add_device_action _last_alarm_timestamps: dict[str, float] = {} _timestamp_lock = threading.Lock() # 获取所有去重的客户端IP列表 def get_unique_client_ips() -> list[str]: """获取所有去重的客户端IP列表""" conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) query = "SELECT DISTINCT client_ip FROM devices WHERE client_ip IS NOT NULL" cursor.execute(query) results = cursor.fetchall() return [item['client_ip'] for item in results] except MySQLError as e: raise Exception(f"获取客户端IP列表失败: {str(e)}") from e finally: db.close_connection(conn, cursor) # 通过客户端IP更新设备是否需要处理 def update_is_need_handler_by_client_ip(client_ip: str, is_need_handler: int) -> bool: """ 通过客户端IP更新设备的「是否需要处理」状态(is_need_handler字段) """ # 参数合法性校验 if not client_ip: raise ValueError("客户端IP不能为空") # 校验is_need_handler取值(需与数据库字段类型匹配、通常为0/1 tinyint) if is_need_handler not in (0, 1): raise ValueError("是否需要处理(is_need_handler)必须是0(不需要)或1(需要)") conn = None cursor = None try: # 2. 获取数据库连接与游标 conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 3. 先校验设备是否存在(通过client_ip定位) cursor.execute( "SELECT id FROM devices WHERE client_ip = %s", (client_ip,) ) device = cursor.fetchone() if not device: raise ValueError(f"客户端IP为 {client_ip} 的设备不存在、无法更新「是否需要处理」状态") # 4. 执行更新操作(同时更新时间戳、保持与其他更新逻辑一致性) update_query = """ UPDATE devices SET is_need_handler = %s, updated_at = CURRENT_TIMESTAMP WHERE client_ip = %s """ cursor.execute(update_query, (is_need_handler, client_ip)) # 5. 确认更新生效(判断影响行数、避免无意义更新) if cursor.rowcount <= 0: raise Exception(f"更新失败:客户端IP {client_ip} 的设备未发生状态变更(可能已为目标值)") # 6. 提交事务 conn.commit() return True except MySQLError as e: # 数据库异常时回滚事务 if conn: conn.rollback() raise Exception(f"数据库操作失败:更新设备「是否需要处理」状态时出错 - {str(e)}") from e finally: # 无论成功失败、都关闭数据库连接(避免连接泄漏) db.close_connection(conn, cursor) def increment_alarm_count_by_ip(client_ip: str) -> bool: """ 通过客户端IP增加设备的报警次数,相同IP 200ms内重复调用会被忽略 :param client_ip: 客户端IP地址 :return: 操作是否成功(是否实际执行了数据库更新) """ if not client_ip: raise ValueError("客户端IP不能为空") current_time = time.time() # 获取当前时间戳(秒,含小数) with _timestamp_lock: # 确保线程安全的字典操作 last_time: Optional[float] = _last_alarm_timestamps.get(client_ip) # 如果存在最近记录且间隔小于200ms,直接返回False(不执行更新) if last_time is not None and (current_time - last_time) < 0.2: return False # 更新当前IP的最近调用时间 _last_alarm_timestamps[client_ip] = current_time # 2. 执行数据库更新操作(只有通过时间校验才会执行) conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 检查设备是否存在 cursor.execute("SELECT id FROM devices WHERE client_ip = %s", (client_ip,)) device = cursor.fetchone() if not device: raise ValueError(f"客户端IP为 {client_ip} 的设备不存在") # 报警次数加1、并更新时间戳 update_query = """ UPDATE devices SET alarm_count = alarm_count + 1, updated_at = CURRENT_TIMESTAMP WHERE client_ip = %s """ cursor.execute(update_query, (client_ip,)) conn.commit() return True except MySQLError as e: if conn: conn.rollback() raise Exception(f"更新报警次数失败: {str(e)}") from e finally: db.close_connection(conn, cursor) # 通过客户端IP更新设备在线状态 def update_online_status_by_ip(client_ip: str, online_status: int) -> bool: """ 通过客户端IP更新设备的在线状态 :param client_ip: 客户端IP地址 :param online_status: 在线状态(1-在线、0-离线) :return: 操作是否成功 """ if not client_ip: raise ValueError("客户端IP不能为空") if online_status not in (0, 1): raise ValueError("在线状态必须是0(离线)或1(在线)") conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 检查设备是否存在并获取设备ID cursor.execute("SELECT id, device_online_status FROM devices WHERE client_ip = %s", (client_ip,)) device = cursor.fetchone() if not device: raise ValueError(f"客户端IP为 {client_ip} 的设备不存在") # 状态无变化则不操作 if device['device_online_status'] == online_status: return True # 更新在线状态和时间戳 update_query = """ UPDATE devices SET device_online_status = %s, updated_at = CURRENT_TIMESTAMP WHERE client_ip = %s """ cursor.execute(update_query, (online_status, client_ip)) # 记录状态变更历史 add_device_action(client_ip, online_status) conn.commit() return True except MySQLError as e: if conn: conn.rollback() raise Exception(f"更新设备在线状态失败: {str(e)}") from e finally: db.close_connection(conn, cursor) # 通过客户端IP查询设备在数据库中存在 def is_device_exist_by_ip(client_ip: str) -> bool: """ 通过客户端IP查询设备在数据库中是否存在 """ if not client_ip: raise ValueError("客户端IP不能为空") conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 查询设备是否存在 cursor.execute( "SELECT id FROM devices WHERE client_ip = %s", (client_ip,) ) device = cursor.fetchone() # 如果查询到结果则存在,否则不存在 return bool(device) except MySQLError as e: raise Exception(f"查询设备是否存在失败: {str(e)}") from e finally: db.close_connection(conn, cursor) # 根据客户端IP获取是否需要处理 def get_is_need_handler_by_ip(client_ip: str) -> int: """ 通过客户端IP查询设备的is_need_handler状态 :param client_ip: 客户端IP地址 :return: 设备的is_need_handler状态(0-不需要处理,1-需要处理) """ if not client_ip: raise ValueError("客户端IP不能为空") conn = None cursor = None try: conn = db.get_connection() cursor = conn.cursor(dictionary=True) # 查询设备的is_need_handler状态 cursor.execute( "SELECT is_need_handler FROM devices WHERE client_ip = %s", (client_ip,) ) device = cursor.fetchone() if not device: raise ValueError(f"客户端IP为 {client_ip} 的设备不存在") return device['is_need_handler'] except MySQLError as e: raise Exception(f"查询设备is_need_handler状态失败: {str(e)}") from e finally: db.close_connection(conn, cursor)