diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/websocket/websocket/service/RydwWebSocketServer.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/websocket/websocket/service/RydwWebSocketServer.java new file mode 100644 index 00000000..0bbbea60 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/websocket/websocket/service/RydwWebSocketServer.java @@ -0,0 +1,149 @@ +package org.dromara.websocket.websocket.service;// 路径:com.ruoyi.web.websocket.InitOnStartWebSocketServer + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import jakarta.websocket.*; +import jakarta.websocket.server.ServerEndpoint; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.websocket.utils.WebSocketUtils; +import org.dromara.contractor.domain.SubConstructionUser; +import org.dromara.contractor.service.impl.SubConstructionUserServiceImpl; +import org.dromara.gps.domain.GpsEquipmentSon; +import org.dromara.gps.service.impl.GpsEquipmentSonServiceImpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 项目启动即自动启动的 WebSocket 服务端 + * 端点路径:/websocket/init-on-start(可自定义) + */ +@Slf4j +@ServerEndpoint("/websocket/rydw") // 定义 WebSocket 端点路径 +@Component +public class RydwWebSocketServer { + + @Autowired + private SubConstructionUserServiceImpl subConstructionUserService; + @Autowired + private GpsEquipmentSonServiceImpl gpsEquipmentSonService; + + // 2. 静态会话存储(线程安全,项目启动时即初始化) + private static final Map ONLINE_SESSIONS = new ConcurrentHashMap<>(); + + + // 3. 静态代码块:项目启动时执行(初始化资源、打印启动日志) + static { + // 此处可添加启动时的初始化逻辑(如加载配置、连接外部资源等) + log.info("✅ WebSocket 服务端已随项目启动初始化!端点路径:/websocket/rydw"); + } + + /** + * 客户端连接时触发(无需手动启动,有客户端连接时自动调用) + */ + @OnOpen + public void onOpen(Session session) { + // 存储新会话 + ONLINE_SESSIONS.put(session.getId(), session); + log.info("📌 客户端连接成功!会话ID:{},当前在线数:{}", session.getId(), ONLINE_SESSIONS.size()); + // 2. 异步获取并推送初始化数据(避免阻塞连接) + CompletableFuture.runAsync(() -> { + try { + //连接成功过后 获取当前项目下所有成员最新坐标 + Map> params = session.getRequestParameterMap(); + List subscriptionIds = params.get("projectId"); + if (subscriptionIds != null && !subscriptionIds.isEmpty()){ + //拿到所有人员 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(SubConstructionUser::getProjectId, subscriptionIds.getFirst()); + List list = subConstructionUserService.list(queryWrapper); + //拿到所有人员最新坐标 + if (list != null && !list.isEmpty()){ + LambdaQueryWrapper lqw = new LambdaQueryWrapper<>(); + List gpsList = new ArrayList<>(); + for (SubConstructionUser constructionUser : list) { + lqw.clear(); + lqw.eq(GpsEquipmentSon::getUserId, constructionUser.getSysUserId()); + lqw.orderByDesc(GpsEquipmentSon::getCreateTime); + lqw.last("limit 1"); + GpsEquipmentSon one = gpsEquipmentSonService.getOne(lqw); + if (one != null){ + gpsList.add(one); + } + } + if (!gpsList.isEmpty()){ + WebSocketUtils.sendMessage(Long.valueOf(session.getId()), gpsList.toString()); + } + } + } + WebSocketUtils.sendMessage(Long.valueOf(session.getId()), "初始化数据为空"); + } catch (Exception e) { + log.error("会话[{}]初始化数据处理失败", session.getId(), e); + } + }); + } + + /** + * 接收客户端消息 + */ + @OnMessage + public void onMessage(String message, Session session) { + log.info("📥 收到会话[{}]消息:{}", session.getId(), message); + // 可选:回复客户端(示例) + try { + session.getBasicRemote().sendText("服务端已收到消息:" + message); + } catch (IOException e) { + log.error("📤 回复会话[{}]失败:{}", session.getId(), e.getMessage()); + } + } + + /** + * 客户端断开连接 + */ + @OnClose + public void onClose(Session session, CloseReason reason) { + ONLINE_SESSIONS.remove(session.getId()); + log.info("🔌 客户端断开连接!会话ID:{},原因:{},当前在线数:{}", + session.getId(), reason.getReasonPhrase(), ONLINE_SESSIONS.size()); + } + + /** + * 连接异常 + */ + @OnError + public void onError(Session session, Throwable error) { + log.error("⚠️ 会话[{}]异常:{}", session.getId(), error.getMessage(), error); + } + + // ------------------------------ 工具方法(可选,供其他服务调用) ------------------------------ + /** + * 向所有在线客户端发送消息(项目启动后,其他服务可直接调用) + */ + public static void sendToAll(String message) { + if (ONLINE_SESSIONS.isEmpty()) { + log.warn("⚠️ 无在线客户端,无需发送消息"); + return; + } + ONLINE_SESSIONS.values().forEach(session -> { + if (session.isOpen()) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + log.error("📤 向会话[{}]发送消息失败:{}", session.getId(), e.getMessage()); + } + } + }); + } + + /** + * 获取当前在线数(供外部查询) + */ + public static int getOnlineCount() { + return ONLINE_SESSIONS.size(); + } +}