Compare commits
2 Commits
98f23e2c02
...
536b25d773
| Author | SHA1 | Date | |
|---|---|---|---|
| 536b25d773 | |||
| e87cbce77a |
@ -13,6 +13,7 @@ import org.springframework.web.socket.WebSocketHandler;
|
|||||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
||||||
import org.springframework.web.socket.server.HandshakeInterceptor;
|
import org.springframework.web.socket.server.HandshakeInterceptor;
|
||||||
|
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* WebSocket 配置
|
* WebSocket 配置
|
||||||
|
|||||||
@ -19,11 +19,15 @@
|
|||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
||||||
<!-- Java WebSocket 标准API -->
|
<!-- Java WebSocket 标准API -->
|
||||||
|
<!-- <dependency>-->
|
||||||
|
<!-- <groupId>javax.websocket</groupId>-->
|
||||||
|
<!-- <artifactId>javax.websocket-api</artifactId>-->
|
||||||
|
<!-- <version>1.1</version>-->
|
||||||
|
<!-- <scope>provided</scope>-->
|
||||||
|
<!-- </dependency>-->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>javax.websocket</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>javax.websocket-api</artifactId>
|
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||||
<version>1.1</version>
|
|
||||||
<scope>provided</scope>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- <dependency>-->
|
<!-- <dependency>-->
|
||||||
|
|||||||
@ -1,153 +1,161 @@
|
|||||||
package org.dromara.mobileAttendanceMachine;
|
package org.dromara.mobileAttendanceMachine;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import jakarta.websocket.*;
|
||||||
|
import jakarta.websocket.server.ServerEndpoint;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.websocket.*;
|
|
||||||
import javax.websocket.server.ServerEndpoint;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.http.WebSocket;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* WebSocket服务端实现(对应Golang的HandleWebSocket逻辑)
|
* WebSocket服务端(设备连接管理、消息处理)
|
||||||
* ServerEndpoint注解指定WebSocket连接路径
|
* 核心逻辑:通过状态标记区分设备注册阶段和正常通信阶段,避免多处理器冲突
|
||||||
*/
|
*/
|
||||||
@ServerEndpoint("/ws/device")
|
@ServerEndpoint("/custom-ws/device")
|
||||||
|
@Component
|
||||||
@Log4j2
|
@Log4j2
|
||||||
public class DeviceWebSocketServer {
|
public class DeviceWebSocketServer {
|
||||||
|
|
||||||
// ------------------------------ 常量定义(对应Golang的const) ------------------------------
|
// ------------------------------ 常量定义 ------------------------------
|
||||||
public static class Constants {
|
public static final String DECLARE = "declare"; // 设备注册消息
|
||||||
public static final String DECLARE = "declare"; // 设备初上线
|
public static final String PING = "ping"; // 心跳消息
|
||||||
public static final String PING = "ping"; // 心跳
|
public static final String TO_CLIENT = "to_client"; // 设备响应消息
|
||||||
public static final String TO_CLIENT = "to_client"; // 服务器消息下发到客户端的响应
|
private static final int REGISTER_TIMEOUT = 10; // 注册超时时间(秒)
|
||||||
}
|
|
||||||
|
|
||||||
// JSON序列化/反序列化工具(单例)
|
// ------------------------------ 全局静态存储 ------------------------------
|
||||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
// JSON序列化工具(单例)
|
||||||
|
private static final ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);;
|
||||||
// 1. 存储所有连接的设备信息(key: 设备SN,value: 设备信息)
|
// 设备连接池:key=设备SN,value=设备信息(含Session)
|
||||||
private static final Map<String, KqjEntity.DeviceInfo> connectedDevices = new ConcurrentHashMap<>();
|
private static final Map<String, KqjEntity.DeviceInfo> connectedDevices = new ConcurrentHashMap<>();
|
||||||
// 2. 存储UUID对应的响应通道(key: UUID,value: 响应结果容器)
|
// 响应通道:key=UUID,value=响应结果容器
|
||||||
private static final Map<String, ResponseHolder<KqjEntity.CommonResponse>> responseChannels = new ConcurrentHashMap<>();
|
private static final Map<String, ResponseHolder> responseChannels = new ConcurrentHashMap<>();
|
||||||
|
// 设备-SN反向映射:key=设备SN,value=关联的UUID列表(用于连接关闭时清理)
|
||||||
|
private static final Map<String, Set<String>> snToUuids = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// 当前连接的WebSocket会话
|
// ------------------------------ 每个连接的实例变量 ------------------------------
|
||||||
private Session session;
|
private Session session; // 当前WebSocket会话
|
||||||
// 当前连接的设备SN(连接建立后从DECLARE消息中提取)
|
private String currentDeviceSn; // 当前连接的设备SN(注册后赋值)
|
||||||
private String currentDeviceSn;
|
private volatile boolean isRegistered; // 注册状态:false=未注册,true=已注册
|
||||||
|
private CompletableFuture<Boolean> registerFuture; // 注册阶段的消息Future
|
||||||
|
|
||||||
|
// ------------------------------ 连接生命周期方法 ------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接建立时触发(对应Golang中upgrader.Upgrade后的初始化逻辑)
|
* 连接建立时触发(初始化状态)
|
||||||
*/
|
*/
|
||||||
@OnOpen
|
@OnOpen
|
||||||
public void onOpen(Session session) {
|
public void onOpen(Session session) {
|
||||||
this.session = session;
|
this.session = session;
|
||||||
log.info("新的WebSocket连接建立,会话ID: {}", session.getId());
|
this.isRegistered = false;
|
||||||
|
this.registerFuture = new CompletableFuture<>(); // 初始化注册Future
|
||||||
|
|
||||||
|
log.info("新连接建立,会话ID: {},等待设备注册({}秒超时)...", session.getId(), REGISTER_TIMEOUT);
|
||||||
|
|
||||||
|
// 启动注册超时监听
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
// 等待注册消息,超时则关闭连接
|
||||||
|
if (!registerFuture.get(REGISTER_TIMEOUT, TimeUnit.SECONDS)) {
|
||||||
|
throw new TimeoutException("注册超时");
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("设备注册超时/失败,会话ID: {}", session.getId(), e);
|
||||||
|
try {
|
||||||
|
if (session.isOpen()) {
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.TRY_AGAIN_LATER, "注册超时(请发送DECLARE消息)"));
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
log.error("关闭超时连接失败", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 接收消息时触发(所有消息统一处理,按状态区分)
|
||||||
|
*/
|
||||||
|
@OnMessage
|
||||||
|
public void onMessage(String message, Session session) {
|
||||||
|
if (message == null || message.isEmpty()) {
|
||||||
|
log.warn("会话ID: {} 收到空消息,忽略", session.getId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 读取设备第一条消息(DECLARE消息),完成设备注册
|
// 解析通用消息的CMD(所有消息必须包含CMD)
|
||||||
KqjEntity.DeclareMessage declareMsg = registerDevice();
|
KqjEntity.GenericMessage genericMsg = objectMapper.readValue(message, KqjEntity.GenericMessage.class);
|
||||||
this.currentDeviceSn = declareMsg.getSn();
|
String cmd = genericMsg.getCmd();
|
||||||
log.info("设备注册成功,SN: {}, 设备信息: {}", currentDeviceSn, declareMsg);
|
if (cmd == null) {
|
||||||
|
log.warn("会话ID: {} 收到无CMD消息,关闭连接: {}", session.getId(), message);
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "消息缺少cmd字段"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 未注册状态:仅处理DECLARE注册消息
|
||||||
|
if (!isRegistered) {
|
||||||
|
handleRegisterStage(message, cmd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 已注册状态:处理正常业务消息
|
||||||
|
handleNormalStage(message, cmd);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("设备注册失败,关闭连接", e);
|
log.error("会话ID: {} 处理消息失败: {}", session.getId(), message, e);
|
||||||
try {
|
try {
|
||||||
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "设备注册失败"));
|
if (session.isOpen()) {
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "消息处理异常"));
|
||||||
|
}
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
log.error("关闭异常连接失败", ex);
|
log.error("关闭异常连接失败", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 接收客户端消息时触发(对应Golang的for循环读取消息逻辑)
|
* 连接关闭时触发(清理资源)
|
||||||
*/
|
|
||||||
@OnMessage
|
|
||||||
public void onMessage(String message, Session session) {
|
|
||||||
if (message == null || message.isEmpty()) {
|
|
||||||
log.warn("收到空消息,忽略处理");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 先解析通用消息的CMD字段(对应Golang的GenericMessage)
|
|
||||||
KqjEntity.GenericMessage genericMsg = objectMapper.readValue(message, KqjEntity.GenericMessage.class);
|
|
||||||
String cmd = genericMsg.getCmd();
|
|
||||||
if (cmd == null) {
|
|
||||||
log.warn("收到无CMD字段的消息,忽略: {}", message);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 根据CMD类型处理不同逻辑(对应Golang的switch case)
|
|
||||||
switch (cmd) {
|
|
||||||
case Constants.DECLARE:
|
|
||||||
log.info("设备在线心跳(DECLARE),SN: {}", currentDeviceSn);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Constants.PING:
|
|
||||||
handlePing(message);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Constants.TO_CLIENT:
|
|
||||||
handleToClientResponse(message);
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
log.warn("收到未知CMD消息,类型: {}, 内容: {}", cmd, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("处理消息失败,消息内容: {}", message, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 连接关闭时触发(对应Golang的defer conn.Close()和资源清理逻辑)
|
|
||||||
*/
|
*/
|
||||||
@OnClose
|
@OnClose
|
||||||
public void onClose(Session session, CloseReason closeReason) {
|
public void onClose(Session session, CloseReason closeReason) {
|
||||||
log.info("WebSocket连接关闭,会话ID: {}, 原因: {}", session.getId(), closeReason);
|
log.info("连接关闭,会话ID: {},原因: {}", session.getId(), closeReason);
|
||||||
|
|
||||||
// 1. 移除设备连接信息
|
// 1. 移除设备连接(若已注册)
|
||||||
if (currentDeviceSn != null) {
|
if (currentDeviceSn != null && connectedDevices.containsKey(currentDeviceSn)) {
|
||||||
connectedDevices.remove(currentDeviceSn);
|
connectedDevices.remove(currentDeviceSn);
|
||||||
// 更新设备状态为离线(对应Golang的service.BusAttendanceMachine().Change)
|
log.info("设备离线,SN: {},会话ID: {}", currentDeviceSn, session.getId());
|
||||||
|
|
||||||
|
// 2. 更新设备状态为离线(业务服务调用)
|
||||||
updateDeviceStatus(currentDeviceSn, "0");
|
updateDeviceStatus(currentDeviceSn, "0");
|
||||||
|
|
||||||
|
// 3. 清理该设备关联的响应通道
|
||||||
|
cleanDeviceResponseChannels(currentDeviceSn);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. 清理当前设备对应的响应通道(避免内存泄漏)
|
// 4. 重置实例变量
|
||||||
responseChannels.entrySet().removeIf(entry -> {
|
|
||||||
if (entry.getValue().getSn().equals(currentDeviceSn)) {
|
|
||||||
entry.getValue().getResultFuture().completeExceptionally(
|
|
||||||
new Exception("设备连接已关闭,响应通道清理")
|
|
||||||
);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
});
|
|
||||||
|
|
||||||
this.session = null;
|
this.session = null;
|
||||||
this.currentDeviceSn = null;
|
this.currentDeviceSn = null;
|
||||||
|
this.isRegistered = false;
|
||||||
|
this.registerFuture = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接异常时触发
|
* 连接异常时触发
|
||||||
*/
|
*/
|
||||||
@OnError
|
@OnError
|
||||||
public void onError(Session session, Throwable throwable) {
|
public void onError(Session session, Throwable throwable) {
|
||||||
log.error("WebSocket连接异常,会话ID: {}", session.getId(), throwable);
|
log.error("连接异常,会话ID: {},异常信息: {}", session.getId(), throwable.getMessage(), throwable);
|
||||||
// 异常时主动关闭连接
|
|
||||||
try {
|
try {
|
||||||
if (session.isOpen()) {
|
if (session.isOpen()) {
|
||||||
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "连接异常"));
|
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "连接异常"));
|
||||||
@ -157,63 +165,175 @@ public class DeviceWebSocketServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------ 消息处理核心逻辑 ------------------------------
|
||||||
// ------------------------------ 核心业务方法 ------------------------------
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设备注册(对应Golang的addDevice函数)
|
* 处理注册阶段消息(仅接受DECLARE)
|
||||||
* 读取DECLARE消息,解析设备信息并存储
|
|
||||||
*/
|
*/
|
||||||
private KqjEntity.DeclareMessage registerDevice() throws Exception {
|
// private void handleRegisterStage(String message, String cmd) throws Exception {
|
||||||
// 阻塞读取第一条消息(DECLARE消息)
|
// // 非DECLARE消息直接拒绝
|
||||||
String firstMessage = readFirstMessage();
|
// if (!DECLARE.equals(cmd)) {
|
||||||
KqjEntity.DeclareMessage declareMsg = objectMapper.readValue(firstMessage, KqjEntity.DeclareMessage.class);
|
// log.warn("未注册设备发送非法消息,cmd: {},关闭连接", cmd);
|
||||||
|
// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "请先发送注册消息(cmd=declare)"));
|
||||||
|
// registerFuture.complete(false); // 标记注册失败
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // 解析DECLARE消息并完成注册
|
||||||
|
// KqjEntity.DeclareMessage declareMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
|
||||||
|
// String sn = declareMsg.getSn();
|
||||||
|
// if (sn == null || sn.isEmpty()) {
|
||||||
|
// log.warn("注册消息缺少SN,关闭连接");
|
||||||
|
// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "注册消息缺少sn字段"));
|
||||||
|
// registerFuture.complete(false);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // 存储设备信息(IP、端口、Session)
|
||||||
|
// InetSocketAddress remoteAddr = (InetSocketAddress) session.getUserProperties().get("jakarta.websocket.endpoint.remoteAddress");
|
||||||
|
// String ip = remoteAddr.getAddress().getHostAddress();
|
||||||
|
// String port = String.valueOf(remoteAddr.getPort());
|
||||||
|
//
|
||||||
|
// KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo();
|
||||||
|
// deviceInfo.setIp(ip);
|
||||||
|
// deviceInfo.setPort(port);
|
||||||
|
// deviceInfo.setConn(session); // 直接存储Jakarta Session,无类型转换
|
||||||
|
// connectedDevices.put(sn, deviceInfo);
|
||||||
|
//
|
||||||
|
// // 初始化设备-SN反向映射
|
||||||
|
// snToUuids.computeIfAbsent(sn, k -> ConcurrentHashMap.newKeySet());
|
||||||
|
//
|
||||||
|
// // 调用业务服务完成注册
|
||||||
|
// registerDeviceToService(sn);
|
||||||
|
//
|
||||||
|
// // 注册成功:更新状态
|
||||||
|
// this.currentDeviceSn = sn;
|
||||||
|
// this.isRegistered = true;
|
||||||
|
// registerFuture.complete(true); // 标记注册成功
|
||||||
|
// log.info("设备注册成功,SN: {},IP: {}:{},会话ID: {}", sn, ip, port, session.getId());
|
||||||
|
// }
|
||||||
|
|
||||||
// 校验SN合法性
|
private void handleRegisterStage(String message, String cmd) throws Exception {
|
||||||
String sn = declareMsg.getSn();
|
// 非DECLARE消息直接拒绝
|
||||||
if (sn == null || sn.isEmpty()) {
|
if (!DECLARE.equals(cmd)) {
|
||||||
throw new IllegalArgumentException("设备SN为空,注册失败");
|
log.warn("未注册设备发送非法消息,cmd: {},关闭连接", cmd);
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "请先发送注册消息(cmd=declare)"));
|
||||||
|
registerFuture.complete(false); // 标记注册失败
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 解析客户端IP和端口(对应Golang的parseRemoteAddr)
|
// 解析DECLARE消息并完成注册
|
||||||
InetSocketAddress remoteAddr = (InetSocketAddress) session.getUserProperties().get("javax.websocket.endpoint.remoteAddress");
|
KqjEntity.DeclareMessage declareMsg;
|
||||||
String ip = remoteAddr.getAddress().getHostAddress();
|
try {
|
||||||
String port = String.valueOf(remoteAddr.getPort());
|
declareMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("解析DECLARE消息失败,原始消息: {}", message, e);
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "DECLARE消息格式错误"));
|
||||||
|
registerFuture.complete(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 存储设备信息
|
String sn = declareMsg.getSn();
|
||||||
|
if (sn == null || sn.isEmpty()) {
|
||||||
|
log.warn("注册消息缺少SN,关闭连接,消息内容: {}", message);
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "注册消息缺少sn字段"));
|
||||||
|
registerFuture.complete(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 存储设备信息(IP、端口、Session)- 核心优化:兼容多容器,避免空指针
|
||||||
|
String ip = "unknown";
|
||||||
|
String port = "unknown";
|
||||||
|
InetSocketAddress remoteAddr = null;
|
||||||
|
|
||||||
|
// 尝试多种容器可能的键(优先级:Jakarta标准 -> Undertow专用 -> Javax兼容)
|
||||||
|
Object addrObj = session.getUserProperties().get("jakarta.websocket.endpoint.remoteAddress");
|
||||||
|
if (addrObj == null) {
|
||||||
|
addrObj = session.getUserProperties().get("io.undertow.websocket.remoteAddress"); // Undertow专用
|
||||||
|
}
|
||||||
|
if (addrObj == null) {
|
||||||
|
addrObj = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"); // 兼容旧版
|
||||||
|
}
|
||||||
|
|
||||||
|
// 校验并解析IP和端口
|
||||||
|
if (addrObj instanceof InetSocketAddress) {
|
||||||
|
remoteAddr = (InetSocketAddress) addrObj;
|
||||||
|
// 防止getAddress()返回null(极端情况)
|
||||||
|
if (remoteAddr.getAddress() != null) {
|
||||||
|
ip = remoteAddr.getAddress().getHostAddress();
|
||||||
|
}
|
||||||
|
port = String.valueOf(remoteAddr.getPort());
|
||||||
|
log.info("解析设备地址成功,会话ID: {},IP: {},端口: {}", session.getId(), ip, port);
|
||||||
|
} else {
|
||||||
|
// 地址获取失败时,使用默认值,不中断注册流程
|
||||||
|
log.warn("无法解析设备地址,会话ID: {},addrObj类型: {}",
|
||||||
|
session.getId(), addrObj == null ? "null" : addrObj.getClass().getName());
|
||||||
|
log.warn("继续注册流程,使用默认地址(unknown)");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建设备信息(即使IP获取失败,也正常注册)
|
||||||
KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo();
|
KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo();
|
||||||
deviceInfo.setIp(ip);
|
deviceInfo.setIp(ip);
|
||||||
deviceInfo.setPort(port);
|
deviceInfo.setPort(port);
|
||||||
|
deviceInfo.setConn(session);
|
||||||
// ======================== 添加类型检查 ========================
|
|
||||||
// 检查session是否能转换为WebSocket,避免强转失败
|
|
||||||
if (!(session instanceof WebSocket)) {
|
|
||||||
throw new IllegalStateException("WebSocket容器不支持Session转WebSocket,无法注册设备,SN: " + sn);
|
|
||||||
}
|
|
||||||
// 强转并赋值
|
|
||||||
deviceInfo.setConn((WebSocket) session); // 用Session替换Golang的*websocket.Conn
|
|
||||||
// ============================================================
|
|
||||||
|
|
||||||
|
|
||||||
connectedDevices.put(sn, deviceInfo);
|
connectedDevices.put(sn, deviceInfo);
|
||||||
|
|
||||||
// 调用业务服务注册设备(对应Golang的service.BusAttendanceMachine().Register)
|
// 初始化设备-SN反向映射
|
||||||
|
snToUuids.computeIfAbsent(sn, k -> ConcurrentHashMap.newKeySet());
|
||||||
|
|
||||||
|
// 调用业务服务完成注册
|
||||||
registerDeviceToService(sn);
|
registerDeviceToService(sn);
|
||||||
|
|
||||||
return declareMsg;
|
// 注册成功:更新状态
|
||||||
|
this.currentDeviceSn = sn;
|
||||||
|
this.isRegistered = true;
|
||||||
|
registerFuture.complete(true); // 标记注册成功
|
||||||
|
log.info("设备注册成功,SN: {},IP: {}:{},会话ID: {}", sn, ip, port, session.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理PING消息(对应Golang的handlePing函数)
|
* 处理正常通信阶段消息(PING、TO_CLIENT等)
|
||||||
|
*/
|
||||||
|
private void handleNormalStage(String message, String cmd) throws Exception {
|
||||||
|
switch (cmd) {
|
||||||
|
case DECLARE:
|
||||||
|
// 已注册设备发送DECLARE视为在线心跳
|
||||||
|
log.info("设备在线心跳,SN: {},会话ID: {}", currentDeviceSn, session.getId());
|
||||||
|
break;
|
||||||
|
|
||||||
|
case PING:
|
||||||
|
// 处理心跳:回复PONG
|
||||||
|
handlePing(message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TO_CLIENT:
|
||||||
|
// 处理设备响应消息
|
||||||
|
handleToClientResponse(message);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.warn("收到未知消息类型,SN: {},cmd: {},内容: {}", currentDeviceSn, cmd, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------ 业务子逻辑 ------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理PING心跳消息(回复PONG)
|
||||||
*/
|
*/
|
||||||
private void handlePing(String message) throws Exception {
|
private void handlePing(String message) throws Exception {
|
||||||
|
// 解析PING消息中的SN(双重校验)
|
||||||
KqjEntity.DeclareMessage pingMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
|
KqjEntity.DeclareMessage pingMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
|
||||||
String sn = pingMsg.getSn();
|
String sn = pingMsg.getSn();
|
||||||
|
if (!currentDeviceSn.equals(sn)) {
|
||||||
|
log.warn("心跳消息SN不匹配,当前SN: {},消息SN: {}", currentDeviceSn, sn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 1. 回复PONG消息
|
// 构造PONG响应
|
||||||
KqjEntity.PongMessage pongMsg = new KqjEntity.PongMessage();
|
KqjEntity.PongMessage pongMsg = new KqjEntity.PongMessage();
|
||||||
pongMsg.setCmd(Constants.PING);
|
pongMsg.setCmd("pong");
|
||||||
pongMsg.setFrom("server");
|
pongMsg.setFrom("server");
|
||||||
pongMsg.setTo(sn);
|
pongMsg.setTo(sn);
|
||||||
|
|
||||||
@ -223,247 +343,185 @@ public class DeviceWebSocketServer {
|
|||||||
|
|
||||||
// 发送PONG消息
|
// 发送PONG消息
|
||||||
session.getBasicRemote().sendText(objectMapper.writeValueAsString(pongMsg));
|
session.getBasicRemote().sendText(objectMapper.writeValueAsString(pongMsg));
|
||||||
log.info("发送PONG消息给设备,SN: {}", sn);
|
log.info("发送PONG消息,SN: {},会话ID: {}", sn, session.getId());
|
||||||
|
|
||||||
// 2. 更新设备连接状态(对应Golang的connectedDevices[sn]重设)
|
// 更新设备连接状态(业务服务调用)
|
||||||
if (connectedDevices.containsKey(sn)) {
|
|
||||||
KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
|
|
||||||
deviceInfo.setConn((WebSocket) session);
|
|
||||||
connectedDevices.put(sn, deviceInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. 调用业务服务更新设备状态(对应Golang的service.BusAttendanceMachine().Register)
|
|
||||||
registerDeviceToService(sn);
|
registerDeviceToService(sn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理TO_CLIENT响应消息(对应Golang的requestResponse函数)
|
* 处理TO_CLIENT响应消息(分发到对应的UUID通道)
|
||||||
*/
|
*/
|
||||||
private void handleToClientResponse(String message) throws Exception {
|
private void handleToClientResponse(String message) throws Exception {
|
||||||
log.info("收到TO_CLIENT响应消息: {}", message);
|
|
||||||
KqjEntity.CommonResponse commonResp = objectMapper.readValue(message, KqjEntity.CommonResponse.class);
|
KqjEntity.CommonResponse commonResp = objectMapper.readValue(message, KqjEntity.CommonResponse.class);
|
||||||
|
|
||||||
// 根据UUID查找响应通道,传递响应结果
|
|
||||||
String uuid = commonResp.getTo();
|
String uuid = commonResp.getTo();
|
||||||
ResponseHolder<KqjEntity.CommonResponse> holder = responseChannels.get(uuid);
|
if (uuid == null || uuid.isEmpty()) {
|
||||||
|
log.warn("响应消息缺少UUID(to字段),SN: {},内容: {}", currentDeviceSn, message);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 查找响应通道并分发结果
|
||||||
|
ResponseHolder holder = responseChannels.get(uuid);
|
||||||
if (holder != null) {
|
if (holder != null) {
|
||||||
holder.getResultFuture().complete(commonResp);
|
holder.getResultFuture().complete(commonResp);
|
||||||
responseChannels.remove(uuid); // 移除已完成的通道
|
responseChannels.remove(uuid);
|
||||||
log.info("响应已分发到UUID: {}, 响应内容: {}", uuid, commonResp);
|
snToUuids.get(currentDeviceSn).remove(uuid); // 清理反向映射
|
||||||
|
log.info("响应分发成功,UUID: {},SN: {}", uuid, currentDeviceSn);
|
||||||
} else {
|
} else {
|
||||||
log.warn("未找到UUID: {}对应的响应通道,响应丢弃", uuid);
|
log.warn("未找到UUID对应的响应通道,UUID: {},SN: {}", uuid, currentDeviceSn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------ 业务服务调用(需替换为真实实现) ------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设备注册到业务服务(对应原Golang的service.BusAttendanceMachine().Register)
|
||||||
|
*/
|
||||||
|
private void registerDeviceToService(String sn) {
|
||||||
|
try {
|
||||||
|
// TODO: 替换为真实的业务服务调用(如Spring Bean注入后调用)
|
||||||
|
log.info("【业务服务】设备注册,SN: {}", sn);
|
||||||
|
// 示例:BusAttendanceMachineService.register(sn);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("【业务服务】设备注册失败,SN: {}", sn, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新设备状态(对应原Golang的service.BusAttendanceMachine().Change)
|
||||||
|
*/
|
||||||
|
private void updateDeviceStatus(String sn, String status) {
|
||||||
|
try {
|
||||||
|
// TODO: 替换为真实的业务服务调用
|
||||||
|
log.info("【业务服务】更新设备状态,SN: {},状态: {}", sn, status);
|
||||||
|
// 示例:BusAttendanceMachineService.changeStatus(sn, status);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("【业务服务】更新设备状态失败,SN: {}", sn, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------------------ 工具方法 ------------------------------
|
// ------------------------------ 工具方法 ------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 读取设备第一条消息(阻塞直到收到消息)
|
* 发送消息给指定设备(对应原Golang的sendMessageToDevice)
|
||||||
*/
|
|
||||||
private String readFirstMessage() throws Exception {
|
|
||||||
// 使用CompletableFuture等待消息
|
|
||||||
final java.util.concurrent.CompletableFuture<String> firstMsgFuture = new java.util.concurrent.CompletableFuture<>();
|
|
||||||
// 使用AtomicReference包装临时处理器,解决未初始化问题
|
|
||||||
final java.util.concurrent.atomic.AtomicReference<MessageHandler.Whole<String>> tempHandlerRef = new java.util.concurrent.atomic.AtomicReference<>();
|
|
||||||
|
|
||||||
// 定义临时消息处理器
|
|
||||||
MessageHandler.Whole<String> tempHandler = msg -> {
|
|
||||||
if (!firstMsgFuture.isDone()) {
|
|
||||||
firstMsgFuture.complete(msg);
|
|
||||||
// 从引用中获取处理器并移除
|
|
||||||
session.removeMessageHandler(tempHandlerRef.get());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// 将处理器存入引用
|
|
||||||
tempHandlerRef.set(tempHandler);
|
|
||||||
// 注册处理器
|
|
||||||
session.addMessageHandler(tempHandler);
|
|
||||||
|
|
||||||
// 等待消息,超时10秒
|
|
||||||
return firstMsgFuture.get(10, TimeUnit.SECONDS);
|
|
||||||
// // 使用Java并发工具等待消息(模拟Golang的阻塞读取)
|
|
||||||
// final java.util.concurrent.CompletableFuture<String> firstMsgFuture = new java.util.concurrent.CompletableFuture<>();
|
|
||||||
//
|
|
||||||
// // 临时注册消息处理器,读取第一条消息后移除
|
|
||||||
// MessageHandler.Whole<String> tempHandler = msg -> {
|
|
||||||
// if (!firstMsgFuture.isDone()) {
|
|
||||||
// firstMsgFuture.complete(msg);
|
|
||||||
// // 移除临时处理器(避免重复处理)
|
|
||||||
// session.removeMessageHandler(tempHandler);
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
// session.addMessageHandler(tempHandler);
|
|
||||||
//
|
|
||||||
// // 等待消息,超时10秒(防止设备一直不发消息)
|
|
||||||
// return firstMsgFuture.get(10, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 发送消息给指定设备(对应Golang的sendMessageToDevice函数)
|
|
||||||
*/
|
*/
|
||||||
public static boolean sendMessageToDevice(String sn, String uuid, Object message) {
|
public static boolean sendMessageToDevice(String sn, String uuid, Object message) {
|
||||||
// 1. 检查设备是否在线
|
// 1. 检查设备是否在线
|
||||||
KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
|
KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
|
||||||
if (deviceInfo == null) {
|
if (deviceInfo == null) {
|
||||||
log.warn("设备不存在,SN: {}", sn);
|
log.warn("发送消息失败:设备不存在,SN: {}", sn);
|
||||||
responseChannels.remove(uuid);
|
responseChannels.remove(uuid);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. 将WebSocket转回Session(因为状态由Session管理)
|
// 2. 检查Session是否有效
|
||||||
WebSocket webSocket = deviceInfo.getConn();
|
Session session = deviceInfo.getConn();
|
||||||
if (!(webSocket instanceof Session)) {
|
if (session == null || !session.isOpen()) {
|
||||||
log.warn("设备连接类型错误,无法判断状态,SN: {}", sn);
|
log.warn("发送消息失败:设备连接已关闭,SN: {}", sn);
|
||||||
responseChannels.remove(uuid);
|
connectedDevices.remove(sn);
|
||||||
return false;
|
snToUuids.remove(sn);
|
||||||
}
|
|
||||||
Session session = (Session) webSocket;
|
|
||||||
|
|
||||||
// 3. 检查连接是否打开
|
|
||||||
if (!session.isOpen()) {
|
|
||||||
log.warn("设备连接已关闭,SN: {}", sn);
|
|
||||||
connectedDevices.remove(sn); // 移除已关闭的设备
|
|
||||||
responseChannels.remove(uuid);
|
responseChannels.remove(uuid);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 4. 序列化消息并发送
|
// 3. 序列化并发送消息
|
||||||
String msgJson = objectMapper.writeValueAsString(message);
|
String msgJson = objectMapper.writeValueAsString(message);
|
||||||
session.getBasicRemote().sendText(msgJson); // 通过Session发送消息
|
session.getBasicRemote().sendText(msgJson);
|
||||||
log.info("发送消息给设备,SN: {}, UUID: {}, 消息: {}", sn, uuid, msgJson);
|
// 关联UUID和设备SN(用于后续清理)
|
||||||
|
snToUuids.get(sn).add(uuid);
|
||||||
|
log.info("发送消息成功,SN: {},UUID: {},消息: {}", sn, uuid, msgJson);
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("发送消息失败,SN: {}, UUID: {}", sn, uuid, e);
|
log.error("发送消息失败,SN: {},UUID: {}", sn, uuid, e);
|
||||||
// 发送失败时移除设备(可能连接已异常)
|
// 清理无效资源
|
||||||
connectedDevices.remove(sn);
|
connectedDevices.remove(sn);
|
||||||
|
snToUuids.remove(sn);
|
||||||
responseChannels.remove(uuid);
|
responseChannels.remove(uuid);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// // 1. 检查设备是否在线
|
|
||||||
// KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
|
|
||||||
// if (deviceInfo == null || !deviceInfo.getSession().isOpen()) {
|
|
||||||
// log.warn("设备不在线,SN: {}", sn);
|
|
||||||
// // 清理无效的响应通道
|
|
||||||
// responseChannels.remove(uuid);
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// try {
|
|
||||||
// // 2. 序列化消息并发送
|
|
||||||
// String msgJson = objectMapper.writeValueAsString(message);
|
|
||||||
// deviceInfo.getSession().getBasicRemote().sendText(msgJson);
|
|
||||||
// log.info("发送消息给设备,SN: {}, UUID: {}, 消息: {}", sn, uuid, msgJson);
|
|
||||||
// return true;
|
|
||||||
//
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// log.error("发送消息失败,SN: {}, UUID: {}", sn, uuid, e);
|
|
||||||
// // 清理异常的响应通道
|
|
||||||
// responseChannels.remove(uuid);
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送请求并等待响应(对应Golang的SendRequestAndWaitResponse函数)
|
* 发送请求并等待响应(对应原Golang的SendRequestAndWaitResponse)
|
||||||
*/
|
*/
|
||||||
public static KqjEntity.CommonResponse sendRequestAndWaitResponse(String sn, String uuid, Object payload) throws Exception {
|
public static KqjEntity.CommonResponse sendRequestAndWaitResponse(String sn, String uuid, Object payload) throws Exception {
|
||||||
// 1. 创建响应结果容器
|
// 1. 创建响应容器并注册
|
||||||
ResponseHolder<KqjEntity.CommonResponse> responseHolder = new ResponseHolder<>(sn);
|
ResponseHolder holder = new ResponseHolder(sn);
|
||||||
responseChannels.put(uuid, responseHolder);
|
responseChannels.put(uuid, holder);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 2. 发送请求
|
// 2. 发送请求
|
||||||
boolean sendSuccess = sendMessageToDevice(sn, uuid, payload);
|
boolean sendSuccess = sendMessageToDevice(sn, uuid, payload);
|
||||||
if (!sendSuccess) {
|
if (!sendSuccess) {
|
||||||
throw new Exception("发送请求失败,设备不在线或发送异常,SN: " + sn);
|
throw new Exception("发送请求失败,设备不在线或连接异常,SN: " + sn);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 等待响应,超时10秒
|
// 3. 等待响应(10秒超时)
|
||||||
return responseHolder.getResultFuture().get(10, TimeUnit.SECONDS);
|
return holder.getResultFuture().get(10, TimeUnit.SECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
} catch (java.util.concurrent.TimeoutException e) {
|
log.error("等待响应超时,SN: {},UUID: {}", sn, uuid);
|
||||||
log.error("等待响应超时,SN: {}, UUID: {}", sn, uuid);
|
|
||||||
responseChannels.remove(uuid);
|
responseChannels.remove(uuid);
|
||||||
throw new Exception("等待响应超时(10秒)", e);
|
snToUuids.get(sn).remove(uuid);
|
||||||
|
throw new Exception("等待响应超时(10秒),SN: " + sn, e);
|
||||||
} finally {
|
} finally {
|
||||||
// 清理响应通道(防止内存泄漏)
|
// 4. 清理响应通道(防止内存泄漏)
|
||||||
responseChannels.remove(uuid);
|
responseChannels.remove(uuid);
|
||||||
|
if (snToUuids.containsKey(sn)) {
|
||||||
|
snToUuids.get(sn).remove(uuid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 生成带6位随机数的UUID(对应Golang的GenerateUUIDWithSixRandomDigits函数)
|
* 生成带6位随机数的UUID(对应原Golang的GenerateUUIDWithSixRandomDigits)
|
||||||
*/
|
*/
|
||||||
public static String generateUUIDWithSixRandomDigits() {
|
public static String generateUUIDWithSixRandomDigits() {
|
||||||
// 生成标准UUID
|
|
||||||
String uuidStr = UUID.randomUUID().toString().replace("-", "");
|
String uuidStr = UUID.randomUUID().toString().replace("-", "");
|
||||||
// 生成6位随机数(100000-999999)
|
|
||||||
Random random = new Random();
|
Random random = new Random();
|
||||||
int randomNum = random.nextInt(900000) + 100000;
|
int randomNum = random.nextInt(900000) + 100000; // 6位随机数(100000-999999)
|
||||||
// 拼接返回
|
|
||||||
return uuidStr + "-" + randomNum;
|
return uuidStr + "-" + randomNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// ------------------------------ 业务服务调用(模拟Golang的service层) ------------------------------
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 注册设备到业务服务(对应Golang的service.BusAttendanceMachine().Register)
|
* 清理设备关联的所有响应通道(连接关闭时调用)
|
||||||
* 实际项目中替换为真实的Service调用
|
|
||||||
*/
|
*/
|
||||||
private void registerDeviceToService(String sn) {
|
private void cleanDeviceResponseChannels(String sn) {
|
||||||
try {
|
Set<String> uuids = snToUuids.remove(sn);
|
||||||
// 模拟业务服务调用(如更新数据库设备状态为在线)
|
if (uuids == null || uuids.isEmpty()) {
|
||||||
log.info("调用业务服务注册设备,SN: {}", sn);
|
return;
|
||||||
// TODO: 替换为真实的Service代码(如Spring Bean调用)
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("业务服务注册设备失败,SN: {}", sn, e);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
for (String uuid : uuids) {
|
||||||
/**
|
ResponseHolder holder = responseChannels.remove(uuid);
|
||||||
* 更新设备状态(对应Golang的service.BusAttendanceMachine().Change)
|
if (holder != null) {
|
||||||
* 实际项目中替换为真实的Service调用
|
holder.getResultFuture().completeExceptionally(
|
||||||
*/
|
new Exception("设备已离线,响应通道已清理,SN: " + sn)
|
||||||
private void updateDeviceStatus(String sn, String status) {
|
);
|
||||||
try {
|
}
|
||||||
log.info("调用业务服务更新设备状态,SN: {}, 状态: {}", sn, status);
|
|
||||||
// TODO: 替换为真实的Service代码(如Spring Bean调用)
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("业务服务更新设备状态失败,SN: {}", sn, e);
|
|
||||||
}
|
}
|
||||||
|
log.info("清理设备响应通道,SN: {},共清理UUID数量: {}", sn, uuids.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// ------------------------------ 内部辅助类 ------------------------------
|
// ------------------------------ 内部辅助类 ------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 响应结果容器(对应Golang的chan CommonResponse)
|
* 响应结果容器(替代Golang的chan,用CompletableFuture实现异步等待)
|
||||||
* 用CompletableFuture实现异步结果等待
|
|
||||||
*/
|
*/
|
||||||
private static class ResponseHolder<T> {
|
private static class ResponseHolder {
|
||||||
private final String sn; // 关联的设备SN
|
private final String sn; // 关联的设备SN
|
||||||
private final java.util.concurrent.CompletableFuture<T> resultFuture;
|
private final CompletableFuture<KqjEntity.CommonResponse> resultFuture;
|
||||||
|
|
||||||
public ResponseHolder(String sn) {
|
public ResponseHolder(String sn) {
|
||||||
this.sn = sn;
|
this.sn = sn;
|
||||||
this.resultFuture = new java.util.concurrent.CompletableFuture<>();
|
this.resultFuture = new CompletableFuture<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getSn() {
|
public String getSn() {
|
||||||
return sn;
|
return sn;
|
||||||
}
|
}
|
||||||
|
|
||||||
public java.util.concurrent.CompletableFuture<T> getResultFuture() {
|
public CompletableFuture<KqjEntity.CommonResponse> getResultFuture() {
|
||||||
return resultFuture;
|
return resultFuture;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,21 +1,20 @@
|
|||||||
package org.dromara.mobileAttendanceMachine;
|
package org.dromara.mobileAttendanceMachine;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import jakarta.websocket.Session;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
import java.net.http.WebSocket;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Author 铁憨憨
|
* 实体类:WebSocket消息结构和设备信息
|
||||||
* @Date 2025/10/14 14:47
|
|
||||||
* @Version 1.0
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class KqjEntity {
|
public class KqjEntity {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设备连接信息(修正conn为Jakarta Session)
|
||||||
|
*/
|
||||||
@Data
|
@Data
|
||||||
public static class DeviceInfo {
|
public static class DeviceInfo {
|
||||||
@JsonProperty("ip")
|
@JsonProperty("ip")
|
||||||
@ -25,14 +24,15 @@ public class KqjEntity {
|
|||||||
private String port;
|
private String port;
|
||||||
|
|
||||||
@JsonProperty("conn")
|
@JsonProperty("conn")
|
||||||
private WebSocket conn;
|
private Session conn; // 关键修正:使用Jakarta WebSocket的Session
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------ 以下为原有实体类,保持不变 ------------------------------
|
||||||
/**
|
/**
|
||||||
* 通用消息结构,用于解析初始的 cmd 字段
|
* 通用消息结构,用于解析初始的 cmd 字段
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class GenericMessage {
|
public static class GenericMessage {
|
||||||
@JsonProperty("cmd")
|
@JsonProperty("cmd")
|
||||||
private String cmd;
|
private String cmd;
|
||||||
}
|
}
|
||||||
@ -41,7 +41,7 @@ public class KqjEntity {
|
|||||||
* 公共响应结构
|
* 公共响应结构
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class CommonResponse {
|
public static class CommonResponse {
|
||||||
@JsonProperty("cmd")
|
@JsonProperty("cmd")
|
||||||
private String cmd;
|
private String cmd;
|
||||||
|
|
||||||
@ -56,12 +56,12 @@ public class KqjEntity {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class CommonResponseData {
|
public static class CommonResponseData {
|
||||||
@JsonProperty("cmd")
|
@JsonProperty("cmd")
|
||||||
private String cmd;
|
private String cmd;
|
||||||
|
|
||||||
@JsonProperty("userIds")
|
@JsonProperty("userIds")
|
||||||
private String[] userIds; // 用户IDS
|
private String[] userIds;
|
||||||
|
|
||||||
@JsonProperty("user_id")
|
@JsonProperty("user_id")
|
||||||
private String userId;
|
private String userId;
|
||||||
@ -77,7 +77,7 @@ public class KqjEntity {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class DelMultiUserData {
|
public static class DelMultiUserData {
|
||||||
@JsonProperty("user_id")
|
@JsonProperty("user_id")
|
||||||
private String userId;
|
private String userId;
|
||||||
|
|
||||||
@ -88,12 +88,11 @@ public class KqjEntity {
|
|||||||
private String msg;
|
private String msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设备上线消息
|
* 设备上线消息
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class DeclareMessage {
|
public static class DeclareMessage {
|
||||||
@JsonProperty("cmd")
|
@JsonProperty("cmd")
|
||||||
private String cmd;
|
private String cmd;
|
||||||
|
|
||||||
@ -134,7 +133,6 @@ public class KqjEntity {
|
|||||||
private PongMessageData data;
|
private PongMessageData data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 心跳回复消息结构
|
* 心跳回复消息结构
|
||||||
*/
|
*/
|
||||||
@ -150,13 +148,13 @@ public class KqjEntity {
|
|||||||
@Data
|
@Data
|
||||||
public class PeopleInformation {
|
public class PeopleInformation {
|
||||||
@JsonProperty("cmd")
|
@JsonProperty("cmd")
|
||||||
private String cmd; // 该接口固定为to_device
|
private String cmd;
|
||||||
|
|
||||||
@JsonProperty("from")
|
@JsonProperty("from")
|
||||||
private String from; // 可不填写,填写uuid来做为发送请求或响应的标识
|
private String from;
|
||||||
|
|
||||||
@JsonProperty("to")
|
@JsonProperty("to")
|
||||||
private String to; // 设备号(请查看公共设置中的设备号)
|
private String to;
|
||||||
|
|
||||||
@JsonProperty("data")
|
@JsonProperty("data")
|
||||||
private PeopleInData data;
|
private PeopleInData data;
|
||||||
@ -174,10 +172,10 @@ public class KqjEntity {
|
|||||||
private String name;
|
private String name;
|
||||||
|
|
||||||
@JsonProperty("face_template")
|
@JsonProperty("face_template")
|
||||||
private String faceTemplate; // http 链接图
|
private String faceTemplate;
|
||||||
|
|
||||||
@JsonProperty("id_valid")
|
@JsonProperty("id_valid")
|
||||||
private String idValid; // 人员有效期(人员在这个时间点后,无法通行)格式:yyyy-MM-dd 或者 yyyy-MM-dd HH:mm,为 "" 则为永久
|
private String idValid;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -207,7 +205,7 @@ public class KqjEntity {
|
|||||||
private String userId;
|
private String userId;
|
||||||
|
|
||||||
@JsonProperty("user_type")
|
@JsonProperty("user_type")
|
||||||
private int userType; // 删除的用户类型:0-人脸接口下发的数据 1-人证比对接口下发的数据
|
private int userType;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -237,7 +235,7 @@ public class KqjEntity {
|
|||||||
private String[] userIds;
|
private String[] userIds;
|
||||||
|
|
||||||
@JsonProperty("user_type")
|
@JsonProperty("user_type")
|
||||||
private int userType; // 删除的用户类型:0-人脸接口下发的数据 1-人证比对接口下发的数据
|
private int userType;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -264,7 +262,7 @@ public class KqjEntity {
|
|||||||
private String cmd;
|
private String cmd;
|
||||||
|
|
||||||
@JsonProperty("user_type")
|
@JsonProperty("user_type")
|
||||||
private int userType; // 删除的用户类型:0-人脸接口下发的数据 1-人证比对接口下发的数据
|
private int userType;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -273,13 +271,13 @@ public class KqjEntity {
|
|||||||
@Data
|
@Data
|
||||||
public class PersonnelInformationAcquisition {
|
public class PersonnelInformationAcquisition {
|
||||||
@JsonProperty("cmd")
|
@JsonProperty("cmd")
|
||||||
private String cmd; // 该接口固定为to_device
|
private String cmd;
|
||||||
|
|
||||||
@JsonProperty("from")
|
@JsonProperty("from")
|
||||||
private String from; // 可不填写,填写uuid来做为发送请求或响应的标识
|
private String from;
|
||||||
|
|
||||||
@JsonProperty("to")
|
@JsonProperty("to")
|
||||||
private String to; // 设备号(请查看公共设置中的设备号)
|
private String to;
|
||||||
|
|
||||||
@JsonProperty("data")
|
@JsonProperty("data")
|
||||||
private PersonnelInformationAcquisitionTwo data;
|
private PersonnelInformationAcquisitionTwo data;
|
||||||
@ -293,5 +291,4 @@ public class KqjEntity {
|
|||||||
@JsonProperty("value")
|
@JsonProperty("value")
|
||||||
private int value;
|
private int value;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,22 +1,13 @@
|
|||||||
package org.dromara.mobileAttendanceMachine;
|
package org.dromara.mobileAttendanceMachine;
|
||||||
|
|
||||||
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||||
|
|
||||||
/**
|
|
||||||
* @Author 铁憨憨
|
|
||||||
* @Date 2025/10/14 16:11
|
|
||||||
* @Version 1.0
|
|
||||||
*
|
|
||||||
* 系统启动就会开启ws
|
|
||||||
*/
|
|
||||||
@Configuration
|
@Configuration
|
||||||
public class WebSocketConfig {
|
public class WebSocketConfig {
|
||||||
|
|
||||||
/**
|
|
||||||
* 自动注册所有标注了@ServerEndpoint的WebSocket端点
|
|
||||||
*/
|
|
||||||
@Bean
|
@Bean
|
||||||
public ServerEndpointExporter serverEndpointExporter() {
|
public ServerEndpointExporter serverEndpointExporter() {
|
||||||
return new ServerEndpointExporter();
|
return new ServerEndpointExporter();
|
||||||
|
|||||||
Reference in New Issue
Block a user