diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/websocket/websocket/service/BigScreenWebSocketServer.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/websocket/websocket/service/BigScreenWebSocketServer.java new file mode 100644 index 00000000..7ad43336 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/websocket/websocket/service/BigScreenWebSocketServer.java @@ -0,0 +1,229 @@ +package org.dromara.websocket.websocket.service; + +import cn.hutool.json.JSONUtil; +import jakarta.websocket.*; +import jakarta.websocket.server.ServerEndpoint; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.SpringUtils; +import org.dromara.gps.domain.bo.GpsEquipmentSonBo; +import org.dromara.gps.domain.vo.GpsEquipmentSonVo; +import org.dromara.gps.service.IGpsEquipmentSonService; +import org.dromara.websocket.websocket.domain.vo.VehicleVo; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 大屏 WebSocket 服务端(支持订阅消息) + * 端点路径:/websocket/bigScreen + */ +@Slf4j +@ServerEndpoint("/websocket/bigScreen") // 客户端连接时需携带订阅ID参数:ws://xxx/websocket/vehicle?subscriptionId=xxx +@Component +public class BigScreenWebSocketServer { + + // 1. 存储所有在线会话(sessionId -> Session) + private static final Map ONLINE_SESSIONS = new ConcurrentHashMap<>(); + + // 2. 核心:订阅ID与会话的映射(subscriptionId -> Session) + private static final Map SUBSCRIPTION_SESSIONS = new ConcurrentHashMap<>(); + + // 3. 反向映射:会话ID与订阅ID的映射(用于断开连接时清理订阅关系) + private static final Map SESSION_TO_SUBSCRIPTION = new ConcurrentHashMap<>(); + + // 当前会话对应的订阅ID(每个连接实例的专属变量) + private String currentSubscriptionId; + + + static { + log.info("✅ 大屏 WebSocket 服务端已随项目启动初始化!端点路径:/websocket/bigScreen"); + } + + /** + * 客户端连接时触发(解析订阅ID并建立映射关系) + */ + @OnOpen + public void onOpen(Session session) { + // 从连接URL的查询参数中获取订阅ID(客户端连接格式:ws://xxx/websocket/bigScreen?subscriptionId=123) + Map> params = session.getRequestParameterMap(); + List subscriptionIds = params.get("subscriptionId"); + if (subscriptionIds != null && !subscriptionIds.isEmpty()) { + this.currentSubscriptionId = subscriptionIds.get(0); // 取第一个订阅ID + // 建立映射关系 + SUBSCRIPTION_SESSIONS.put(currentSubscriptionId, session); + SESSION_TO_SUBSCRIPTION.put(session.getId(), currentSubscriptionId); + log.info("📌 客户端订阅成功!订阅ID:{},会话ID:{},当前订阅数:{}", + currentSubscriptionId, session.getId(), SUBSCRIPTION_SESSIONS.size()); + } else { + log.warn("📌 客户端连接未携带订阅ID!会话ID:{}", session.getId()); + } + + // 存储会话到在线列表 + ONLINE_SESSIONS.put(session.getId(), session); + log.info("📌 客户端连接成功!会话ID:{},当前在线数:{}", session.getId(), ONLINE_SESSIONS.size()); + + // 异步推送初始化数据(原有逻辑保留) + CompletableFuture.runAsync(() -> { + try { + String[] split = currentSubscriptionId.split("-"); + //todo 填充不同类型大屏获取基础数据的方法判断 + IGpsEquipmentSonService service = SpringUtils.getBean(IGpsEquipmentSonService.class); + GpsEquipmentSonBo bo = new GpsEquipmentSonBo(); + bo.setUserId(Long.parseLong(split[0])); + bo.setTripId(Long.parseLong(split[1])); + List list = service.getNewVehicleList(bo); + if (list == null || list.isEmpty()) { + session.getBasicRemote().sendText("初始化数据为空"); + log.warn("会话[{}]未获取到初始化数据", session.getId()); + return; + } + List vehicleVos = new ArrayList<>(); + for (GpsEquipmentSonVo ueClient : list) { + VehicleVo vo = new VehicleVo(); + vo.setLocLatitude(ueClient.getLocLatitude()); + vo.setLocLongitude(ueClient.getLocLongitude()); + vehicleVos.add(vo); + } + session.getBasicRemote().sendText(JSONUtil.toJsonStr(vehicleVos)); + log.info("📤 已向会话[{}]推送初始化数据,长度:{}字节", session.getId(), vehicleVos.size()); + } catch (Exception e) { + log.error("会话[{}]初始化数据处理失败", session.getId(), e); + try { + if (session.isOpen()) { + session.getBasicRemote().sendText("初始化失败:" + e.getMessage()); + } + } catch (IOException ex) { + log.error("会话[{}]推送错误信息失败", session.getId(), ex); + } + } + }); + } + + /** + * 接收客户端消息 + */ + @OnMessage + public void onMessage(String message, Session session) { + log.info("📥 收到会话[{}](订阅ID:{})消息:{}", session.getId(), currentSubscriptionId, message); + // 可选:回复客户端 + //todo 填充不同类型大屏获取数据的方法判断 + + try { + session.getBasicRemote().sendText("服务端已收到消息:" + message); + } catch (IOException e) { + log.error("📤 回复会话[{}]失败:{}", session.getId(), e.getMessage()); + } + } + + /** + * 客户端断开连接(清理订阅关系) + */ + @OnClose + public void onClose(Session session, CloseReason reason) { + // 1. 移除在线会话 + ONLINE_SESSIONS.remove(session.getId()); + // 2. 清理订阅关系 + String subscriptionId = SESSION_TO_SUBSCRIPTION.get(session.getId()); + if (subscriptionId != null) { + SUBSCRIPTION_SESSIONS.remove(subscriptionId); + SESSION_TO_SUBSCRIPTION.remove(session.getId()); + log.info("🔌 客户端订阅关系已清除!订阅ID:{},会话ID:{}", subscriptionId, session.getId()); + } + log.info("🔌 客户端断开连接!会话ID:{},原因:{},当前在线数:{},当前订阅数:{}", + session.getId(), reason.getReasonPhrase(), + ONLINE_SESSIONS.size(), SUBSCRIPTION_SESSIONS.size()); + } + + /** + * 连接异常 + */ + @OnError + public void onError(Session session, Throwable error) { + log.error("⚠️ 会话[{}](订阅ID:{})异常:{}", session.getId(), currentSubscriptionId, error.getMessage(), error); + } + + + // ------------------------------ 订阅消息发送工具方法(供外部调用) ------------------------------ + + /** + * 向指定订阅ID的客户端发送消息 + * @param subscriptionId 订阅ID + * @param message 消息内容 + * @return 是否发送成功 + */ + public static boolean sendToSubscription(String subscriptionId, String message) { + if (subscriptionId == null || message == null) { + log.warn("⚠️ 订阅ID或消息为空,发送失败"); + return false; + } + // 从订阅映射中获取目标会话 + Session session = SUBSCRIPTION_SESSIONS.get(subscriptionId); + if (session == null || !session.isOpen()) { + log.warn("⚠️ 订阅ID[{}]对应的客户端未连接或已断开", subscriptionId); + return false; + } + // 发送消息 + try { + session.getBasicRemote().sendText(message); + log.info("📤 已向订阅ID[{}]发送消息:{}", subscriptionId, message); + return true; + } catch (IOException e) { + log.error("📤 向订阅ID[{}]发送消息失败", subscriptionId, e); + return false; + } + } + + /** + * 向所有订阅客户端广播消息 + * @param message 消息内容 + */ + public static void broadcastToAllSubscriptions(String message) { + if (SUBSCRIPTION_SESSIONS.isEmpty()) { + log.warn("⚠️ 无订阅客户端,无需广播消息"); + return; + } + SUBSCRIPTION_SESSIONS.forEach((subscriptionId, session) -> { + if (session.isOpen()) { + try { + session.getBasicRemote().sendText(message); + log.info("📤 已向订阅ID[{}]广播消息", subscriptionId); + } catch (IOException e) { + log.error("📤 向订阅ID[{}]广播消息失败", subscriptionId, e); + } + } + }); + } + + /** + * 获取当前订阅数 + */ + public static int getSubscriptionCount() { + return SUBSCRIPTION_SESSIONS.size(); + } + + // 原有工具方法保留 + public static void sendToAll(String message) { + if (ONLINE_SESSIONS.isEmpty()) { + log.warn("⚠️ 无在线客户端,无需发送消息"); + return; + } + ONLINE_SESSIONS.values().forEach(session -> { + if (session.isOpen()) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + log.error("📤 向会话[{}]发送消息失败:{}", session.getId(), e.getMessage()); + } + } + }); + } + + public static int getOnlineCount() { + return ONLINE_SESSIONS.size(); + } +}