Files
video_detect/service/device_service.py

251 lines
8.3 KiB
Python
Raw Permalink Normal View History

2025-09-30 17:17:20 +08:00
import threading
import time
from typing import Optional
from mysql.connector import Error as MySQLError
from ds.db import db
from service.device_action_service import add_device_action
_last_alarm_timestamps: dict[str, float] = {}
_timestamp_lock = threading.Lock()
# 获取所有去重的客户端IP列表
def get_unique_client_ips() -> list[str]:
"""获取所有去重的客户端IP列表"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT DISTINCT client_ip FROM devices WHERE client_ip IS NOT NULL"
cursor.execute(query)
results = cursor.fetchall()
return [item['client_ip'] for item in results]
except MySQLError as e:
raise Exception(f"获取客户端IP列表失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 通过客户端IP更新设备是否需要处理
def update_is_need_handler_by_client_ip(client_ip: str, is_need_handler: int) -> bool:
"""
通过客户端IP更新设备的是否需要处理状态is_need_handler字段
"""
# 参数合法性校验
if not client_ip:
raise ValueError("客户端IP不能为空")
# 校验is_need_handler取值需与数据库字段类型匹配、通常为0/1 tinyint
if is_need_handler not in (0, 1):
raise ValueError("是否需要处理is_need_handler必须是0不需要或1需要")
conn = None
cursor = None
try:
# 2. 获取数据库连接与游标
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 3. 先校验设备是否存在通过client_ip定位
cursor.execute(
"SELECT id FROM devices WHERE client_ip = %s",
(client_ip,)
)
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在、无法更新「是否需要处理」状态")
# 4. 执行更新操作(同时更新时间戳、保持与其他更新逻辑一致性)
update_query = """
UPDATE devices
SET is_need_handler = %s,
updated_at = CURRENT_TIMESTAMP
WHERE client_ip = %s
"""
cursor.execute(update_query, (is_need_handler, client_ip))
# 5. 确认更新生效(判断影响行数、避免无意义更新)
if cursor.rowcount <= 0:
raise Exception(f"更新失败客户端IP {client_ip} 的设备未发生状态变更(可能已为目标值)")
# 6. 提交事务
conn.commit()
return True
except MySQLError as e:
# 数据库异常时回滚事务
if conn:
conn.rollback()
raise Exception(f"数据库操作失败:更新设备「是否需要处理」状态时出错 - {str(e)}") from e
finally:
# 无论成功失败、都关闭数据库连接(避免连接泄漏)
db.close_connection(conn, cursor)
def increment_alarm_count_by_ip(client_ip: str) -> bool:
"""
通过客户端IP增加设备的报警次数相同IP 200ms内重复调用会被忽略
:param client_ip: 客户端IP地址
:return: 操作是否成功是否实际执行了数据库更新
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
current_time = time.time() # 获取当前时间戳(秒,含小数)
with _timestamp_lock: # 确保线程安全的字典操作
last_time: Optional[float] = _last_alarm_timestamps.get(client_ip)
# 如果存在最近记录且间隔小于200ms直接返回False不执行更新
if last_time is not None and (current_time - last_time) < 0.2:
return False
# 更新当前IP的最近调用时间
_last_alarm_timestamps[client_ip] = current_time
# 2. 执行数据库更新操作(只有通过时间校验才会执行)
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 检查设备是否存在
cursor.execute("SELECT id FROM devices WHERE client_ip = %s", (client_ip,))
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
# 报警次数加1、并更新时间戳
update_query = """
UPDATE devices
SET alarm_count = alarm_count + 1,
updated_at = CURRENT_TIMESTAMP
WHERE client_ip = %s
"""
cursor.execute(update_query, (client_ip,))
conn.commit()
return True
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"更新报警次数失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 通过客户端IP更新设备在线状态
def update_online_status_by_ip(client_ip: str, online_status: int) -> bool:
"""
通过客户端IP更新设备的在线状态
:param client_ip: 客户端IP地址
:param online_status: 在线状态1-在线0-离线
:return: 操作是否成功
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
if online_status not in (0, 1):
raise ValueError("在线状态必须是0离线或1在线")
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 检查设备是否存在并获取设备ID
cursor.execute("SELECT id, device_online_status FROM devices WHERE client_ip = %s", (client_ip,))
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
# 状态无变化则不操作
if device['device_online_status'] == online_status:
return True
# 更新在线状态和时间戳
update_query = """
UPDATE devices
SET device_online_status = %s,
updated_at = CURRENT_TIMESTAMP
WHERE client_ip = %s
"""
cursor.execute(update_query, (online_status, client_ip))
# 记录状态变更历史
add_device_action(client_ip, online_status)
conn.commit()
return True
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"更新设备在线状态失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 通过客户端IP查询设备在数据库中存在
def is_device_exist_by_ip(client_ip: str) -> bool:
"""
通过客户端IP查询设备在数据库中是否存在
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 查询设备是否存在
cursor.execute(
"SELECT id FROM devices WHERE client_ip = %s",
(client_ip,)
)
device = cursor.fetchone()
# 如果查询到结果则存在,否则不存在
return bool(device)
except MySQLError as e:
raise Exception(f"查询设备是否存在失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 根据客户端IP获取是否需要处理
def get_is_need_handler_by_ip(client_ip: str) -> int:
"""
通过客户端IP查询设备的is_need_handler状态
:param client_ip: 客户端IP地址
:return: 设备的is_need_handler状态0-不需要处理1-需要处理
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 查询设备的is_need_handler状态
cursor.execute(
"SELECT is_need_handler FROM devices WHERE client_ip = %s",
(client_ip,)
)
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
return device['is_need_handler']
except MySQLError as e:
raise Exception(f"查询设备is_need_handler状态失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)