解决ws不兼容

This commit is contained in:
2025-10-14 23:21:23 +08:00
parent b9507e1fd7
commit e87cbce77a
5 changed files with 373 additions and 322 deletions

View File

@ -13,6 +13,7 @@ import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* WebSocket 配置

View File

@ -19,11 +19,15 @@
<dependencies>
<!-- Java WebSocket 标准API -->
<!-- <dependency>-->
<!-- <groupId>javax.websocket</groupId>-->
<!-- <artifactId>javax.websocket-api</artifactId>-->
<!-- <version>1.1</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
<scope>provided</scope>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- <dependency>-->

View File

@ -1,153 +1,161 @@
package org.dromara.mobileAttendanceMachine;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.WebSocket;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
* WebSocket服务端实现对应Golang的HandleWebSocket逻辑
* ServerEndpoint注解指定WebSocket连接路径
* WebSocket服务端(设备连接管理、消息处理
* 核心逻辑:通过状态标记区分设备注册阶段和正常通信阶段,避免多处理器冲突
*/
@ServerEndpoint("/ws/device")
@ServerEndpoint("/custom-ws/device")
@Component
@Log4j2
public class DeviceWebSocketServer {
// ------------------------------ 常量定义对应Golang的const ------------------------------
public static class Constants {
public static final String DECLARE = "declare"; // 设备初上线
public static final String PING = "ping"; // 心跳
public static final String TO_CLIENT = "to_client"; // 服务器消息下发到客户端的响应
}
// ------------------------------ 常量定义 ------------------------------
public static final String DECLARE = "declare"; // 设备注册消息
public static final String PING = "ping"; // 心跳消息
public static final String TO_CLIENT = "to_client"; // 设备响应消息
private static final int REGISTER_TIMEOUT = 10; // 注册超时时间(秒)
// JSON序列化/反序列化工具(单例)
private static final ObjectMapper objectMapper = new ObjectMapper();
// 1. 存储所有连接的设备信息(key: 设备SNvalue: 设备信息)
// ------------------------------ 全局静态存储 ------------------------------
// JSON序列化工具单例
private static final ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);;
// 设备连接池:key=设备SNvalue=设备信息含Session
private static final Map<String, KqjEntity.DeviceInfo> connectedDevices = new ConcurrentHashMap<>();
// 2. 存储UUID对应的响应通道key: UUIDvalue: 响应结果容器
private static final Map<String, ResponseHolder<KqjEntity.CommonResponse>> responseChannels = new ConcurrentHashMap<>();
// 响应通道key=UUIDvalue=响应结果容器
private static final Map<String, ResponseHolder> responseChannels = new ConcurrentHashMap<>();
// 设备-SN反向映射key=设备SNvalue=关联的UUID列表用于连接关闭时清理
private static final Map<String, Set<String>> snToUuids = new ConcurrentHashMap<>();
// 当前连接的WebSocket会话
private Session session;
// 当前连接的设备SN连接建立后从DECLARE消息中提取
private String currentDeviceSn;
// ------------------------------ 每个连接的实例变量 ------------------------------
private Session session; // 当前WebSocket会话
private String currentDeviceSn; // 当前连接的设备SN注册后赋值
private volatile boolean isRegistered; // 注册状态false=未注册true=已注册
private CompletableFuture<Boolean> registerFuture; // 注册阶段的消息Future
// ------------------------------ 连接生命周期方法 ------------------------------
/**
* 连接建立时触发(对应Golang中upgrader.Upgrade后的初始化逻辑
* 连接建立时触发(初始化状态
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
log.info("新的WebSocket连接建立会话ID: {}", session.getId());
this.isRegistered = false;
this.registerFuture = new CompletableFuture<>(); // 初始化注册Future
log.info("新连接建立会话ID: {},等待设备注册({}秒超时)...", session.getId(), REGISTER_TIMEOUT);
// 启动注册超时监听
CompletableFuture.runAsync(() -> {
try {
// 等待注册消息,超时则关闭连接
if (!registerFuture.get(REGISTER_TIMEOUT, TimeUnit.SECONDS)) {
throw new TimeoutException("注册超时");
}
} catch (Exception e) {
log.error("设备注册超时/失败会话ID: {}", session.getId(), e);
try {
if (session.isOpen()) {
session.close(new CloseReason(CloseReason.CloseCodes.TRY_AGAIN_LATER, "注册超时请发送DECLARE消息"));
}
} catch (IOException ex) {
log.error("关闭超时连接失败", ex);
}
}
});
}
/**
* 接收消息时触发(所有消息统一处理,按状态区分)
*/
@OnMessage
public void onMessage(String message, Session session) {
if (message == null || message.isEmpty()) {
log.warn("会话ID: {} 收到空消息,忽略", session.getId());
return;
}
try {
// 读取设备第一条消息DECLARE消息完成设备注册
KqjEntity.DeclareMessage declareMsg = registerDevice();
this.currentDeviceSn = declareMsg.getSn();
log.info("设备注册成功SN: {}, 设备信息: {}", currentDeviceSn, declareMsg);
// 解析通用消息的CMD所有消息必须包含CMD
KqjEntity.GenericMessage genericMsg = objectMapper.readValue(message, KqjEntity.GenericMessage.class);
String cmd = genericMsg.getCmd();
if (cmd == null) {
log.warn("会话ID: {} 收到无CMD消息关闭连接: {}", session.getId(), message);
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "消息缺少cmd字段"));
return;
}
// 未注册状态仅处理DECLARE注册消息
if (!isRegistered) {
handleRegisterStage(message, cmd);
return;
}
// 已注册状态:处理正常业务消息
handleNormalStage(message, cmd);
} catch (Exception e) {
log.error("设备注册失败,关闭连接", e);
log.error("会话ID: {} 处理消息失败: {}", session.getId(), message, e);
try {
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "设备注册失败"));
if (session.isOpen()) {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "消息处理异常"));
}
} catch (IOException ex) {
log.error("关闭异常连接失败", ex);
}
}
}
/**
* 接收客户端消息时触发对应Golang的for循环读取消息逻辑
*/
@OnMessage
public void onMessage(String message, Session session) {
if (message == null || message.isEmpty()) {
log.warn("收到空消息,忽略处理");
return;
}
try {
// 先解析通用消息的CMD字段对应Golang的GenericMessage
KqjEntity.GenericMessage genericMsg = objectMapper.readValue(message, KqjEntity.GenericMessage.class);
String cmd = genericMsg.getCmd();
if (cmd == null) {
log.warn("收到无CMD字段的消息忽略: {}", message);
return;
}
// 根据CMD类型处理不同逻辑对应Golang的switch case
switch (cmd) {
case Constants.DECLARE:
log.info("设备在线心跳DECLARESN: {}", currentDeviceSn);
break;
case Constants.PING:
handlePing(message);
break;
case Constants.TO_CLIENT:
handleToClientResponse(message);
break;
default:
log.warn("收到未知CMD消息类型: {}, 内容: {}", cmd, message);
}
} catch (Exception e) {
log.error("处理消息失败,消息内容: {}", message, e);
}
}
/**
* 连接关闭时触发对应Golang的defer conn.Close()和资源清理逻辑)
* 连接关闭时触发(清理资源
*/
@OnClose
public void onClose(Session session, CloseReason closeReason) {
log.info("WebSocket连接关闭会话ID: {}, 原因: {}", session.getId(), closeReason);
log.info("连接关闭会话ID: {}原因: {}", session.getId(), closeReason);
// 1. 移除设备连接信息
if (currentDeviceSn != null) {
// 1. 移除设备连接(若已注册)
if (currentDeviceSn != null && connectedDevices.containsKey(currentDeviceSn)) {
connectedDevices.remove(currentDeviceSn);
// 更新设备状态为离线对应Golang的service.BusAttendanceMachine().Change
log.info("设备离线SN: {}会话ID: {}", currentDeviceSn, session.getId());
// 2. 更新设备状态为离线(业务服务调用)
updateDeviceStatus(currentDeviceSn, "0");
// 3. 清理该设备关联的响应通道
cleanDeviceResponseChannels(currentDeviceSn);
}
// 2. 清理当前设备对应的响应通道(避免内存泄漏)
responseChannels.entrySet().removeIf(entry -> {
if (entry.getValue().getSn().equals(currentDeviceSn)) {
entry.getValue().getResultFuture().completeExceptionally(
new Exception("设备连接已关闭,响应通道清理")
);
return true;
}
return false;
});
// 4. 重置实例变量
this.session = null;
this.currentDeviceSn = null;
this.isRegistered = false;
this.registerFuture = null;
}
/**
* 连接异常时触发
*/
@OnError
public void onError(Session session, Throwable throwable) {
log.error("WebSocket连接异常会话ID: {}", session.getId(), throwable);
// 异常时主动关闭连接
log.error("连接异常会话ID: {},异常信息: {}", session.getId(), throwable.getMessage(), throwable);
try {
if (session.isOpen()) {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "连接异常"));
@ -157,63 +165,175 @@ public class DeviceWebSocketServer {
}
}
// ------------------------------ 核心业务方法 ------------------------------
// ------------------------------ 消息处理核心逻辑 ------------------------------
/**
* 设备注册对应Golang的addDevice函数
* 读取DECLARE消息解析设备信息并存储
* 处理注册阶段消息仅接受DECLARE
*/
private KqjEntity.DeclareMessage registerDevice() throws Exception {
// 阻塞读取第一条消息(DECLARE消息
String firstMessage = readFirstMessage();
KqjEntity.DeclareMessage declareMsg = objectMapper.readValue(firstMessage, KqjEntity.DeclareMessage.class);
// private void handleRegisterStage(String message, String cmd) throws Exception {
// // DECLARE消息直接拒绝
// if (!DECLARE.equals(cmd)) {
// log.warn("未注册设备发送非法消息cmd: {},关闭连接", cmd);
// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "请先发送注册消息cmd=declare"));
// registerFuture.complete(false); // 标记注册失败
// return;
// }
//
// // 解析DECLARE消息并完成注册
// KqjEntity.DeclareMessage declareMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
// String sn = declareMsg.getSn();
// if (sn == null || sn.isEmpty()) {
// log.warn("注册消息缺少SN关闭连接");
// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "注册消息缺少sn字段"));
// registerFuture.complete(false);
// return;
// }
//
// // 存储设备信息IP、端口、Session
// InetSocketAddress remoteAddr = (InetSocketAddress) session.getUserProperties().get("jakarta.websocket.endpoint.remoteAddress");
// String ip = remoteAddr.getAddress().getHostAddress();
// String port = String.valueOf(remoteAddr.getPort());
//
// KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo();
// deviceInfo.setIp(ip);
// deviceInfo.setPort(port);
// deviceInfo.setConn(session); // 直接存储Jakarta Session无类型转换
// connectedDevices.put(sn, deviceInfo);
//
// // 初始化设备-SN反向映射
// snToUuids.computeIfAbsent(sn, k -> ConcurrentHashMap.newKeySet());
//
// // 调用业务服务完成注册
// registerDeviceToService(sn);
//
// // 注册成功:更新状态
// this.currentDeviceSn = sn;
// this.isRegistered = true;
// registerFuture.complete(true); // 标记注册成功
// log.info("设备注册成功SN: {}IP: {}:{}会话ID: {}", sn, ip, port, session.getId());
// }
// 校验SN合法性
String sn = declareMsg.getSn();
if (sn == null || sn.isEmpty()) {
throw new IllegalArgumentException("设备SN为空注册失败");
private void handleRegisterStage(String message, String cmd) throws Exception {
// 非DECLARE消息直接拒绝
if (!DECLARE.equals(cmd)) {
log.warn("未注册设备发送非法消息cmd: {},关闭连接", cmd);
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "请先发送注册消息cmd=declare"));
registerFuture.complete(false); // 标记注册失败
return;
}
// 解析客户端IP和端口对应Golang的parseRemoteAddr
InetSocketAddress remoteAddr = (InetSocketAddress) session.getUserProperties().get("javax.websocket.endpoint.remoteAddress");
String ip = remoteAddr.getAddress().getHostAddress();
String port = String.valueOf(remoteAddr.getPort());
// 解析DECLARE消息并完成注册
KqjEntity.DeclareMessage declareMsg;
try {
declareMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
} catch (Exception e) {
log.error("解析DECLARE消息失败原始消息: {}", message, e);
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "DECLARE消息格式错误"));
registerFuture.complete(false);
return;
}
// 存储设备信息
String sn = declareMsg.getSn();
if (sn == null || sn.isEmpty()) {
log.warn("注册消息缺少SN关闭连接消息内容: {}", message);
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "注册消息缺少sn字段"));
registerFuture.complete(false);
return;
}
// 存储设备信息IP、端口、Session- 核心优化:兼容多容器,避免空指针
String ip = "unknown";
String port = "unknown";
InetSocketAddress remoteAddr = null;
// 尝试多种容器可能的键优先级Jakarta标准 -> Undertow专用 -> Javax兼容
Object addrObj = session.getUserProperties().get("jakarta.websocket.endpoint.remoteAddress");
if (addrObj == null) {
addrObj = session.getUserProperties().get("io.undertow.websocket.remoteAddress"); // Undertow专用
}
if (addrObj == null) {
addrObj = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"); // 兼容旧版
}
// 校验并解析IP和端口
if (addrObj instanceof InetSocketAddress) {
remoteAddr = (InetSocketAddress) addrObj;
// 防止getAddress()返回null极端情况
if (remoteAddr.getAddress() != null) {
ip = remoteAddr.getAddress().getHostAddress();
}
port = String.valueOf(remoteAddr.getPort());
log.info("解析设备地址成功会话ID: {}IP: {},端口: {}", session.getId(), ip, port);
} else {
// 地址获取失败时,使用默认值,不中断注册流程
log.warn("无法解析设备地址会话ID: {}addrObj类型: {}",
session.getId(), addrObj == null ? "null" : addrObj.getClass().getName());
log.warn("继续注册流程使用默认地址unknown");
}
// 创建设备信息即使IP获取失败也正常注册
KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo();
deviceInfo.setIp(ip);
deviceInfo.setPort(port);
// ======================== 添加类型检查 ========================
// 检查session是否能转换为WebSocket避免强转失败
if (!(session instanceof WebSocket)) {
throw new IllegalStateException("WebSocket容器不支持Session转WebSocket无法注册设备SN: " + sn);
}
// 强转并赋值
deviceInfo.setConn((WebSocket) session); // 用Session替换Golang的*websocket.Conn
// ============================================================
deviceInfo.setConn(session);
connectedDevices.put(sn, deviceInfo);
// 调用业务服务注册设备对应Golang的service.BusAttendanceMachine().Register
// 初始化设备-SN反向映射
snToUuids.computeIfAbsent(sn, k -> ConcurrentHashMap.newKeySet());
// 调用业务服务完成注册
registerDeviceToService(sn);
return declareMsg;
// 注册成功:更新状态
this.currentDeviceSn = sn;
this.isRegistered = true;
registerFuture.complete(true); // 标记注册成功
log.info("设备注册成功SN: {}IP: {}:{}会话ID: {}", sn, ip, port, session.getId());
}
/**
* 处理PING消息对应Golang的handlePing函数
* 处理正常通信阶段消息PING、TO_CLIENT等
*/
private void handleNormalStage(String message, String cmd) throws Exception {
switch (cmd) {
case DECLARE:
// 已注册设备发送DECLARE视为在线心跳
log.info("设备在线心跳SN: {}会话ID: {}", currentDeviceSn, session.getId());
break;
case PING:
// 处理心跳回复PONG
handlePing(message);
break;
case TO_CLIENT:
// 处理设备响应消息
handleToClientResponse(message);
break;
default:
log.warn("收到未知消息类型SN: {}cmd: {},内容: {}", currentDeviceSn, cmd, message);
}
}
// ------------------------------ 业务子逻辑 ------------------------------
/**
* 处理PING心跳消息回复PONG
*/
private void handlePing(String message) throws Exception {
// 解析PING消息中的SN双重校验
KqjEntity.DeclareMessage pingMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
String sn = pingMsg.getSn();
if (!currentDeviceSn.equals(sn)) {
log.warn("心跳消息SN不匹配当前SN: {}消息SN: {}", currentDeviceSn, sn);
return;
}
// 1. 回复PONG消息
// 构造PONG响应
KqjEntity.PongMessage pongMsg = new KqjEntity.PongMessage();
pongMsg.setCmd(Constants.PING);
pongMsg.setCmd("pong");
pongMsg.setFrom("server");
pongMsg.setTo(sn);
@ -223,247 +343,185 @@ public class DeviceWebSocketServer {
// 发送PONG消息
session.getBasicRemote().sendText(objectMapper.writeValueAsString(pongMsg));
log.info("发送PONG消息给设备SN: {}", sn);
log.info("发送PONG消息SN: {}会话ID: {}", sn, session.getId());
// 2. 更新设备连接状态(对应Golang的connectedDevices[sn]重设
if (connectedDevices.containsKey(sn)) {
KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
deviceInfo.setConn((WebSocket) session);
connectedDevices.put(sn, deviceInfo);
}
// 3. 调用业务服务更新设备状态对应Golang的service.BusAttendanceMachine().Register
// 更新设备连接状态(业务服务调用
registerDeviceToService(sn);
}
/**
* 处理TO_CLIENT响应消息对应Golang的requestResponse函数
* 处理TO_CLIENT响应消息分发到对应的UUID通道
*/
private void handleToClientResponse(String message) throws Exception {
log.info("收到TO_CLIENT响应消息: {}", message);
KqjEntity.CommonResponse commonResp = objectMapper.readValue(message, KqjEntity.CommonResponse.class);
// 根据UUID查找响应通道传递响应结果
String uuid = commonResp.getTo();
ResponseHolder<KqjEntity.CommonResponse> holder = responseChannels.get(uuid);
if (uuid == null || uuid.isEmpty()) {
log.warn("响应消息缺少UUIDto字段SN: {},内容: {}", currentDeviceSn, message);
return;
}
// 查找响应通道并分发结果
ResponseHolder holder = responseChannels.get(uuid);
if (holder != null) {
holder.getResultFuture().complete(commonResp);
responseChannels.remove(uuid); // 移除已完成的通道
log.info("响应已分发到UUID: {}, 响应内容: {}", uuid, commonResp);
responseChannels.remove(uuid);
snToUuids.get(currentDeviceSn).remove(uuid); // 清理反向映射
log.info("响应分发成功UUID: {}SN: {}", uuid, currentDeviceSn);
} else {
log.warn("未找到UUID: {}对应的响应通道,响应丢弃", uuid);
log.warn("未找到UUID对应的响应通道UUID: {}SN: {}", uuid, currentDeviceSn);
}
}
// ------------------------------ 业务服务调用(需替换为真实实现) ------------------------------
/**
* 设备注册到业务服务对应原Golang的service.BusAttendanceMachine().Register
*/
private void registerDeviceToService(String sn) {
try {
// TODO: 替换为真实的业务服务调用如Spring Bean注入后调用
log.info("【业务服务】设备注册SN: {}", sn);
// 示例BusAttendanceMachineService.register(sn);
} catch (Exception e) {
log.error("【业务服务】设备注册失败SN: {}", sn, e);
}
}
/**
* 更新设备状态对应原Golang的service.BusAttendanceMachine().Change
*/
private void updateDeviceStatus(String sn, String status) {
try {
// TODO: 替换为真实的业务服务调用
log.info("【业务服务】更新设备状态SN: {},状态: {}", sn, status);
// 示例BusAttendanceMachineService.changeStatus(sn, status);
} catch (Exception e) {
log.error("【业务服务】更新设备状态失败SN: {}", sn, e);
}
}
// ------------------------------ 工具方法 ------------------------------
/**
* 读取设备第一条消息(阻塞直到收到消息
*/
private String readFirstMessage() throws Exception {
// 使用CompletableFuture等待消息
final java.util.concurrent.CompletableFuture<String> firstMsgFuture = new java.util.concurrent.CompletableFuture<>();
// 使用AtomicReference包装临时处理器解决未初始化问题
final java.util.concurrent.atomic.AtomicReference<MessageHandler.Whole<String>> tempHandlerRef = new java.util.concurrent.atomic.AtomicReference<>();
// 定义临时消息处理器
MessageHandler.Whole<String> tempHandler = msg -> {
if (!firstMsgFuture.isDone()) {
firstMsgFuture.complete(msg);
// 从引用中获取处理器并移除
session.removeMessageHandler(tempHandlerRef.get());
}
};
// 将处理器存入引用
tempHandlerRef.set(tempHandler);
// 注册处理器
session.addMessageHandler(tempHandler);
// 等待消息超时10秒
return firstMsgFuture.get(10, TimeUnit.SECONDS);
// // 使用Java并发工具等待消息模拟Golang的阻塞读取
// final java.util.concurrent.CompletableFuture<String> firstMsgFuture = new java.util.concurrent.CompletableFuture<>();
//
// // 临时注册消息处理器,读取第一条消息后移除
// MessageHandler.Whole<String> tempHandler = msg -> {
// if (!firstMsgFuture.isDone()) {
// firstMsgFuture.complete(msg);
// // 移除临时处理器(避免重复处理)
// session.removeMessageHandler(tempHandler);
// }
// };
// session.addMessageHandler(tempHandler);
//
// // 等待消息超时10秒防止设备一直不发消息
// return firstMsgFuture.get(10, TimeUnit.SECONDS);
}
/**
* 发送消息给指定设备对应Golang的sendMessageToDevice函数
* 发送消息给指定设备对应原Golang的sendMessageToDevice
*/
public static boolean sendMessageToDevice(String sn, String uuid, Object message) {
// 1. 检查设备是否在线
KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
if (deviceInfo == null) {
log.warn("设备不存在SN: {}", sn);
log.warn("发送消息失败:设备不存在SN: {}", sn);
responseChannels.remove(uuid);
return false;
}
// 2. 将WebSocket转回Session因为状态由Session管理)
WebSocket webSocket = deviceInfo.getConn();
if (!(webSocket instanceof Session)) {
log.warn("设备连接类型错误,无法判断状态SN: {}", sn);
responseChannels.remove(uuid);
return false;
}
Session session = (Session) webSocket;
// 3. 检查连接是否打开
if (!session.isOpen()) {
log.warn("设备连接已关闭SN: {}", sn);
connectedDevices.remove(sn); // 移除已关闭的设备
// 2. 检查Session是否有效
Session session = deviceInfo.getConn();
if (session == null || !session.isOpen()) {
log.warn("发送消息失败:设备连接已关闭SN: {}", sn);
connectedDevices.remove(sn);
snToUuids.remove(sn);
responseChannels.remove(uuid);
return false;
}
try {
// 4. 序列化消息并发送
// 3. 序列化并发送消息
String msgJson = objectMapper.writeValueAsString(message);
session.getBasicRemote().sendText(msgJson); // 通过Session发送消息
log.info("发送消息给设备SN: {}, UUID: {}, 消息: {}", sn, uuid, msgJson);
session.getBasicRemote().sendText(msgJson);
// 关联UUID和设备SN用于后续清理
snToUuids.get(sn).add(uuid);
log.info("发送消息成功SN: {}UUID: {},消息: {}", sn, uuid, msgJson);
return true;
} catch (Exception e) {
log.error("发送消息失败SN: {}, UUID: {}", sn, uuid, e);
// 发送失败时移除设备(可能连接已异常)
log.error("发送消息失败SN: {}UUID: {}", sn, uuid, e);
// 清理无效资源
connectedDevices.remove(sn);
snToUuids.remove(sn);
responseChannels.remove(uuid);
return false;
}
// // 1. 检查设备是否在线
// KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
// if (deviceInfo == null || !deviceInfo.getSession().isOpen()) {
// log.warn("设备不在线SN: {}", sn);
// // 清理无效的响应通道
// responseChannels.remove(uuid);
// return false;
// }
//
// try {
// // 2. 序列化消息并发送
// String msgJson = objectMapper.writeValueAsString(message);
// deviceInfo.getSession().getBasicRemote().sendText(msgJson);
// log.info("发送消息给设备SN: {}, UUID: {}, 消息: {}", sn, uuid, msgJson);
// return true;
//
// } catch (Exception e) {
// log.error("发送消息失败SN: {}, UUID: {}", sn, uuid, e);
// // 清理异常的响应通道
// responseChannels.remove(uuid);
// return false;
// }
}
/**
* 发送请求并等待响应对应Golang的SendRequestAndWaitResponse函数
* 发送请求并等待响应(对应Golang的SendRequestAndWaitResponse
*/
public static KqjEntity.CommonResponse sendRequestAndWaitResponse(String sn, String uuid, Object payload) throws Exception {
// 1. 创建响应结果容器
ResponseHolder<KqjEntity.CommonResponse> responseHolder = new ResponseHolder<>(sn);
responseChannels.put(uuid, responseHolder);
// 1. 创建响应容器并注册
ResponseHolder holder = new ResponseHolder(sn);
responseChannels.put(uuid, holder);
try {
// 2. 发送请求
boolean sendSuccess = sendMessageToDevice(sn, uuid, payload);
if (!sendSuccess) {
throw new Exception("发送请求失败,设备不在线或发送异常SN: " + sn);
throw new Exception("发送请求失败,设备不在线或连接异常SN: " + sn);
}
// 3. 等待响应超时10秒
return responseHolder.getResultFuture().get(10, TimeUnit.SECONDS);
} catch (java.util.concurrent.TimeoutException e) {
log.error("等待响应超时SN: {}, UUID: {}", sn, uuid);
// 3. 等待响应10秒超时
return holder.getResultFuture().get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.error("等待响应超时SN: {}UUID: {}", sn, uuid);
responseChannels.remove(uuid);
throw new Exception("等待响应超时10秒", e);
snToUuids.get(sn).remove(uuid);
throw new Exception("等待响应超时10秒SN: " + sn, e);
} finally {
// 清理响应通道(防止内存泄漏)
// 4. 清理响应通道(防止内存泄漏)
responseChannels.remove(uuid);
if (snToUuids.containsKey(sn)) {
snToUuids.get(sn).remove(uuid);
}
}
}
/**
* 生成带6位随机数的UUID对应Golang的GenerateUUIDWithSixRandomDigits函数
* 生成带6位随机数的UUID对应Golang的GenerateUUIDWithSixRandomDigits
*/
public static String generateUUIDWithSixRandomDigits() {
// 生成标准UUID
String uuidStr = UUID.randomUUID().toString().replace("-", "");
// 生成6位随机数100000-999999
Random random = new Random();
int randomNum = random.nextInt(900000) + 100000;
// 拼接返回
int randomNum = random.nextInt(900000) + 100000; // 6位随机数100000-999999
return uuidStr + "-" + randomNum;
}
// ------------------------------ 业务服务调用模拟Golang的service层 ------------------------------
/**
* 注册设备到业务服务对应Golang的service.BusAttendanceMachine().Register
* 实际项目中替换为真实的Service调用
* 清理设备关联的所有响应通道(连接关闭时调用
*/
private void registerDeviceToService(String sn) {
try {
// 模拟业务服务调用(如更新数据库设备状态为在线)
log.info("调用业务服务注册设备SN: {}", sn);
// TODO: 替换为真实的Service代码如Spring Bean调用
} catch (Exception e) {
log.error("业务服务注册设备失败SN: {}", sn, e);
private void cleanDeviceResponseChannels(String sn) {
Set<String> uuids = snToUuids.remove(sn);
if (uuids == null || uuids.isEmpty()) {
return;
}
}
/**
* 更新设备状态对应Golang的service.BusAttendanceMachine().Change
* 实际项目中替换为真实的Service调用
*/
private void updateDeviceStatus(String sn, String status) {
try {
log.info("调用业务服务更新设备状态SN: {}, 状态: {}", sn, status);
// TODO: 替换为真实的Service代码如Spring Bean调用
} catch (Exception e) {
log.error("业务服务更新设备状态失败SN: {}", sn, e);
for (String uuid : uuids) {
ResponseHolder holder = responseChannels.remove(uuid);
if (holder != null) {
holder.getResultFuture().completeExceptionally(
new Exception("设备已离线响应通道已清理SN: " + sn)
);
}
}
log.info("清理设备响应通道SN: {}共清理UUID数量: {}", sn, uuids.size());
}
// ------------------------------ 内部辅助类 ------------------------------
/**
* 响应结果容器(对应Golang的chan CommonResponse
* 用CompletableFuture实现异步结果等待
* 响应结果容器(替代Golang的chan,用CompletableFuture实现异步等待
*/
private static class ResponseHolder<T> {
private static class ResponseHolder {
private final String sn; // 关联的设备SN
private final java.util.concurrent.CompletableFuture<T> resultFuture;
private final CompletableFuture<KqjEntity.CommonResponse> resultFuture;
public ResponseHolder(String sn) {
this.sn = sn;
this.resultFuture = new java.util.concurrent.CompletableFuture<>();
this.resultFuture = new CompletableFuture<>();
}
public String getSn() {
return sn;
}
public java.util.concurrent.CompletableFuture<T> getResultFuture() {
public CompletableFuture<KqjEntity.CommonResponse> getResultFuture() {
return resultFuture;
}
}

View File

@ -1,21 +1,20 @@
package org.dromara.mobileAttendanceMachine;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.websocket.Session;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.net.http.WebSocket;
/**
* @Author 铁憨憨
* @Date 2025/10/14 14:47
* @Version 1.0
* 实体类WebSocket消息结构和设备信息
*/
@Data
@AllArgsConstructor
public class KqjEntity {
/**
* 设备连接信息修正conn为Jakarta Session
*/
@Data
public static class DeviceInfo {
@JsonProperty("ip")
@ -25,14 +24,15 @@ public class KqjEntity {
private String port;
@JsonProperty("conn")
private WebSocket conn;
private Session conn; // 关键修正使用Jakarta WebSocket的Session
}
// ------------------------------ 以下为原有实体类,保持不变 ------------------------------
/**
* 通用消息结构,用于解析初始的 cmd 字段
*/
@Data
public class GenericMessage {
public static class GenericMessage {
@JsonProperty("cmd")
private String cmd;
}
@ -41,7 +41,7 @@ public class KqjEntity {
* 公共响应结构
*/
@Data
public class CommonResponse {
public static class CommonResponse {
@JsonProperty("cmd")
private String cmd;
@ -56,12 +56,12 @@ public class KqjEntity {
}
@Data
public class CommonResponseData {
public static class CommonResponseData {
@JsonProperty("cmd")
private String cmd;
@JsonProperty("userIds")
private String[] userIds; // 用户IDS
private String[] userIds;
@JsonProperty("user_id")
private String userId;
@ -77,7 +77,7 @@ public class KqjEntity {
}
@Data
public class DelMultiUserData {
public static class DelMultiUserData {
@JsonProperty("user_id")
private String userId;
@ -88,12 +88,11 @@ public class KqjEntity {
private String msg;
}
/**
* 设备上线消息
*/
@Data
public class DeclareMessage {
public static class DeclareMessage {
@JsonProperty("cmd")
private String cmd;
@ -134,7 +133,6 @@ public class KqjEntity {
private PongMessageData data;
}
/**
* 心跳回复消息结构
*/
@ -150,13 +148,13 @@ public class KqjEntity {
@Data
public class PeopleInformation {
@JsonProperty("cmd")
private String cmd; // 该接口固定为to_device
private String cmd;
@JsonProperty("from")
private String from; // 可不填写,填写uuid来做为发送请求或响应的标识
private String from;
@JsonProperty("to")
private String to; // 设备号(请查看公共设置中的设备号)
private String to;
@JsonProperty("data")
private PeopleInData data;
@ -174,10 +172,10 @@ public class KqjEntity {
private String name;
@JsonProperty("face_template")
private String faceTemplate; // http 链接图
private String faceTemplate;
@JsonProperty("id_valid")
private String idValid; // 人员有效期人员在这个时间点后无法通行格式yyyy-MM-dd 或者 yyyy-MM-dd HH:mm为 "" 则为永久
private String idValid;
}
/**
@ -207,7 +205,7 @@ public class KqjEntity {
private String userId;
@JsonProperty("user_type")
private int userType; // 删除的用户类型0-人脸接口下发的数据 1-人证比对接口下发的数据
private int userType;
}
/**
@ -237,7 +235,7 @@ public class KqjEntity {
private String[] userIds;
@JsonProperty("user_type")
private int userType; // 删除的用户类型0-人脸接口下发的数据 1-人证比对接口下发的数据
private int userType;
}
/**
@ -264,7 +262,7 @@ public class KqjEntity {
private String cmd;
@JsonProperty("user_type")
private int userType; // 删除的用户类型0-人脸接口下发的数据 1-人证比对接口下发的数据
private int userType;
}
/**
@ -273,13 +271,13 @@ public class KqjEntity {
@Data
public class PersonnelInformationAcquisition {
@JsonProperty("cmd")
private String cmd; // 该接口固定为to_device
private String cmd;
@JsonProperty("from")
private String from; // 可不填写,填写uuid来做为发送请求或响应的标识
private String from;
@JsonProperty("to")
private String to; // 设备号(请查看公共设置中的设备号)
private String to;
@JsonProperty("data")
private PersonnelInformationAcquisitionTwo data;
@ -293,5 +291,4 @@ public class KqjEntity {
@JsonProperty("value")
private int value;
}
}

View File

@ -1,22 +1,13 @@
package org.dromara.mobileAttendanceMachine;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @Author 铁憨憨
* @Date 2025/10/14 16:11
* @Version 1.0
*
* 系统启动就会开启ws
*/
@Configuration
public class WebSocketConfig {
/**
* 自动注册所有标注了@ServerEndpoint的WebSocket端点
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();