251 lines
8.3 KiB
Python
251 lines
8.3 KiB
Python
import threading
|
||
import time
|
||
from typing import Optional
|
||
|
||
from mysql.connector import Error as MySQLError
|
||
|
||
from ds.db import db
|
||
from service.device_action_service import add_device_action
|
||
_last_alarm_timestamps: dict[str, float] = {}
|
||
_timestamp_lock = threading.Lock()
|
||
|
||
# 获取所有去重的客户端IP列表
|
||
def get_unique_client_ips() -> list[str]:
|
||
"""获取所有去重的客户端IP列表"""
|
||
conn = None
|
||
cursor = None
|
||
try:
|
||
conn = db.get_connection()
|
||
cursor = conn.cursor(dictionary=True)
|
||
query = "SELECT DISTINCT client_ip FROM devices WHERE client_ip IS NOT NULL"
|
||
cursor.execute(query)
|
||
results = cursor.fetchall()
|
||
return [item['client_ip'] for item in results]
|
||
|
||
except MySQLError as e:
|
||
raise Exception(f"获取客户端IP列表失败: {str(e)}") from e
|
||
finally:
|
||
db.close_connection(conn, cursor)
|
||
|
||
# 通过客户端IP更新设备是否需要处理
|
||
def update_is_need_handler_by_client_ip(client_ip: str, is_need_handler: int) -> bool:
|
||
"""
|
||
通过客户端IP更新设备的「是否需要处理」状态(is_need_handler字段)
|
||
"""
|
||
# 参数合法性校验
|
||
if not client_ip:
|
||
raise ValueError("客户端IP不能为空")
|
||
|
||
# 校验is_need_handler取值(需与数据库字段类型匹配、通常为0/1 tinyint)
|
||
if is_need_handler not in (0, 1):
|
||
raise ValueError("是否需要处理(is_need_handler)必须是0(不需要)或1(需要)")
|
||
|
||
conn = None
|
||
cursor = None
|
||
try:
|
||
# 2. 获取数据库连接与游标
|
||
conn = db.get_connection()
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 3. 先校验设备是否存在(通过client_ip定位)
|
||
cursor.execute(
|
||
"SELECT id FROM devices WHERE client_ip = %s",
|
||
(client_ip,)
|
||
)
|
||
device = cursor.fetchone()
|
||
if not device:
|
||
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在、无法更新「是否需要处理」状态")
|
||
|
||
# 4. 执行更新操作(同时更新时间戳、保持与其他更新逻辑一致性)
|
||
update_query = """
|
||
UPDATE devices
|
||
SET is_need_handler = %s,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE client_ip = %s
|
||
"""
|
||
cursor.execute(update_query, (is_need_handler, client_ip))
|
||
|
||
# 5. 确认更新生效(判断影响行数、避免无意义更新)
|
||
if cursor.rowcount <= 0:
|
||
raise Exception(f"更新失败:客户端IP {client_ip} 的设备未发生状态变更(可能已为目标值)")
|
||
|
||
# 6. 提交事务
|
||
conn.commit()
|
||
return True
|
||
|
||
except MySQLError as e:
|
||
# 数据库异常时回滚事务
|
||
if conn:
|
||
conn.rollback()
|
||
raise Exception(f"数据库操作失败:更新设备「是否需要处理」状态时出错 - {str(e)}") from e
|
||
finally:
|
||
# 无论成功失败、都关闭数据库连接(避免连接泄漏)
|
||
db.close_connection(conn, cursor)
|
||
|
||
def increment_alarm_count_by_ip(client_ip: str) -> bool:
|
||
"""
|
||
通过客户端IP增加设备的报警次数,相同IP 200ms内重复调用会被忽略
|
||
|
||
:param client_ip: 客户端IP地址
|
||
:return: 操作是否成功(是否实际执行了数据库更新)
|
||
"""
|
||
if not client_ip:
|
||
raise ValueError("客户端IP不能为空")
|
||
|
||
current_time = time.time() # 获取当前时间戳(秒,含小数)
|
||
with _timestamp_lock: # 确保线程安全的字典操作
|
||
last_time: Optional[float] = _last_alarm_timestamps.get(client_ip)
|
||
|
||
# 如果存在最近记录且间隔小于200ms,直接返回False(不执行更新)
|
||
if last_time is not None and (current_time - last_time) < 0.2:
|
||
return False
|
||
|
||
# 更新当前IP的最近调用时间
|
||
_last_alarm_timestamps[client_ip] = current_time
|
||
|
||
# 2. 执行数据库更新操作(只有通过时间校验才会执行)
|
||
conn = None
|
||
cursor = None
|
||
try:
|
||
conn = db.get_connection()
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 检查设备是否存在
|
||
cursor.execute("SELECT id FROM devices WHERE client_ip = %s", (client_ip,))
|
||
device = cursor.fetchone()
|
||
if not device:
|
||
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
|
||
|
||
# 报警次数加1、并更新时间戳
|
||
update_query = """
|
||
UPDATE devices
|
||
SET alarm_count = alarm_count + 1,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE client_ip = %s
|
||
"""
|
||
cursor.execute(update_query, (client_ip,))
|
||
conn.commit()
|
||
|
||
return True
|
||
except MySQLError as e:
|
||
if conn:
|
||
conn.rollback()
|
||
raise Exception(f"更新报警次数失败: {str(e)}") from e
|
||
finally:
|
||
db.close_connection(conn, cursor)
|
||
|
||
# 通过客户端IP更新设备在线状态
|
||
def update_online_status_by_ip(client_ip: str, online_status: int) -> bool:
|
||
"""
|
||
通过客户端IP更新设备的在线状态
|
||
|
||
:param client_ip: 客户端IP地址
|
||
:param online_status: 在线状态(1-在线、0-离线)
|
||
:return: 操作是否成功
|
||
"""
|
||
if not client_ip:
|
||
raise ValueError("客户端IP不能为空")
|
||
|
||
if online_status not in (0, 1):
|
||
raise ValueError("在线状态必须是0(离线)或1(在线)")
|
||
|
||
conn = None
|
||
cursor = None
|
||
try:
|
||
conn = db.get_connection()
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 检查设备是否存在并获取设备ID
|
||
cursor.execute("SELECT id, device_online_status FROM devices WHERE client_ip = %s", (client_ip,))
|
||
device = cursor.fetchone()
|
||
if not device:
|
||
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
|
||
|
||
# 状态无变化则不操作
|
||
if device['device_online_status'] == online_status:
|
||
return True
|
||
|
||
# 更新在线状态和时间戳
|
||
update_query = """
|
||
UPDATE devices
|
||
SET device_online_status = %s,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE client_ip = %s
|
||
"""
|
||
cursor.execute(update_query, (online_status, client_ip))
|
||
|
||
# 记录状态变更历史
|
||
add_device_action(client_ip, online_status)
|
||
|
||
conn.commit()
|
||
return True
|
||
except MySQLError as e:
|
||
if conn:
|
||
conn.rollback()
|
||
raise Exception(f"更新设备在线状态失败: {str(e)}") from e
|
||
finally:
|
||
db.close_connection(conn, cursor)
|
||
|
||
# 通过客户端IP查询设备在数据库中存在
|
||
def is_device_exist_by_ip(client_ip: str) -> bool:
|
||
"""
|
||
通过客户端IP查询设备在数据库中是否存在
|
||
"""
|
||
if not client_ip:
|
||
raise ValueError("客户端IP不能为空")
|
||
|
||
conn = None
|
||
cursor = None
|
||
try:
|
||
conn = db.get_connection()
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 查询设备是否存在
|
||
cursor.execute(
|
||
"SELECT id FROM devices WHERE client_ip = %s",
|
||
(client_ip,)
|
||
)
|
||
device = cursor.fetchone()
|
||
|
||
# 如果查询到结果则存在,否则不存在
|
||
return bool(device)
|
||
|
||
except MySQLError as e:
|
||
raise Exception(f"查询设备是否存在失败: {str(e)}") from e
|
||
finally:
|
||
db.close_connection(conn, cursor)
|
||
|
||
# 根据客户端IP获取是否需要处理
|
||
def get_is_need_handler_by_ip(client_ip: str) -> int:
|
||
"""
|
||
通过客户端IP查询设备的is_need_handler状态
|
||
|
||
:param client_ip: 客户端IP地址
|
||
:return: 设备的is_need_handler状态(0-不需要处理,1-需要处理)
|
||
"""
|
||
if not client_ip:
|
||
raise ValueError("客户端IP不能为空")
|
||
|
||
conn = None
|
||
cursor = None
|
||
try:
|
||
conn = db.get_connection()
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 查询设备的is_need_handler状态
|
||
cursor.execute(
|
||
"SELECT is_need_handler FROM devices WHERE client_ip = %s",
|
||
(client_ip,)
|
||
)
|
||
device = cursor.fetchone()
|
||
|
||
if not device:
|
||
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
|
||
|
||
return device['is_need_handler']
|
||
|
||
except MySQLError as e:
|
||
raise Exception(f"查询设备is_need_handler状态失败: {str(e)}") from e
|
||
finally:
|
||
db.close_connection(conn, cursor)
|