diff --git a/xinnengyuan/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java b/xinnengyuan/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java index ef5cfc96..d6bb9112 100644 --- a/xinnengyuan/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java +++ b/xinnengyuan/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java @@ -13,6 +13,7 @@ import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.server.HandshakeInterceptor; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * WebSocket 配置 diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml b/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml index ce758e5f..cb857252 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml @@ -19,11 +19,15 @@ + + + + + + - javax.websocket - javax.websocket-api - 1.1 - provided + org.springframework.boot + spring-boot-starter-websocket diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/DeviceWebSocketServer.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/DeviceWebSocketServer.java index 33532311..01fb3f48 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/DeviceWebSocketServer.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/DeviceWebSocketServer.java @@ -1,153 +1,161 @@ package org.dromara.mobileAttendanceMachine; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.websocket.*; +import jakarta.websocket.server.ServerEndpoint; import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; -import javax.websocket.*; -import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.http.WebSocket; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** - * WebSocket服务端实现(对应Golang的HandleWebSocket逻辑) - * ServerEndpoint注解指定WebSocket连接路径 + * WebSocket服务端(设备连接管理、消息处理) + * 核心逻辑:通过状态标记区分设备注册阶段和正常通信阶段,避免多处理器冲突 */ -@ServerEndpoint("/ws/device") +@ServerEndpoint("/custom-ws/device") +@Component @Log4j2 public class DeviceWebSocketServer { - // ------------------------------ 常量定义(对应Golang的const) ------------------------------ - public static class Constants { - public static final String DECLARE = "declare"; // 设备初上线 - public static final String PING = "ping"; // 心跳 - public static final String TO_CLIENT = "to_client"; // 服务器消息下发到客户端的响应 - } + // ------------------------------ 常量定义 ------------------------------ + public static final String DECLARE = "declare"; // 设备注册消息 + public static final String PING = "ping"; // 心跳消息 + public static final String TO_CLIENT = "to_client"; // 设备响应消息 + private static final int REGISTER_TIMEOUT = 10; // 注册超时时间(秒) - // JSON序列化/反序列化工具(单例) - private static final ObjectMapper objectMapper = new ObjectMapper(); - - // 1. 存储所有连接的设备信息(key: 设备SN,value: 设备信息) + // ------------------------------ 全局静态存储 ------------------------------ + // JSON序列化工具(单例) + private static final ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);; + // 设备连接池:key=设备SN,value=设备信息(含Session) private static final Map connectedDevices = new ConcurrentHashMap<>(); - // 2. 存储UUID对应的响应通道(key: UUID,value: 响应结果容器) - private static final Map> responseChannels = new ConcurrentHashMap<>(); + // 响应通道:key=UUID,value=响应结果容器 + private static final Map responseChannels = new ConcurrentHashMap<>(); + // 设备-SN反向映射:key=设备SN,value=关联的UUID列表(用于连接关闭时清理) + private static final Map> snToUuids = new ConcurrentHashMap<>(); - // 当前连接的WebSocket会话 - private Session session; - // 当前连接的设备SN(连接建立后从DECLARE消息中提取) - private String currentDeviceSn; + // ------------------------------ 每个连接的实例变量 ------------------------------ + private Session session; // 当前WebSocket会话 + private String currentDeviceSn; // 当前连接的设备SN(注册后赋值) + private volatile boolean isRegistered; // 注册状态:false=未注册,true=已注册 + private CompletableFuture registerFuture; // 注册阶段的消息Future + // ------------------------------ 连接生命周期方法 ------------------------------ /** - * 连接建立时触发(对应Golang中upgrader.Upgrade后的初始化逻辑) + * 连接建立时触发(初始化状态) */ @OnOpen public void onOpen(Session session) { this.session = session; - log.info("新的WebSocket连接建立,会话ID: {}", session.getId()); + this.isRegistered = false; + this.registerFuture = new CompletableFuture<>(); // 初始化注册Future + + log.info("新连接建立,会话ID: {},等待设备注册({}秒超时)...", session.getId(), REGISTER_TIMEOUT); + + // 启动注册超时监听 + CompletableFuture.runAsync(() -> { + try { + // 等待注册消息,超时则关闭连接 + if (!registerFuture.get(REGISTER_TIMEOUT, TimeUnit.SECONDS)) { + throw new TimeoutException("注册超时"); + } + } catch (Exception e) { + log.error("设备注册超时/失败,会话ID: {}", session.getId(), e); + try { + if (session.isOpen()) { + session.close(new CloseReason(CloseReason.CloseCodes.TRY_AGAIN_LATER, "注册超时(请发送DECLARE消息)")); + } + } catch (IOException ex) { + log.error("关闭超时连接失败", ex); + } + } + }); + } + + /** + * 接收消息时触发(所有消息统一处理,按状态区分) + */ + @OnMessage + public void onMessage(String message, Session session) { + if (message == null || message.isEmpty()) { + log.warn("会话ID: {} 收到空消息,忽略", session.getId()); + return; + } try { - // 读取设备第一条消息(DECLARE消息),完成设备注册 - KqjEntity.DeclareMessage declareMsg = registerDevice(); - this.currentDeviceSn = declareMsg.getSn(); - log.info("设备注册成功,SN: {}, 设备信息: {}", currentDeviceSn, declareMsg); + // 解析通用消息的CMD(所有消息必须包含CMD) + KqjEntity.GenericMessage genericMsg = objectMapper.readValue(message, KqjEntity.GenericMessage.class); + String cmd = genericMsg.getCmd(); + if (cmd == null) { + log.warn("会话ID: {} 收到无CMD消息,关闭连接: {}", session.getId(), message); + session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "消息缺少cmd字段")); + return; + } + + // 未注册状态:仅处理DECLARE注册消息 + if (!isRegistered) { + handleRegisterStage(message, cmd); + return; + } + + // 已注册状态:处理正常业务消息 + handleNormalStage(message, cmd); } catch (Exception e) { - log.error("设备注册失败,关闭连接", e); + log.error("会话ID: {} 处理消息失败: {}", session.getId(), message, e); try { - session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "设备注册失败")); + if (session.isOpen()) { + session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "消息处理异常")); + } } catch (IOException ex) { log.error("关闭异常连接失败", ex); } } } - /** - * 接收客户端消息时触发(对应Golang的for循环读取消息逻辑) - */ - @OnMessage - public void onMessage(String message, Session session) { - if (message == null || message.isEmpty()) { - log.warn("收到空消息,忽略处理"); - return; - } - - try { - // 先解析通用消息的CMD字段(对应Golang的GenericMessage) - KqjEntity.GenericMessage genericMsg = objectMapper.readValue(message, KqjEntity.GenericMessage.class); - String cmd = genericMsg.getCmd(); - if (cmd == null) { - log.warn("收到无CMD字段的消息,忽略: {}", message); - return; - } - - // 根据CMD类型处理不同逻辑(对应Golang的switch case) - switch (cmd) { - case Constants.DECLARE: - log.info("设备在线心跳(DECLARE),SN: {}", currentDeviceSn); - break; - - case Constants.PING: - handlePing(message); - break; - - case Constants.TO_CLIENT: - handleToClientResponse(message); - break; - - default: - log.warn("收到未知CMD消息,类型: {}, 内容: {}", cmd, message); - } - - } catch (Exception e) { - log.error("处理消息失败,消息内容: {}", message, e); - } - } - - - /** - * 连接关闭时触发(对应Golang的defer conn.Close()和资源清理逻辑) + * 连接关闭时触发(清理资源) */ @OnClose public void onClose(Session session, CloseReason closeReason) { - log.info("WebSocket连接关闭,会话ID: {}, 原因: {}", session.getId(), closeReason); + log.info("连接关闭,会话ID: {},原因: {}", session.getId(), closeReason); - // 1. 移除设备连接信息 - if (currentDeviceSn != null) { + // 1. 移除设备连接(若已注册) + if (currentDeviceSn != null && connectedDevices.containsKey(currentDeviceSn)) { connectedDevices.remove(currentDeviceSn); - // 更新设备状态为离线(对应Golang的service.BusAttendanceMachine().Change) + log.info("设备离线,SN: {},会话ID: {}", currentDeviceSn, session.getId()); + + // 2. 更新设备状态为离线(业务服务调用) updateDeviceStatus(currentDeviceSn, "0"); + + // 3. 清理该设备关联的响应通道 + cleanDeviceResponseChannels(currentDeviceSn); } - // 2. 清理当前设备对应的响应通道(避免内存泄漏) - responseChannels.entrySet().removeIf(entry -> { - if (entry.getValue().getSn().equals(currentDeviceSn)) { - entry.getValue().getResultFuture().completeExceptionally( - new Exception("设备连接已关闭,响应通道清理") - ); - return true; - } - return false; - }); - + // 4. 重置实例变量 this.session = null; this.currentDeviceSn = null; + this.isRegistered = false; + this.registerFuture = null; } - /** * 连接异常时触发 */ @OnError public void onError(Session session, Throwable throwable) { - log.error("WebSocket连接异常,会话ID: {}", session.getId(), throwable); - // 异常时主动关闭连接 + log.error("连接异常,会话ID: {},异常信息: {}", session.getId(), throwable.getMessage(), throwable); try { if (session.isOpen()) { session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "连接异常")); @@ -157,63 +165,175 @@ public class DeviceWebSocketServer { } } - - // ------------------------------ 核心业务方法 ------------------------------ + // ------------------------------ 消息处理核心逻辑 ------------------------------ /** - * 设备注册(对应Golang的addDevice函数) - * 读取DECLARE消息,解析设备信息并存储 + * 处理注册阶段消息(仅接受DECLARE) */ - private KqjEntity.DeclareMessage registerDevice() throws Exception { - // 阻塞读取第一条消息(DECLARE消息) - String firstMessage = readFirstMessage(); - KqjEntity.DeclareMessage declareMsg = objectMapper.readValue(firstMessage, KqjEntity.DeclareMessage.class); +// private void handleRegisterStage(String message, String cmd) throws Exception { +// // 非DECLARE消息直接拒绝 +// if (!DECLARE.equals(cmd)) { +// log.warn("未注册设备发送非法消息,cmd: {},关闭连接", cmd); +// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "请先发送注册消息(cmd=declare)")); +// registerFuture.complete(false); // 标记注册失败 +// return; +// } +// +// // 解析DECLARE消息并完成注册 +// KqjEntity.DeclareMessage declareMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class); +// String sn = declareMsg.getSn(); +// if (sn == null || sn.isEmpty()) { +// log.warn("注册消息缺少SN,关闭连接"); +// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "注册消息缺少sn字段")); +// registerFuture.complete(false); +// return; +// } +// +// // 存储设备信息(IP、端口、Session) +// InetSocketAddress remoteAddr = (InetSocketAddress) session.getUserProperties().get("jakarta.websocket.endpoint.remoteAddress"); +// String ip = remoteAddr.getAddress().getHostAddress(); +// String port = String.valueOf(remoteAddr.getPort()); +// +// KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo(); +// deviceInfo.setIp(ip); +// deviceInfo.setPort(port); +// deviceInfo.setConn(session); // 直接存储Jakarta Session,无类型转换 +// connectedDevices.put(sn, deviceInfo); +// +// // 初始化设备-SN反向映射 +// snToUuids.computeIfAbsent(sn, k -> ConcurrentHashMap.newKeySet()); +// +// // 调用业务服务完成注册 +// registerDeviceToService(sn); +// +// // 注册成功:更新状态 +// this.currentDeviceSn = sn; +// this.isRegistered = true; +// registerFuture.complete(true); // 标记注册成功 +// log.info("设备注册成功,SN: {},IP: {}:{},会话ID: {}", sn, ip, port, session.getId()); +// } - // 校验SN合法性 - String sn = declareMsg.getSn(); - if (sn == null || sn.isEmpty()) { - throw new IllegalArgumentException("设备SN为空,注册失败"); + private void handleRegisterStage(String message, String cmd) throws Exception { + // 非DECLARE消息直接拒绝 + if (!DECLARE.equals(cmd)) { + log.warn("未注册设备发送非法消息,cmd: {},关闭连接", cmd); + session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "请先发送注册消息(cmd=declare)")); + registerFuture.complete(false); // 标记注册失败 + return; } - // 解析客户端IP和端口(对应Golang的parseRemoteAddr) - InetSocketAddress remoteAddr = (InetSocketAddress) session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"); - String ip = remoteAddr.getAddress().getHostAddress(); - String port = String.valueOf(remoteAddr.getPort()); + // 解析DECLARE消息并完成注册 + KqjEntity.DeclareMessage declareMsg; + try { + declareMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class); + } catch (Exception e) { + log.error("解析DECLARE消息失败,原始消息: {}", message, e); + session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "DECLARE消息格式错误")); + registerFuture.complete(false); + return; + } - // 存储设备信息 + String sn = declareMsg.getSn(); + if (sn == null || sn.isEmpty()) { + log.warn("注册消息缺少SN,关闭连接,消息内容: {}", message); + session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "注册消息缺少sn字段")); + registerFuture.complete(false); + return; + } + + // 存储设备信息(IP、端口、Session)- 核心优化:兼容多容器,避免空指针 + String ip = "unknown"; + String port = "unknown"; + InetSocketAddress remoteAddr = null; + + // 尝试多种容器可能的键(优先级:Jakarta标准 -> Undertow专用 -> Javax兼容) + Object addrObj = session.getUserProperties().get("jakarta.websocket.endpoint.remoteAddress"); + if (addrObj == null) { + addrObj = session.getUserProperties().get("io.undertow.websocket.remoteAddress"); // Undertow专用 + } + if (addrObj == null) { + addrObj = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"); // 兼容旧版 + } + + // 校验并解析IP和端口 + if (addrObj instanceof InetSocketAddress) { + remoteAddr = (InetSocketAddress) addrObj; + // 防止getAddress()返回null(极端情况) + if (remoteAddr.getAddress() != null) { + ip = remoteAddr.getAddress().getHostAddress(); + } + port = String.valueOf(remoteAddr.getPort()); + log.info("解析设备地址成功,会话ID: {},IP: {},端口: {}", session.getId(), ip, port); + } else { + // 地址获取失败时,使用默认值,不中断注册流程 + log.warn("无法解析设备地址,会话ID: {},addrObj类型: {}", + session.getId(), addrObj == null ? "null" : addrObj.getClass().getName()); + log.warn("继续注册流程,使用默认地址(unknown)"); + } + + // 创建设备信息(即使IP获取失败,也正常注册) KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo(); deviceInfo.setIp(ip); deviceInfo.setPort(port); - - // ======================== 添加类型检查 ======================== - // 检查session是否能转换为WebSocket,避免强转失败 - if (!(session instanceof WebSocket)) { - throw new IllegalStateException("WebSocket容器不支持Session转WebSocket,无法注册设备,SN: " + sn); - } - // 强转并赋值 - deviceInfo.setConn((WebSocket) session); // 用Session替换Golang的*websocket.Conn - // ============================================================ - - + deviceInfo.setConn(session); connectedDevices.put(sn, deviceInfo); - // 调用业务服务注册设备(对应Golang的service.BusAttendanceMachine().Register) + // 初始化设备-SN反向映射 + snToUuids.computeIfAbsent(sn, k -> ConcurrentHashMap.newKeySet()); + + // 调用业务服务完成注册 registerDeviceToService(sn); - return declareMsg; + // 注册成功:更新状态 + this.currentDeviceSn = sn; + this.isRegistered = true; + registerFuture.complete(true); // 标记注册成功 + log.info("设备注册成功,SN: {},IP: {}:{},会话ID: {}", sn, ip, port, session.getId()); } /** - * 处理PING消息(对应Golang的handlePing函数) + * 处理正常通信阶段消息(PING、TO_CLIENT等) + */ + private void handleNormalStage(String message, String cmd) throws Exception { + switch (cmd) { + case DECLARE: + // 已注册设备发送DECLARE视为在线心跳 + log.info("设备在线心跳,SN: {},会话ID: {}", currentDeviceSn, session.getId()); + break; + + case PING: + // 处理心跳:回复PONG + handlePing(message); + break; + + case TO_CLIENT: + // 处理设备响应消息 + handleToClientResponse(message); + break; + + default: + log.warn("收到未知消息类型,SN: {},cmd: {},内容: {}", currentDeviceSn, cmd, message); + } + } + + // ------------------------------ 业务子逻辑 ------------------------------ + + /** + * 处理PING心跳消息(回复PONG) */ private void handlePing(String message) throws Exception { + // 解析PING消息中的SN(双重校验) KqjEntity.DeclareMessage pingMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class); String sn = pingMsg.getSn(); + if (!currentDeviceSn.equals(sn)) { + log.warn("心跳消息SN不匹配,当前SN: {},消息SN: {}", currentDeviceSn, sn); + return; + } - // 1. 回复PONG消息 + // 构造PONG响应 KqjEntity.PongMessage pongMsg = new KqjEntity.PongMessage(); - pongMsg.setCmd(Constants.PING); + pongMsg.setCmd("pong"); pongMsg.setFrom("server"); pongMsg.setTo(sn); @@ -223,247 +343,185 @@ public class DeviceWebSocketServer { // 发送PONG消息 session.getBasicRemote().sendText(objectMapper.writeValueAsString(pongMsg)); - log.info("发送PONG消息给设备,SN: {}", sn); + log.info("发送PONG消息,SN: {},会话ID: {}", sn, session.getId()); - // 2. 更新设备连接状态(对应Golang的connectedDevices[sn]重设) - if (connectedDevices.containsKey(sn)) { - KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn); - deviceInfo.setConn((WebSocket) session); - connectedDevices.put(sn, deviceInfo); - } - - // 3. 调用业务服务更新设备状态(对应Golang的service.BusAttendanceMachine().Register) + // 更新设备连接状态(业务服务调用) registerDeviceToService(sn); } - /** - * 处理TO_CLIENT响应消息(对应Golang的requestResponse函数) + * 处理TO_CLIENT响应消息(分发到对应的UUID通道) */ private void handleToClientResponse(String message) throws Exception { - log.info("收到TO_CLIENT响应消息: {}", message); KqjEntity.CommonResponse commonResp = objectMapper.readValue(message, KqjEntity.CommonResponse.class); - - // 根据UUID查找响应通道,传递响应结果 String uuid = commonResp.getTo(); - ResponseHolder holder = responseChannels.get(uuid); + if (uuid == null || uuid.isEmpty()) { + log.warn("响应消息缺少UUID(to字段),SN: {},内容: {}", currentDeviceSn, message); + return; + } + + // 查找响应通道并分发结果 + ResponseHolder holder = responseChannels.get(uuid); if (holder != null) { holder.getResultFuture().complete(commonResp); - responseChannels.remove(uuid); // 移除已完成的通道 - log.info("响应已分发到UUID: {}, 响应内容: {}", uuid, commonResp); + responseChannels.remove(uuid); + snToUuids.get(currentDeviceSn).remove(uuid); // 清理反向映射 + log.info("响应分发成功,UUID: {},SN: {}", uuid, currentDeviceSn); } else { - log.warn("未找到UUID: {}对应的响应通道,响应丢弃", uuid); + log.warn("未找到UUID对应的响应通道,UUID: {},SN: {}", uuid, currentDeviceSn); } } + // ------------------------------ 业务服务调用(需替换为真实实现) ------------------------------ + + /** + * 设备注册到业务服务(对应原Golang的service.BusAttendanceMachine().Register) + */ + private void registerDeviceToService(String sn) { + try { + // TODO: 替换为真实的业务服务调用(如Spring Bean注入后调用) + log.info("【业务服务】设备注册,SN: {}", sn); + // 示例:BusAttendanceMachineService.register(sn); + } catch (Exception e) { + log.error("【业务服务】设备注册失败,SN: {}", sn, e); + } + } + + /** + * 更新设备状态(对应原Golang的service.BusAttendanceMachine().Change) + */ + private void updateDeviceStatus(String sn, String status) { + try { + // TODO: 替换为真实的业务服务调用 + log.info("【业务服务】更新设备状态,SN: {},状态: {}", sn, status); + // 示例:BusAttendanceMachineService.changeStatus(sn, status); + } catch (Exception e) { + log.error("【业务服务】更新设备状态失败,SN: {}", sn, e); + } + } // ------------------------------ 工具方法 ------------------------------ /** - * 读取设备第一条消息(阻塞直到收到消息) - */ - private String readFirstMessage() throws Exception { - // 使用CompletableFuture等待消息 - final java.util.concurrent.CompletableFuture firstMsgFuture = new java.util.concurrent.CompletableFuture<>(); - // 使用AtomicReference包装临时处理器,解决未初始化问题 - final java.util.concurrent.atomic.AtomicReference> tempHandlerRef = new java.util.concurrent.atomic.AtomicReference<>(); - - // 定义临时消息处理器 - MessageHandler.Whole tempHandler = msg -> { - if (!firstMsgFuture.isDone()) { - firstMsgFuture.complete(msg); - // 从引用中获取处理器并移除 - session.removeMessageHandler(tempHandlerRef.get()); - } - }; - // 将处理器存入引用 - tempHandlerRef.set(tempHandler); - // 注册处理器 - session.addMessageHandler(tempHandler); - - // 等待消息,超时10秒 - return firstMsgFuture.get(10, TimeUnit.SECONDS); -// // 使用Java并发工具等待消息(模拟Golang的阻塞读取) -// final java.util.concurrent.CompletableFuture firstMsgFuture = new java.util.concurrent.CompletableFuture<>(); -// -// // 临时注册消息处理器,读取第一条消息后移除 -// MessageHandler.Whole tempHandler = msg -> { -// if (!firstMsgFuture.isDone()) { -// firstMsgFuture.complete(msg); -// // 移除临时处理器(避免重复处理) -// session.removeMessageHandler(tempHandler); -// } -// }; -// session.addMessageHandler(tempHandler); -// -// // 等待消息,超时10秒(防止设备一直不发消息) -// return firstMsgFuture.get(10, TimeUnit.SECONDS); - } - - - /** - * 发送消息给指定设备(对应Golang的sendMessageToDevice函数) + * 发送消息给指定设备(对应原Golang的sendMessageToDevice) */ public static boolean sendMessageToDevice(String sn, String uuid, Object message) { // 1. 检查设备是否在线 KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn); if (deviceInfo == null) { - log.warn("设备不存在,SN: {}", sn); + log.warn("发送消息失败:设备不存在,SN: {}", sn); responseChannels.remove(uuid); return false; } - // 2. 将WebSocket转回Session(因为状态由Session管理) - WebSocket webSocket = deviceInfo.getConn(); - if (!(webSocket instanceof Session)) { - log.warn("设备连接类型错误,无法判断状态,SN: {}", sn); - responseChannels.remove(uuid); - return false; - } - Session session = (Session) webSocket; - - // 3. 检查连接是否打开 - if (!session.isOpen()) { - log.warn("设备连接已关闭,SN: {}", sn); - connectedDevices.remove(sn); // 移除已关闭的设备 + // 2. 检查Session是否有效 + Session session = deviceInfo.getConn(); + if (session == null || !session.isOpen()) { + log.warn("发送消息失败:设备连接已关闭,SN: {}", sn); + connectedDevices.remove(sn); + snToUuids.remove(sn); responseChannels.remove(uuid); return false; } try { - // 4. 序列化消息并发送 + // 3. 序列化并发送消息 String msgJson = objectMapper.writeValueAsString(message); - session.getBasicRemote().sendText(msgJson); // 通过Session发送消息 - log.info("发送消息给设备,SN: {}, UUID: {}, 消息: {}", sn, uuid, msgJson); + session.getBasicRemote().sendText(msgJson); + // 关联UUID和设备SN(用于后续清理) + snToUuids.get(sn).add(uuid); + log.info("发送消息成功,SN: {},UUID: {},消息: {}", sn, uuid, msgJson); return true; - } catch (Exception e) { - log.error("发送消息失败,SN: {}, UUID: {}", sn, uuid, e); - // 发送失败时移除设备(可能连接已异常) + log.error("发送消息失败,SN: {},UUID: {}", sn, uuid, e); + // 清理无效资源 connectedDevices.remove(sn); + snToUuids.remove(sn); responseChannels.remove(uuid); return false; } -// // 1. 检查设备是否在线 -// KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn); -// if (deviceInfo == null || !deviceInfo.getSession().isOpen()) { -// log.warn("设备不在线,SN: {}", sn); -// // 清理无效的响应通道 -// responseChannels.remove(uuid); -// return false; -// } -// -// try { -// // 2. 序列化消息并发送 -// String msgJson = objectMapper.writeValueAsString(message); -// deviceInfo.getSession().getBasicRemote().sendText(msgJson); -// log.info("发送消息给设备,SN: {}, UUID: {}, 消息: {}", sn, uuid, msgJson); -// return true; -// -// } catch (Exception e) { -// log.error("发送消息失败,SN: {}, UUID: {}", sn, uuid, e); -// // 清理异常的响应通道 -// responseChannels.remove(uuid); -// return false; -// } } - /** - * 发送请求并等待响应(对应Golang的SendRequestAndWaitResponse函数) + * 发送请求并等待响应(对应原Golang的SendRequestAndWaitResponse) */ public static KqjEntity.CommonResponse sendRequestAndWaitResponse(String sn, String uuid, Object payload) throws Exception { - // 1. 创建响应结果容器 - ResponseHolder responseHolder = new ResponseHolder<>(sn); - responseChannels.put(uuid, responseHolder); + // 1. 创建响应容器并注册 + ResponseHolder holder = new ResponseHolder(sn); + responseChannels.put(uuid, holder); try { // 2. 发送请求 boolean sendSuccess = sendMessageToDevice(sn, uuid, payload); if (!sendSuccess) { - throw new Exception("发送请求失败,设备不在线或发送异常,SN: " + sn); + throw new Exception("发送请求失败,设备不在线或连接异常,SN: " + sn); } - // 3. 等待响应,超时10秒 - return responseHolder.getResultFuture().get(10, TimeUnit.SECONDS); - - } catch (java.util.concurrent.TimeoutException e) { - log.error("等待响应超时,SN: {}, UUID: {}", sn, uuid); + // 3. 等待响应(10秒超时) + return holder.getResultFuture().get(10, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.error("等待响应超时,SN: {},UUID: {}", sn, uuid); responseChannels.remove(uuid); - throw new Exception("等待响应超时(10秒)", e); - + snToUuids.get(sn).remove(uuid); + throw new Exception("等待响应超时(10秒),SN: " + sn, e); } finally { - // 清理响应通道(防止内存泄漏) + // 4. 清理响应通道(防止内存泄漏) responseChannels.remove(uuid); + if (snToUuids.containsKey(sn)) { + snToUuids.get(sn).remove(uuid); + } } } - /** - * 生成带6位随机数的UUID(对应Golang的GenerateUUIDWithSixRandomDigits函数) + * 生成带6位随机数的UUID(对应原Golang的GenerateUUIDWithSixRandomDigits) */ public static String generateUUIDWithSixRandomDigits() { - // 生成标准UUID String uuidStr = UUID.randomUUID().toString().replace("-", ""); - // 生成6位随机数(100000-999999) Random random = new Random(); - int randomNum = random.nextInt(900000) + 100000; - // 拼接返回 + int randomNum = random.nextInt(900000) + 100000; // 6位随机数(100000-999999) return uuidStr + "-" + randomNum; } - - // ------------------------------ 业务服务调用(模拟Golang的service层) ------------------------------ - /** - * 注册设备到业务服务(对应Golang的service.BusAttendanceMachine().Register) - * 实际项目中替换为真实的Service调用 + * 清理设备关联的所有响应通道(连接关闭时调用) */ - private void registerDeviceToService(String sn) { - try { - // 模拟业务服务调用(如更新数据库设备状态为在线) - log.info("调用业务服务注册设备,SN: {}", sn); - // TODO: 替换为真实的Service代码(如Spring Bean调用) - } catch (Exception e) { - log.error("业务服务注册设备失败,SN: {}", sn, e); + private void cleanDeviceResponseChannels(String sn) { + Set uuids = snToUuids.remove(sn); + if (uuids == null || uuids.isEmpty()) { + return; } - } - - /** - * 更新设备状态(对应Golang的service.BusAttendanceMachine().Change) - * 实际项目中替换为真实的Service调用 - */ - private void updateDeviceStatus(String sn, String status) { - try { - log.info("调用业务服务更新设备状态,SN: {}, 状态: {}", sn, status); - // TODO: 替换为真实的Service代码(如Spring Bean调用) - } catch (Exception e) { - log.error("业务服务更新设备状态失败,SN: {}", sn, e); + for (String uuid : uuids) { + ResponseHolder holder = responseChannels.remove(uuid); + if (holder != null) { + holder.getResultFuture().completeExceptionally( + new Exception("设备已离线,响应通道已清理,SN: " + sn) + ); + } } + log.info("清理设备响应通道,SN: {},共清理UUID数量: {}", sn, uuids.size()); } - // ------------------------------ 内部辅助类 ------------------------------ /** - * 响应结果容器(对应Golang的chan CommonResponse) - * 用CompletableFuture实现异步结果等待 + * 响应结果容器(替代Golang的chan,用CompletableFuture实现异步等待) */ - private static class ResponseHolder { + private static class ResponseHolder { private final String sn; // 关联的设备SN - private final java.util.concurrent.CompletableFuture resultFuture; + private final CompletableFuture resultFuture; public ResponseHolder(String sn) { this.sn = sn; - this.resultFuture = new java.util.concurrent.CompletableFuture<>(); + this.resultFuture = new CompletableFuture<>(); } public String getSn() { return sn; } - public java.util.concurrent.CompletableFuture getResultFuture() { + public CompletableFuture getResultFuture() { return resultFuture; } } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/KqjEntity.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/KqjEntity.java index e505dd7a..d4a09145 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/KqjEntity.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/KqjEntity.java @@ -1,21 +1,20 @@ package org.dromara.mobileAttendanceMachine; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.websocket.Session; import lombok.AllArgsConstructor; import lombok.Data; -import java.net.http.WebSocket; - /** - * @Author 铁憨憨 - * @Date 2025/10/14 14:47 - * @Version 1.0 + * 实体类:WebSocket消息结构和设备信息 */ - @Data @AllArgsConstructor public class KqjEntity { + /** + * 设备连接信息(修正conn为Jakarta Session) + */ @Data public static class DeviceInfo { @JsonProperty("ip") @@ -25,14 +24,15 @@ public class KqjEntity { private String port; @JsonProperty("conn") - private WebSocket conn; + private Session conn; // 关键修正:使用Jakarta WebSocket的Session } + // ------------------------------ 以下为原有实体类,保持不变 ------------------------------ /** * 通用消息结构,用于解析初始的 cmd 字段 */ @Data - public class GenericMessage { + public static class GenericMessage { @JsonProperty("cmd") private String cmd; } @@ -41,7 +41,7 @@ public class KqjEntity { * 公共响应结构 */ @Data - public class CommonResponse { + public static class CommonResponse { @JsonProperty("cmd") private String cmd; @@ -56,12 +56,12 @@ public class KqjEntity { } @Data - public class CommonResponseData { + public static class CommonResponseData { @JsonProperty("cmd") private String cmd; @JsonProperty("userIds") - private String[] userIds; // 用户IDS + private String[] userIds; @JsonProperty("user_id") private String userId; @@ -77,7 +77,7 @@ public class KqjEntity { } @Data - public class DelMultiUserData { + public static class DelMultiUserData { @JsonProperty("user_id") private String userId; @@ -88,12 +88,11 @@ public class KqjEntity { private String msg; } - /** * 设备上线消息 */ @Data - public class DeclareMessage { + public static class DeclareMessage { @JsonProperty("cmd") private String cmd; @@ -134,7 +133,6 @@ public class KqjEntity { private PongMessageData data; } - /** * 心跳回复消息结构 */ @@ -150,13 +148,13 @@ public class KqjEntity { @Data public class PeopleInformation { @JsonProperty("cmd") - private String cmd; // 该接口固定为to_device + private String cmd; @JsonProperty("from") - private String from; // 可不填写,填写uuid来做为发送请求或响应的标识 + private String from; @JsonProperty("to") - private String to; // 设备号(请查看公共设置中的设备号) + private String to; @JsonProperty("data") private PeopleInData data; @@ -174,10 +172,10 @@ public class KqjEntity { private String name; @JsonProperty("face_template") - private String faceTemplate; // http 链接图 + private String faceTemplate; @JsonProperty("id_valid") - private String idValid; // 人员有效期(人员在这个时间点后,无法通行)格式:yyyy-MM-dd 或者 yyyy-MM-dd HH:mm,为 "" 则为永久 + private String idValid; } /** @@ -207,7 +205,7 @@ public class KqjEntity { private String userId; @JsonProperty("user_type") - private int userType; // 删除的用户类型:0-人脸接口下发的数据 1-人证比对接口下发的数据 + private int userType; } /** @@ -237,7 +235,7 @@ public class KqjEntity { private String[] userIds; @JsonProperty("user_type") - private int userType; // 删除的用户类型:0-人脸接口下发的数据 1-人证比对接口下发的数据 + private int userType; } /** @@ -264,7 +262,7 @@ public class KqjEntity { private String cmd; @JsonProperty("user_type") - private int userType; // 删除的用户类型:0-人脸接口下发的数据 1-人证比对接口下发的数据 + private int userType; } /** @@ -273,13 +271,13 @@ public class KqjEntity { @Data public class PersonnelInformationAcquisition { @JsonProperty("cmd") - private String cmd; // 该接口固定为to_device + private String cmd; @JsonProperty("from") - private String from; // 可不填写,填写uuid来做为发送请求或响应的标识 + private String from; @JsonProperty("to") - private String to; // 设备号(请查看公共设置中的设备号) + private String to; @JsonProperty("data") private PersonnelInformationAcquisitionTwo data; @@ -293,5 +291,4 @@ public class KqjEntity { @JsonProperty("value") private int value; } - } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/WebSocketConfig.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/WebSocketConfig.java index 7f2f5d36..d44992e9 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/WebSocketConfig.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/mobileAttendanceMachine/WebSocketConfig.java @@ -1,22 +1,13 @@ package org.dromara.mobileAttendanceMachine; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; -/** - * @Author 铁憨憨 - * @Date 2025/10/14 16:11 - * @Version 1.0 - * - * 系统启动就会开启ws - */ + @Configuration public class WebSocketConfig { - - /** - * 自动注册所有标注了@ServerEndpoint的WebSocket端点 - */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter();