Files
video_detect/service/device_service.py
2025-09-30 17:17:20 +08:00

251 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import time
from typing import Optional
from mysql.connector import Error as MySQLError
from ds.db import db
from service.device_action_service import add_device_action
_last_alarm_timestamps: dict[str, float] = {}
_timestamp_lock = threading.Lock()
# 获取所有去重的客户端IP列表
def get_unique_client_ips() -> list[str]:
"""获取所有去重的客户端IP列表"""
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT DISTINCT client_ip FROM devices WHERE client_ip IS NOT NULL"
cursor.execute(query)
results = cursor.fetchall()
return [item['client_ip'] for item in results]
except MySQLError as e:
raise Exception(f"获取客户端IP列表失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 通过客户端IP更新设备是否需要处理
def update_is_need_handler_by_client_ip(client_ip: str, is_need_handler: int) -> bool:
"""
通过客户端IP更新设备的「是否需要处理」状态is_need_handler字段
"""
# 参数合法性校验
if not client_ip:
raise ValueError("客户端IP不能为空")
# 校验is_need_handler取值需与数据库字段类型匹配、通常为0/1 tinyint
if is_need_handler not in (0, 1):
raise ValueError("是否需要处理is_need_handler必须是0不需要或1需要")
conn = None
cursor = None
try:
# 2. 获取数据库连接与游标
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 3. 先校验设备是否存在通过client_ip定位
cursor.execute(
"SELECT id FROM devices WHERE client_ip = %s",
(client_ip,)
)
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在、无法更新「是否需要处理」状态")
# 4. 执行更新操作(同时更新时间戳、保持与其他更新逻辑一致性)
update_query = """
UPDATE devices
SET is_need_handler = %s,
updated_at = CURRENT_TIMESTAMP
WHERE client_ip = %s
"""
cursor.execute(update_query, (is_need_handler, client_ip))
# 5. 确认更新生效(判断影响行数、避免无意义更新)
if cursor.rowcount <= 0:
raise Exception(f"更新失败客户端IP {client_ip} 的设备未发生状态变更(可能已为目标值)")
# 6. 提交事务
conn.commit()
return True
except MySQLError as e:
# 数据库异常时回滚事务
if conn:
conn.rollback()
raise Exception(f"数据库操作失败:更新设备「是否需要处理」状态时出错 - {str(e)}") from e
finally:
# 无论成功失败、都关闭数据库连接(避免连接泄漏)
db.close_connection(conn, cursor)
def increment_alarm_count_by_ip(client_ip: str) -> bool:
"""
通过客户端IP增加设备的报警次数相同IP 200ms内重复调用会被忽略
:param client_ip: 客户端IP地址
:return: 操作是否成功(是否实际执行了数据库更新)
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
current_time = time.time() # 获取当前时间戳(秒,含小数)
with _timestamp_lock: # 确保线程安全的字典操作
last_time: Optional[float] = _last_alarm_timestamps.get(client_ip)
# 如果存在最近记录且间隔小于200ms直接返回False不执行更新
if last_time is not None and (current_time - last_time) < 0.2:
return False
# 更新当前IP的最近调用时间
_last_alarm_timestamps[client_ip] = current_time
# 2. 执行数据库更新操作(只有通过时间校验才会执行)
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 检查设备是否存在
cursor.execute("SELECT id FROM devices WHERE client_ip = %s", (client_ip,))
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
# 报警次数加1、并更新时间戳
update_query = """
UPDATE devices
SET alarm_count = alarm_count + 1,
updated_at = CURRENT_TIMESTAMP
WHERE client_ip = %s
"""
cursor.execute(update_query, (client_ip,))
conn.commit()
return True
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"更新报警次数失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 通过客户端IP更新设备在线状态
def update_online_status_by_ip(client_ip: str, online_status: int) -> bool:
"""
通过客户端IP更新设备的在线状态
:param client_ip: 客户端IP地址
:param online_status: 在线状态1-在线、0-离线)
:return: 操作是否成功
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
if online_status not in (0, 1):
raise ValueError("在线状态必须是0离线或1在线")
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 检查设备是否存在并获取设备ID
cursor.execute("SELECT id, device_online_status FROM devices WHERE client_ip = %s", (client_ip,))
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
# 状态无变化则不操作
if device['device_online_status'] == online_status:
return True
# 更新在线状态和时间戳
update_query = """
UPDATE devices
SET device_online_status = %s,
updated_at = CURRENT_TIMESTAMP
WHERE client_ip = %s
"""
cursor.execute(update_query, (online_status, client_ip))
# 记录状态变更历史
add_device_action(client_ip, online_status)
conn.commit()
return True
except MySQLError as e:
if conn:
conn.rollback()
raise Exception(f"更新设备在线状态失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 通过客户端IP查询设备在数据库中存在
def is_device_exist_by_ip(client_ip: str) -> bool:
"""
通过客户端IP查询设备在数据库中是否存在
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 查询设备是否存在
cursor.execute(
"SELECT id FROM devices WHERE client_ip = %s",
(client_ip,)
)
device = cursor.fetchone()
# 如果查询到结果则存在,否则不存在
return bool(device)
except MySQLError as e:
raise Exception(f"查询设备是否存在失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)
# 根据客户端IP获取是否需要处理
def get_is_need_handler_by_ip(client_ip: str) -> int:
"""
通过客户端IP查询设备的is_need_handler状态
:param client_ip: 客户端IP地址
:return: 设备的is_need_handler状态0-不需要处理1-需要处理)
"""
if not client_ip:
raise ValueError("客户端IP不能为空")
conn = None
cursor = None
try:
conn = db.get_connection()
cursor = conn.cursor(dictionary=True)
# 查询设备的is_need_handler状态
cursor.execute(
"SELECT is_need_handler FROM devices WHERE client_ip = %s",
(client_ip,)
)
device = cursor.fetchone()
if not device:
raise ValueError(f"客户端IP为 {client_ip} 的设备不存在")
return device['is_need_handler']
except MySQLError as e:
raise Exception(f"查询设备is_need_handler状态失败: {str(e)}") from e
finally:
db.close_connection(conn, cursor)