@ -1,153 +1,161 @@
package org.dromara.mobileAttendanceMachine ;
import com.fasterxml.jackson.databind.DeserializationFeature ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import jakarta.websocket.* ;
import jakarta.websocket.server.ServerEndpoint ;
import lombok.extern.log4j.Log4j2 ;
import org.springframework.stereotype.Component ;
import javax.websocket.* ;
import javax.websocket.server.ServerEndpoint ;
import java.io.IOException ;
import java.net.InetSocketAddress ;
import java.net.http.WebSocket ;
import java.util.Map ;
import java.util.Random ;
import java.util.Set ;
import java.util.UUID ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeoutException ;
import java.util.concurrent.atomic.AtomicReference ;
/**
* WebSocket服务端实现( 对应Golang的HandleWebSocket逻辑 )
* ServerEndpoint注解指定WebSocket连接路径
* WebSocket服务端(设备连接管理、消息处理 )
* 核心逻辑:通过状态标记区分设备注册阶段和正常通信阶段,避免多处理器冲突
*/
@ServerEndpoint ( " /ws/device " )
@ServerEndpoint ( " /custom- ws/device " )
@Component
@Log4j2
public class DeviceWebSocketServer {
// ------------------------------ 常量定义( 对应Golang的const) ------------------------------
public static class Constants {
public static final String DECLARE = " declare " ; // 设备初上线
public static final String PING = " ping " ; // 心跳
public static final Str ing TO_CLIENT = " to_client " ; // 服务器消息下发到客户端的响应
}
// ------------------------------ 常量定义 ------------------------------
public static final String DECLARE = " declare " ; // 设备注册消息
public static final String PING = " ping " ; // 心跳消息
public static final String TO_CLIENT = " to_client " ; // 设备响应消息
private static final int REGISTER_TIMEOUT = 10 ; // 注册超时时间(秒)
// JSON序列化/反序列化工具(单例)
private static final ObjectMapper objectMapper = new ObjectMapper ( ) ;
// 1. 存储所有连接的设备信息( key: 设备SN, value: 设备信息)
// ------------------------------ 全局静态存储 ------------------------------
// JSON序列化工具( 单例)
private static final ObjectMapper objectMapper = new ObjectMapper ( ) . configure ( DeserializationFeature . FAIL_ON_UNKNOWN_PROPERTIES , false ) ; ;
// 设备连接池: key= 设备SN, value= 设备信息( 含Session)
private static final Map < String , KqjEntity . DeviceInfo > connectedDevices = new ConcurrentHashMap < > ( ) ;
// 2. 存储UUID对应的 响应通道( key: UUID, value: 响应结果容器)
private static final Map < String , ResponseHolder < KqjEntity . CommonResponse > > responseChannels = new ConcurrentHashMap < > ( ) ;
// 响应通道: key= UUID, value= 响应结果容器
private static final Map < String , ResponseHolder > responseChannels = new ConcurrentHashMap < > ( ) ;
// 设备-SN反向映射: key=设备SN, value=关联的UUID列表( 用于连接关闭时清理)
private static final Map < String , Set < String > > snToUuids = new ConcurrentHashMap < > ( ) ;
// 当前连接的WebSocket会话
private Session session ;
// 当前连接的设备SN( 连接建立后从DECLARE消息中提取 )
private String currentDeviceSn ;
// ------------------------------ 每个连接的实例变量 ------------------------------
private Session session ; // 当前WebSocket会话
private String currentDeviceSn ; // 当前连接的设备SN( 注册后赋值 )
private volatile boolean isRegistered ; // 注册状态: false=未注册, true=已注册
private CompletableFuture < Boolean > registerFuture ; // 注册阶段的消息Future
// ------------------------------ 连接生命周期方法 ------------------------------
/**
* 连接建立时触发(对应Golang中upgrader.Upgrade后的 初始化逻辑 )
* 连接建立时触发(初始化状态 )
*/
@OnOpen
public void onOpen ( Session session ) {
this . session = session ;
log . info ( " 新的WebSocket连接建立, 会话ID: {} " , session . getId ( ) ) ;
this . isRegistered = false ;
this . registerFuture = new CompletableFuture < > ( ) ; // 初始化注册Future
log . info ( " 新连接建立, 会话ID: {},等待设备注册({}秒超时)... " , session . getId ( ) , REGISTER_TIMEOUT ) ;
// 启动注册超时监听
CompletableFuture . runAsync ( ( ) - > {
try {
// 等待注册消息,超时则关闭连接
if ( ! registerFuture . get ( REGISTER_TIMEOUT , TimeUnit . SECONDS ) ) {
throw new TimeoutException ( " 注册超时 " ) ;
}
} catch ( Exception e ) {
log . error ( " 设备注册超时/失败, 会话ID: {} " , session . getId ( ) , e ) ;
try {
if ( session . isOpen ( ) ) {
session . close ( new CloseReason ( CloseReason . CloseCodes . TRY_AGAIN_LATER , " 注册超时( 请发送DECLARE消息) " ) ) ;
}
} catch ( IOException ex ) {
log . error ( " 关闭超时连接失败 " , ex ) ;
}
}
} ) ;
}
/**
* 接收消息时触发(所有消息统一处理,按状态区分)
*/
@OnMessage
public void onMessage ( String message , Session session ) {
if ( message = = null | | message . isEmpty ( ) ) {
log . warn ( " 会话ID: {} 收到空消息,忽略 " , session . getId ( ) ) ;
return ;
}
try {
// 读取设备第一条消息( DECLARE消息) , 完成设备注册
KqjEntity . Declare Message declare Msg = registerDevice ( ) ;
this . currentDeviceSn = declare Msg. getSn ( ) ;
log . info ( " 设备注册成功, SN: {}, 设备信息: {} " , currentDeviceSn , declareMsg ) ;
// 解析通用消息的CMD( 所有消息必须包含CMD)
KqjEntity . Generic Message generic Msg = objectMapper . readValue ( message , KqjEntity . GenericMessage . class ) ;
String cmd = generic Msg. getCmd ( ) ;
if ( cmd = = null ) {
log . warn ( " 会话ID: {} 收到无CMD消息, 关闭连接: {} " , session . getId ( ) , message ) ;
session . close ( new CloseReason ( CloseReason . CloseCodes . PROTOCOL_ERROR , " 消息缺少cmd字段 " ) ) ;
return ;
}
// 未注册状态: 仅处理DECLARE注册消息
if ( ! isRegistered ) {
handleRegisterStage ( message , cmd ) ;
return ;
}
// 已注册状态:处理正常业务消息
handleNormalStage ( message , cmd ) ;
} catch ( Exception e ) {
log . error ( " 设备注册失败,关闭连接 " , e ) ;
log . error ( " 会话ID: {} 处理消息失败: {} " , session . getId ( ) , message , e ) ;
try {
session . close ( new CloseReason ( CloseReason . CloseCodes . PROTOCOL_ERROR , " 设备注册失败 " ) ) ;
if ( session . isOpen ( ) ) {
session . close ( new CloseReason ( CloseReason . CloseCodes . UNEXPECTED_CONDITION , " 消息处理异常 " ) ) ;
}
} catch ( IOException ex ) {
log . error ( " 关闭异常连接失败 " , ex ) ;
}
}
}
/**
* 接收客户端消息时触发( 对应Golang的for循环读取消息逻辑 )
*/
@OnMessage
public void onMessage ( String message , Session session ) {
if ( message = = null | | message . isEmpty ( ) ) {
log . warn ( " 收到空消息,忽略处理 " ) ;
return ;
}
try {
// 先解析通用消息的CMD字段( 对应Golang的GenericMessage)
KqjEntity . GenericMessage genericMsg = objectMapper . readValue ( message , KqjEntity . GenericMessage . class ) ;
String cmd = genericMsg . getCmd ( ) ;
if ( cmd = = null ) {
log . warn ( " 收到无CMD字段的消息, 忽略: {} " , message ) ;
return ;
}
// 根据CMD类型处理不同逻辑( 对应Golang的switch case)
switch ( cmd ) {
case Constants . DECLARE :
log . info ( " 设备在线心跳( DECLARE) , SN: {} " , currentDeviceSn ) ;
break ;
case Constants . PING :
handlePing ( message ) ;
break ;
case Constants . TO_CLIENT :
handleToClientResponse ( message ) ;
break ;
default :
log . warn ( " 收到未知CMD消息, 类型: {}, 内容: {} " , cmd , message ) ;
}
} catch ( Exception e ) {
log . error ( " 处理消息失败,消息内容: {} " , message , e ) ;
}
}
/**
* 连接关闭时触发( 对应Golang的defer conn.Close()和资源清理逻辑)
* 连接关闭时触发(清理资源 )
*/
@OnClose
public void onClose ( Session session , CloseReason closeReason ) {
log . info ( " WebSocket 连接关闭, 会话ID: {}, 原因: {}" , session . getId ( ) , closeReason ) ;
log . info ( " 连接关闭, 会话ID: {}, 原因: {} " , session . getId ( ) , closeReason ) ;
// 1. 移除设备连接信息
if ( currentDeviceSn ! = null ) {
// 1. 移除设备连接(若已注册)
if ( currentDeviceSn ! = null & & connectedDevices . containsKey ( currentDeviceSn ) ) {
connectedDevices . remove ( currentDeviceSn ) ;
// 更新设备状态为离线( 对应Golang的service.BusAttendanceMachine().Change)
log . info ( " 设备离线, SN: {}, 会话ID: {} " , currentDeviceSn , session . getId ( ) ) ;
// 2. 更新设备状态为离线(业务服务调用)
updateDeviceStatus ( currentDeviceSn , " 0 " ) ;
// 3. 清理该设备关联的响应通道
cleanDeviceResponseChannels ( currentDeviceSn ) ;
}
// 2 . 清理当前设备对应的响应通道(避免内存泄漏)
responseChannels . entrySet ( ) . removeIf ( entry - > {
if ( entry . getValue ( ) . getSn ( ) . equals ( currentDeviceSn ) ) {
entry . getValue ( ) . getResultFuture ( ) . completeExceptionally (
new Exception ( " 设备连接已关闭,响应通道清理 " )
) ;
return true ;
}
return false ;
} ) ;
// 4 . 重置实例变量
this . session = null ;
this . currentDeviceSn = null ;
this . isRegistered = false ;
this . registerFuture = null ;
}
/**
* 连接异常时触发
*/
@OnError
public void onError ( Session session , Throwable throwable ) {
log . error ( " WebSocket 连接异常, 会话ID: {}" , session . getId ( ) , throwable ) ;
// 异常时主动关闭连接
log . error ( " 连接异常, 会话ID: {},异常信息 : {} " , session . getId ( ) , throwable . getMessage ( ) , throwable ) ;
try {
if ( session . isOpen ( ) ) {
session . close ( new CloseReason ( CloseReason . CloseCodes . UNEXPECTED_CONDITION , " 连接异常 " ) ) ;
@ -157,63 +165,175 @@ public class DeviceWebSocketServer {
}
}
// ------------------------------ 核心业务方法 ------------------------------
// ------------------------------ 消息处理核心逻辑 ------------------------------
/**
* 设备注册( 对应Golang的addDevice函数 )
* 读取DECLARE消息, 解析设备信息并存储
* 处理注册阶段消息( 仅接受DECLARE )
*/
private KqjEntity . DeclareMessage registerDevice ( ) throws Exception {
// 阻塞读取第一条消息( DECLARE消息)
String firstMessage = readFirstMessage ( ) ;
KqjEntity . DeclareMessage declareMsg = objectMapper . readValue ( firstMessage , KqjEntity . DeclareMessage . class ) ;
// private void handleRegisterStage(String message, String cmd) throws Exception {
// // 非 DECLARE消息直接拒绝
// if (!DECLARE.equals(cmd)) {
// log.warn("未注册设备发送非法消息, cmd: {},关闭连接", cmd) ;
// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "请先发送注册消息( cmd=declare) "));
// registerFuture.complete(false); // 标记注册失败
// return;
// }
//
// // 解析DECLARE消息并完成注册
// KqjEntity.DeclareMessage declareMsg = objectMapper.readValue(message, KqjEntity.DeclareMessage.class);
// String sn = declareMsg.getSn();
// if (sn == null || sn.isEmpty()) {
// log.warn("注册消息缺少SN, 关闭连接");
// session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "注册消息缺少sn字段"));
// registerFuture.complete(false);
// return;
// }
//
// // 存储设备信息( IP、端口、Session)
// InetSocketAddress remoteAddr = (InetSocketAddress) session.getUserProperties().get("jakarta.websocket.endpoint.remoteAddress");
// String ip = remoteAddr.getAddress().getHostAddress();
// String port = String.valueOf(remoteAddr.getPort());
//
// KqjEntity.DeviceInfo deviceInfo = new KqjEntity.DeviceInfo();
// deviceInfo.setIp(ip);
// deviceInfo.setPort(port);
// deviceInfo.setConn(session); // 直接存储Jakarta Session, 无类型转换
// connectedDevices.put(sn, deviceInfo);
//
// // 初始化设备-SN反向映射
// snToUuids.computeIfAbsent(sn, k -> ConcurrentHashMap.newKeySet());
//
// // 调用业务服务完成注册
// registerDeviceToService(sn);
//
// // 注册成功:更新状态
// this.currentDeviceSn = sn;
// this.isRegistered = true;
// registerFuture.complete(true); // 标记注册成功
// log.info("设备注册成功, SN: {}, IP: {}:{}, 会话ID: {}", sn, ip, port, session.getId());
// }
// 校验SN合法性
String sn = declareMsg . getSn ( ) ;
if ( sn = = null | | sn . isEmpty ( ) ) {
throw new IllegalArgumentException ( " 设备SN为空, 注册失败 " ) ;
private void handleRegisterStage ( String message , String cmd ) throws Exception {
// 非DECLARE消息直接拒绝
if ( ! DECLARE . equals ( cmd ) ) {
log . warn ( " 未注册设备发送非法消息, cmd: {},关闭连接 " , cmd ) ;
session . close ( new CloseReason ( CloseReason . CloseCodes . PROTOCOL_ERROR , " 请先发送注册消息( cmd=declare) " ) ) ;
registerFuture . complete ( false ) ; // 标记注册失败
return ;
}
// 解析客户端IP和端口( 对应Golang的parseRemoteAddr)
InetSocketAddress remoteAddr = ( InetSocketAddress ) session . getUserProperties ( ) . get ( " javax.websocket.endpoint.remoteAddress " ) ;
String ip = remoteAddr . getAddress ( ) . getHostAddress ( ) ;
String port = String . v alueOf ( remoteAddr . getPort ( ) ) ;
// 解析DECLARE消息并完成注册
KqjEntity . DeclareMessage declareMsg ;
try {
declareMsg = objectMapper . readV alue ( message , KqjEntity . DeclareMessage . class ) ;
} catch ( Exception e ) {
log . error ( " 解析DECLARE消息失败, 原始消息: {} " , message , e ) ;
session . close ( new CloseReason ( CloseReason . CloseCodes . PROTOCOL_ERROR , " DECLARE消息格式错误 " ) ) ;
registerFuture . complete ( false ) ;
return ;
}
// 存储设备信息
String sn = declareMsg . getSn ( ) ;
if ( sn = = null | | sn . isEmpty ( ) ) {
log . warn ( " 注册消息缺少SN, 关闭连接, 消息内容: {} " , message ) ;
session . close ( new CloseReason ( CloseReason . CloseCodes . PROTOCOL_ERROR , " 注册消息缺少sn字段 " ) ) ;
registerFuture . complete ( false ) ;
return ;
}
// 存储设备信息( IP、端口、Session) - 核心优化:兼容多容器,避免空指针
String ip = " unknown " ;
String port = " unknown " ;
InetSocketAddress remoteAddr = null ;
// 尝试多种容器可能的键( 优先级: Jakarta标准 -> Undertow专用 -> Javax兼容)
Object addrObj = session . getUserProperties ( ) . get ( " jakarta.websocket.endpoint.remoteAddress " ) ;
if ( addrObj = = null ) {
addrObj = session . getUserProperties ( ) . get ( " io.undertow.websocket.remoteAddress " ) ; // Undertow专用
}
if ( addrObj = = null ) {
addrObj = session . getUserProperties ( ) . get ( " javax.websocket.endpoint.remoteAddress " ) ; // 兼容旧版
}
// 校验并解析IP和端口
if ( addrObj instanceof InetSocketAddress ) {
remoteAddr = ( InetSocketAddress ) addrObj ;
// 防止getAddress()返回null( 极端情况)
if ( remoteAddr . getAddress ( ) ! = null ) {
ip = remoteAddr . getAddress ( ) . getHostAddress ( ) ;
}
port = String . valueOf ( remoteAddr . getPort ( ) ) ;
log . info ( " 解析设备地址成功, 会话ID: {}, IP: {},端口: {} " , session . getId ( ) , ip , port ) ;
} else {
// 地址获取失败时,使用默认值,不中断注册流程
log . warn ( " 无法解析设备地址, 会话ID: {}, addrObj类型: {} " ,
session . getId ( ) , addrObj = = null ? " null " : addrObj . getClass ( ) . getName ( ) ) ;
log . warn ( " 继续注册流程, 使用默认地址( unknown) " ) ;
}
// 创建设备信息( 即使IP获取失败, 也正常注册)
KqjEntity . DeviceInfo deviceInfo = new KqjEntity . DeviceInfo ( ) ;
deviceInfo . setIp ( ip ) ;
deviceInfo . setPort ( port ) ;
// ======================== 添加类型检查 ========================
// 检查session是否能转换为WebSocket, 避免强转失败
if ( ! ( session instanceof WebSocket ) ) {
throw new IllegalStateException ( " WebSocket容器不支持Session转WebSocket, 无法注册设备, SN: " + sn ) ;
}
// 强转并赋值
deviceInfo . setConn ( ( WebSocket ) session ) ; // 用Session替换Golang的*websocket.Conn
// ============================================================
deviceInfo . setConn ( session ) ;
connectedDevices . put ( sn , deviceInfo ) ;
// 调用业务服务注册设备( 对应Golang的service.BusAttendanceMachine().Register)
// 初始化设备-SN反向映射
snToUuids . computeIfAbsent ( sn , k - > ConcurrentHashMap . newKeySet ( ) ) ;
// 调用业务服务完成注册
registerDeviceToService ( sn ) ;
return declareMsg ;
// 注册成功:更新状态
this . currentDeviceSn = sn ;
this . isRegistered = true ;
registerFuture . complete ( true ) ; // 标记注册成功
log . info ( " 设备注册成功, SN: {}, IP: {}:{}, 会话ID: {} " , sn , ip , port , session . getId ( ) ) ;
}
/**
* 处理PING消息( 对应Golang的handlePing函数 )
* 处理正常通信阶段消息( PING、TO_CLIENT等 )
*/
private void handleNormalStage ( String message , String cmd ) throws Exception {
switch ( cmd ) {
case DECLARE :
// 已注册设备发送DECLARE视为在线心跳
log . info ( " 设备在线心跳, SN: {}, 会话ID: {} " , currentDeviceSn , session . getId ( ) ) ;
break ;
case PING :
// 处理心跳: 回复PONG
handlePing ( message ) ;
break ;
case TO_CLIENT :
// 处理设备响应消息
handleToClientResponse ( message ) ;
break ;
default :
log . warn ( " 收到未知消息类型, SN: {}, cmd: {},内容: {} " , currentDeviceSn , cmd , message ) ;
}
}
// ------------------------------ 业务子逻辑 ------------------------------
/**
* 处理PING心跳消息( 回复PONG)
*/
private void handlePing ( String message ) throws Exception {
// 解析PING消息中的SN( 双重校验)
KqjEntity . DeclareMessage pingMsg = objectMapper . readValue ( message , KqjEntity . DeclareMessage . class ) ;
String sn = pingMsg . getSn ( ) ;
if ( ! currentDeviceSn . equals ( sn ) ) {
log . warn ( " 心跳消息SN不匹配, 当前SN: {}, 消息SN: {} " , currentDeviceSn , sn ) ;
return ;
}
// 1. 回复 PONG消息
// 构造 PONG响应
KqjEntity . PongMessage pongMsg = new KqjEntity . PongMessage ( ) ;
pongMsg . setCmd ( Constants . PING ) ;
pongMsg . setCmd ( " pong " ) ;
pongMsg . setFrom ( " server " ) ;
pongMsg . setTo ( sn ) ;
@ -223,247 +343,185 @@ public class DeviceWebSocketServer {
// 发送PONG消息
session . getBasicRemote ( ) . sendText ( objectMapper . writeValueAsString ( pongMsg ) ) ;
log . info ( " 发送PONG消息给设备 , SN: {} " , sn ) ;
log . info ( " 发送PONG消息, SN: {}, 会话ID: {} " , sn , session . getId ( ) );
// 2. 更新设备连接状态(对应Golang的connectedDevices[sn]重设 )
if ( connectedDevices . containsKey ( sn ) ) {
KqjEntity . DeviceInfo deviceInfo = connectedDevices . get ( sn ) ;
deviceInfo . setConn ( ( WebSocket ) session ) ;
connectedDevices . put ( sn , deviceInfo ) ;
}
// 3. 调用业务服务更新设备状态( 对应Golang的service.BusAttendanceMachine().Register)
// 更新设备连接状态(业务服务调用 )
registerDeviceToService ( sn ) ;
}
/**
* 处理TO_CLIENT响应消息( 对应Golang的requestResponse函数 )
* 处理TO_CLIENT响应消息( 分发到对应的UUID通道 )
*/
private void handleToClientResponse ( String message ) throws Exception {
log . info ( " 收到TO_CLIENT响应消息: {} " , message ) ;
KqjEntity . CommonResponse commonResp = objectMapper . readValue ( message , KqjEntity . CommonResponse . class ) ;
// 根据UUID查找响应通道, 传递响应结果
String uuid = commonResp . getTo ( ) ;
ResponseHolder < KqjEntity . CommonResponse > holder = responseChannels . get ( uuid ) ;
if ( uuid = = null | | uuid . isEmpty ( ) ) {
log . warn ( " 响应消息缺少UUID( to字段) , SN: {},内容: {} " , currentDeviceSn , message ) ;
return ;
}
// 查找响应通道并分发结果
ResponseHolder holder = responseChannels . get ( uuid ) ;
if ( holder ! = null ) {
holder . getResultFuture ( ) . complete ( commonResp ) ;
responseChannels . remove ( uuid ) ; // 移除已完成的通道
log . info ( " 响应已分发到UUID: {}, 响应内容: {} " , uuid , commonResp ) ;
responseChannels . remove ( uuid ) ;
snToUuids . get ( currentDeviceSn ) . remove ( uuid ) ; // 清理反向映射
log . info ( " 响应分发成功, UUID: {}, SN: {} " , uuid , currentDeviceSn ) ;
} else {
log . warn ( " 未找到UUID: {} 对应的响应通道,响应丢弃 " , uuid ) ;
log . warn ( " 未找到UUID对应的响应通道, UUID: {}, SN: {} " , uuid , currentDeviceSn ) ;
}
}
// ------------------------------ 业务服务调用(需替换为真实实现) ------------------------------
/**
* 设备注册到业务服务( 对应原Golang的service.BusAttendanceMachine().Register)
*/
private void registerDeviceToService ( String sn ) {
try {
// TODO: 替换为真实的业务服务调用( 如Spring Bean注入后调用)
log . info ( " 【业务服务】设备注册, SN: {} " , sn ) ;
// 示例: BusAttendanceMachineService.register(sn);
} catch ( Exception e ) {
log . error ( " 【业务服务】设备注册失败, SN: {} " , sn , e ) ;
}
}
/**
* 更新设备状态( 对应原Golang的service.BusAttendanceMachine().Change)
*/
private void updateDeviceStatus ( String sn , String status ) {
try {
// TODO: 替换为真实的业务服务调用
log . info ( " 【业务服务】更新设备状态, SN: {},状态: {} " , sn , status ) ;
// 示例: BusAttendanceMachineService.changeStatus(sn, status);
} catch ( Exception e ) {
log . error ( " 【业务服务】更新设备状态失败, SN: {} " , sn , e ) ;
}
}
// ------------------------------ 工具方法 ------------------------------
/**
* 读取设备第一条消息(阻塞直到收到消息 )
*/
private String readFirstMessage ( ) throws Exception {
// 使用CompletableFuture等待消息
final java . util . concurrent . CompletableFuture < String > firstMsgFuture = new java . util . concurrent . CompletableFuture < > ( ) ;
// 使用AtomicReference包装临时处理器, 解决未初始化问题
final java . util . concurrent . atomic . AtomicReference < MessageHandler . Whole < String > > tempHandlerRef = new java . util . concurrent . atomic . AtomicReference < > ( ) ;
// 定义临时消息处理器
MessageHandler . Whole < String > tempHandler = msg - > {
if ( ! firstMsgFuture . isDone ( ) ) {
firstMsgFuture . complete ( msg ) ;
// 从引用中获取处理器并移除
session . removeMessageHandler ( tempHandlerRef . get ( ) ) ;
}
} ;
// 将处理器存入引用
tempHandlerRef . set ( tempHandler ) ;
// 注册处理器
session . addMessageHandler ( tempHandler ) ;
// 等待消息, 超时10秒
return firstMsgFuture . get ( 10 , TimeUnit . SECONDS ) ;
// // 使用Java并发工具等待消息( 模拟Golang的阻塞读取)
// final java.util.concurrent.CompletableFuture<String> firstMsgFuture = new java.util.concurrent.CompletableFuture<>();
//
// // 临时注册消息处理器,读取第一条消息后移除
// MessageHandler.Whole<String> tempHandler = msg -> {
// if (!firstMsgFuture.isDone()) {
// firstMsgFuture.complete(msg);
// // 移除临时处理器(避免重复处理)
// session.removeMessageHandler(tempHandler);
// }
// };
// session.addMessageHandler(tempHandler);
//
// // 等待消息, 超时10秒( 防止设备一直不发消息)
// return firstMsgFuture.get(10, TimeUnit.SECONDS);
}
/**
* 发送消息给指定设备( 对应Golang的sendMessageToDevice函数)
* 发送消息给指定设备( 对应原Golang的sendMessageToDevice )
*/
public static boolean sendMessageToDevice ( String sn , String uuid , Object message ) {
// 1. 检查设备是否在线
KqjEntity . DeviceInfo deviceInfo = connectedDevices . get ( sn ) ;
if ( deviceInfo = = null ) {
log . warn ( " 设备不存在, SN: {} " , sn ) ;
log . warn ( " 发送消息失败: 设备不存在, SN: {}" , sn ) ;
responseChannels . remove ( uuid ) ;
return false ;
}
// 2. 将WebSocket转回Session( 因为状态由 Session管理)
WebSocket webSocket = deviceInfo . getConn ( ) ;
if ( ! ( webSocket instanceof Session ) ) {
log . warn ( " 设备连接类型错误,无法判断状态 , SN: {}" , sn ) ;
responseChannel s. remove ( uuid ) ;
return false ;
}
Session session = ( Session ) webSocket ;
// 3. 检查连接是否打开
if ( ! session . isOpen ( ) ) {
log . warn ( " 设备连接已关闭, SN: {} " , sn ) ;
connectedDevices . remove ( sn ) ; // 移除已关闭的设备
// 2. 检查 Session是否有效
Session session = deviceInfo . getConn ( ) ;
if ( session = = null | | ! session . isOpen ( ) ) {
log . warn ( " 发送消息失败:设备连接已关闭 , SN: {}" , sn ) ;
connectedDevice s. remove ( sn ) ;
snToUuids . remove ( sn ) ;
responseChannels . remove ( uuid ) ;
return false ;
}
try {
// 4 . 序列化消息 并发送
// 3 . 序列化并发送消息
String msgJson = objectMapper . writeValueAsString ( message ) ;
session . getBasicRemote ( ) . sendText ( msgJson ) ; // 通过Session发送消息
log . info ( " 发送消息给设备, SN: {}, UUID: {}, 消息: {} " , sn , uuid , msgJson ) ;
session . getBasicRemote ( ) . sendText ( msgJson ) ;
// 关联UUID和设备SN( 用于后续清理)
snToUuids . get ( sn ) . add ( uuid ) ;
log . info ( " 发送消息成功, SN: {}, UUID: {},消息: {} " , sn , uuid , msgJson ) ;
return true ;
} catch ( Exception e ) {
log . error ( " 发送消息失败, SN: {}, UUID: {} " , sn , uuid , e ) ;
// 发送失败时移除设备(可能连接已异常)
log . error ( " 发送消息失败, SN: {}, UUID: {} " , sn , uuid , e ) ;
// 清理无效资源
connectedDevices . remove ( sn ) ;
snToUuids . remove ( sn ) ;
responseChannels . remove ( uuid ) ;
return false ;
}
// // 1. 检查设备是否在线
// KqjEntity.DeviceInfo deviceInfo = connectedDevices.get(sn);
// if (deviceInfo == null || !deviceInfo.getSession().isOpen()) {
// log.warn("设备不在线, SN: {}", sn);
// // 清理无效的响应通道
// responseChannels.remove(uuid);
// return false;
// }
//
// try {
// // 2. 序列化消息并发送
// String msgJson = objectMapper.writeValueAsString(message);
// deviceInfo.getSession().getBasicRemote().sendText(msgJson);
// log.info("发送消息给设备, SN: {}, UUID: {}, 消息: {}", sn, uuid, msgJson);
// return true;
//
// } catch (Exception e) {
// log.error("发送消息失败, SN: {}, UUID: {}", sn, uuid, e);
// // 清理异常的响应通道
// responseChannels.remove(uuid);
// return false;
// }
}
/**
* 发送请求并等待响应( 对应Golang的SendRequestAndWaitResponse函数 )
* 发送请求并等待响应(对应原 Golang的SendRequestAndWaitResponse)
*/
public static KqjEntity . CommonResponse sendRequestAndWaitResponse ( String sn , String uuid , Object payload ) throws Exception {
// 1. 创建响应结果 容器
ResponseHolder < KqjEntity . CommonResponse > responseH older = new ResponseHolder < > ( sn ) ;
responseChannels . put ( uuid , responseH older) ;
// 1. 创建响应容器并注册
ResponseHolder h older = new ResponseHolder ( sn ) ;
responseChannels . put ( uuid , h older) ;
try {
// 2. 发送请求
boolean sendSuccess = sendMessageToDevice ( sn , uuid , payload ) ;
if ( ! sendSuccess ) {
throw new Exception ( " 发送请求失败,设备不在线或发送 异常, SN: " + sn ) ;
throw new Exception ( " 发送请求失败,设备不在线或连接 异常, SN: " + sn ) ;
}
// 3. 等待响应, 超时10秒
return responseH older. getResultFuture ( ) . get ( 10 , TimeUnit . SECONDS ) ;
} catch ( java . util . concurrent . TimeoutException e ) {
log . error ( " 等待响应超时, SN: {}, UUID: {} " , sn , uuid ) ;
// 3. 等待响应( 10秒超时)
return h older. getResultFuture ( ) . get ( 10 , TimeUnit . SECONDS ) ;
} catch ( TimeoutException e ) {
log . error ( " 等待响应超时, SN: {}, UUID: {} " , sn , uuid ) ;
responseChannels . remove ( uuid ) ;
throw new Exception ( " 等待响应超时( 10秒) " , e ) ;
snToUuids . get ( sn ) . remove ( uuid ) ;
throw new Exception ( " 等待响应超时( 10秒) , SN: " + sn , e ) ;
} finally {
// 清理响应通道(防止内存泄漏)
// 4. 清理响应通道(防止内存泄漏)
responseChannels . remove ( uuid ) ;
if ( snToUuids . containsKey ( sn ) ) {
snToUuids . get ( sn ) . remove ( uuid ) ;
}
}
}
/**
* 生成带6位随机数的UUID( 对应Golang的GenerateUUIDWithSixRandomDigits函数 )
* 生成带6位随机数的UUID( 对应原 Golang的GenerateUUIDWithSixRandomDigits)
*/
public static String generateUUIDWithSixRandomDigits ( ) {
// 生成标准UUID
String uuidStr = UUID . randomUUID ( ) . toString ( ) . replace ( " - " , " " ) ;
// 生成6位随机数( 100000-999999)
Random random = new Random ( ) ;
int randomNum = random . nextInt ( 900000 ) + 100000 ;
// 拼接返回
int randomNum = random . nextInt ( 900000 ) + 100000 ; // 6位随机数( 100000-999999)
return uuidStr + " - " + randomNum ;
}
// ------------------------------ 业务服务调用( 模拟Golang的service层) ------------------------------
/**
* 注册设备到业务服务( 对应Golang的service.BusAttendanceMachine().Register )
* 实际项目中替换为真实的Service调用
* 清理设备关联的所有响应通道(连接关闭时调用 )
*/
private void registerDeviceToService ( String sn ) {
try {
// 模拟业务服务调用(如更新数据库设备状态为在线)
log . info ( " 调用业务服务注册设备, SN: {} " , sn ) ;
// TODO: 替换为真实的Service代码( 如Spring Bean调用)
} catch ( Exception e ) {
log . error ( " 业务服务注册设备失败, SN: {} " , sn , e ) ;
private void cleanDeviceResponseChannels ( String sn ) {
Set < String > uuids = snToUuids . remove ( sn ) ;
if ( uuids = = null | | uuids . isEmpty ( ) ) {
return ;
}
}
/**
* 更新设备状态( 对应Golang的service.BusAttendanceMachine().Change)
* 实际项目中替换为真实的Service调用
*/
private void updateDeviceStatus ( String sn , String status ) {
try {
log . info ( " 调用业务服务更新设备状态, SN: {}, 状态: {} " , sn , status ) ;
// TODO: 替换为真实的Service代码( 如Spring Bean调用)
} catch ( Exception e ) {
log . error ( " 业务服务更新设备状态失败, SN: {} " , sn , e ) ;
for ( String uuid : uuids ) {
ResponseHolder holder = responseChannels . remove ( uuid ) ;
if ( holder ! = null ) {
holder . getResultFuture ( ) . completeExceptionally (
new Exception ( " 设备已离线, 响应通道已清理, SN: " + sn )
) ;
}
}
log . info ( " 清理设备响应通道, SN: {}, 共清理UUID数量: {} " , sn , uuids . size ( ) ) ;
}
// ------------------------------ 内部辅助类 ------------------------------
/**
* 响应结果容器(对应 Golang的chan CommonResponse )
* 用CompletableFuture实现异步结果等待
* 响应结果容器(替代 Golang的chan,用 CompletableFuture实现异步等待 )
*/
private static class ResponseHolder < T > {
private static class ResponseHolder {
private final String sn ; // 关联的设备SN
private final java . util . concurrent . CompletableFuture < T > resultFuture ;
private final CompletableFuture < KqjEntity . CommonResponse > resultFuture ;
public ResponseHolder ( String sn ) {
this . sn = sn ;
this . resultFuture = new java . util . concurrent . CompletableFuture< > ( ) ;
this . resultFuture = new CompletableFuture < > ( ) ;
}
public String getSn ( ) {
return sn ;
}
public java . util . concurrent . CompletableFuture < T > getResultFuture ( ) {
public CompletableFuture < KqjEntity . CommonResponse > getResultFuture ( ) {
return resultFuture ;
}
}