From b88a92b7e1c7a03cd370e808d282c640df1891a9 Mon Sep 17 00:00:00 2001 From: dfdg <2710245601@qq.com> Date: Fri, 31 Oct 2025 20:08:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9ue=E5=BC=80=E5=8F=91=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.yml | 1 + .../dromara/bigscreen/config/AsyncConfig.java | 43 +++++ .../bigscreen/config/WebSocketConfig.java | 20 ++ .../listener/RedisMessageListener.java | 68 ++----- .../manager/RedisSubscribeManager.java | 92 +++++++-- .../service/IAsyncMessageHandlerService.java | 14 ++ .../impl/AsyncMessageHandlerServiceImpl.java | 180 ++++++++++++++++++ .../impl/InitOnStartWebSocketServer.java | 110 +++++++++++ .../dromara/drone/domain/DroProjectDrone.java | 12 ++ .../drone/domain/bo/DroProjectDroneBo.java | 13 ++ .../drone/domain/vo/DroProjectDroneVo.java | 13 +- .../org/dromara/gps/domain/GpsEquipment.java | 5 + .../dromara/gps/domain/bo/GpsEquipmentBo.java | 5 + .../dromara/gps/domain/vo/GpsEquipmentVo.java | 5 + .../service/impl/GpsEquipmentServiceImpl.java | 97 +++++++--- 15 files changed, 586 insertions(+), 92 deletions(-) create mode 100644 xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/AsyncConfig.java create mode 100644 xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/WebSocketConfig.java create mode 100644 xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/IAsyncMessageHandlerService.java create mode 100644 xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/AsyncMessageHandlerServiceImpl.java create mode 100644 xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/InitOnStartWebSocketServer.java diff --git a/xinnengyuan/ruoyi-admin/src/main/resources/application.yml b/xinnengyuan/ruoyi-admin/src/main/resources/application.yml index f384688c..33f043de 100644 --- a/xinnengyuan/ruoyi-admin/src/main/resources/application.yml +++ b/xinnengyuan/ruoyi-admin/src/main/resources/application.yml @@ -134,6 +134,7 @@ security: # todo 仅测试 - /facility/matrix/** - /hat/device/data + - /websocket/ue # 多租户配置 tenant: diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/AsyncConfig.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/AsyncConfig.java new file mode 100644 index 00000000..516931a3 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/AsyncConfig.java @@ -0,0 +1,43 @@ +package org.dromara.bigscreen.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 无人机消息redis订阅专用线程池 + */ +@Configuration // 标记为配置类,让Spring扫描加载 +@EnableAsync // 启用Spring异步功能(若项目已在启动类加过,此处可省略,但加上更稳妥) +public class AsyncConfig { + + /** + * 定义名为 "messageAsyncExecutor" 的线程池 Bean(与 @Async("messageAsyncExecutor") 对应) + */ + @Bean(name = "messageAsyncExecutor") // Bean名称必须是 "messageAsyncExecutor",大小写敏感 + public Executor messageAsyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 1. 核心线程数:根据CPU核心数或业务量调整(示例:5个) + executor.setCorePoolSize(5); + // 2. 最大线程数:避免线程过多导致资源耗尽(示例:10个) + executor.setMaxPoolSize(10); + // 3. 任务队列容量:缓冲待处理的异步任务(示例:50个) + executor.setQueueCapacity(50); + // 4. 线程名前缀:便于日志排查(格式:message-async-1, message-async-2...) + executor.setThreadNamePrefix("message-async-"); + // 5. 线程空闲时间:超过60秒空闲则销毁(释放资源) + executor.setKeepAliveSeconds(60); + // 6. 任务拒绝策略:队列满时,由提交任务的线程临时执行(避免消息丢失) + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 7. 初始化线程池(必须调用,否则线程池不会生效) + executor.initialize(); + + return executor; + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/WebSocketConfig.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/WebSocketConfig.java new file mode 100644 index 00000000..03787b59 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/config/WebSocketConfig.java @@ -0,0 +1,20 @@ +package org.dromara.bigscreen.config;// 路径:com.ruoyi.framework.config.WebSocketConfig +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * WebSocket 配置类:确保 @ServerEndpoint 端点随项目启动注册 + */ +@Configuration +public class WebSocketConfig { + + /** + * 注册 WebSocket 端点处理器 + * 作用:Spring 启动时自动扫描并初始化 @ServerEndpoint 注解的类 + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/listener/RedisMessageListener.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/listener/RedisMessageListener.java index 90b6ab33..c2b37f9d 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/listener/RedisMessageListener.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/listener/RedisMessageListener.java @@ -5,7 +5,11 @@ import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.dromara.bigscreen.service.IAsyncMessageHandlerService; +import org.dromara.bigscreen.service.impl.InitOnStartWebSocketServer; import org.dromara.common.websocket.dto.WebSocketMessageDto; +import org.dromara.common.websocket.holder.WebSocketSessionHolder; import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.drone.domain.DroProjectDrone; import org.dromara.drone.service.IDroProjectDroneService; @@ -17,25 +21,18 @@ import org.springframework.stereotype.Component; import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * Redis消息监听器,用于处理订阅频道收到的消息 */ +@Slf4j @Component public class RedisMessageListener implements MessageListener { - @Lazy - private final StringRedisTemplate stringRedisTemplate; - + // 注入异步消息处理服务 @Resource - @Lazy - private IDroProjectDroneService droProjectDroneService; - - // 构造函数注入StringRedisTemplate - public RedisMessageListener(StringRedisTemplate stringRedisTemplate) { - this.stringRedisTemplate = stringRedisTemplate; - } - + private IAsyncMessageHandlerService asyncMessageHandlerService; /** * 处理接收到的消息 * @param message 消息对象 @@ -43,50 +40,19 @@ public class RedisMessageListener implements MessageListener { */ @Override public void onMessage(Message message, byte[] pattern) { - // 处理消息 -// System.out.println("返回:"+stringRedisTemplate.getStringSerializer().deserialize(message.getBody())); - String gateway = JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getStr("gateway"); - String key = ""; - if (JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getJSONObject("data").get("job_number") != null) { - key = "wrj:osd1:"+gateway; - }else if (JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getJSONObject("data").get("wireless_link") != null) { - key = "wrj:osd2:"+gateway; - }else if (JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getJSONObject("data").get("network_state") != null) { - key = "wrj:osd3:"+gateway; - DroProjectDrone droProjectDrone = droProjectDroneService.getBaseMapper().selectOne(new LambdaQueryWrapper().eq(DroProjectDrone::getDroneSn, gateway)); - setWs(message, gateway, droProjectDrone); - }else{ - key = "wrj:osd4:"+gateway; - DroProjectDrone droProjectDrone = droProjectDroneService.getBaseMapper().selectOne(new LambdaQueryWrapper().eq(DroProjectDrone::getDroneSn, gateway)); - setWs(message, gateway, droProjectDrone); + try { + // 1. 快速日志记录(证明监听到消息) + log.info("【Redis消息监听】收到消息,长度:{}字节,提交异步处理", message.getBody().length); + // 2. 提交给异步服务处理(核心:线程分离,监听线程立即返回) + asyncMessageHandlerService.handleRedisMessageAsync(message); + + } catch (Exception e) { + // 捕获提交过程中的异常(如异步服务注入失败) + log.error("【Redis消息监听】提交异步处理失败", e); } - stringRedisTemplate - .opsForValue() - .set(key - , Objects.requireNonNull(stringRedisTemplate.getStringSerializer().deserialize(message.getBody()))); } - private void setWs(Message message, String gateway, DroProjectDrone droProjectDrone) { - String pushContent = buildPushMessage(gateway,stringRedisTemplate.getStringSerializer().deserialize(message.getBody()), droProjectDrone.getProjectId()); - // 发送给指定用户(equipment.getUserId()) - WebSocketMessageDto messageDto = new WebSocketMessageDto(); - messageDto.setMessage(pushContent); - messageDto.setSessionKeys(Collections.singletonList(droProjectDrone.getProjectId())); - WebSocketUtils.publishMessage(messageDto); - } - private String buildPushMessage(String key, String message, Long projectId) { - JSONObject messageObj = new JSONObject(); - messageObj.put("type", "wrj_DATA_UPDATE"); - messageObj.put("projectId",projectId.toString()); - messageObj.put("clientId",key); - // 位置信息 - JSONObject locationObj = new JSONObject(); - locationObj.put("latitude", JSONUtil.parseObj(message).getJSONObject("data").get("latitude").toString()); // 纬度 - locationObj.put("longitude", JSONUtil.parseObj(message).getJSONObject("data").get("longitude").toString()); // 经度 - messageObj.put("location", locationObj); - return messageObj.toString(); - } } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/manager/RedisSubscribeManager.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/manager/RedisSubscribeManager.java index 795b448f..9ebf6d6b 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/manager/RedisSubscribeManager.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/manager/RedisSubscribeManager.java @@ -1,9 +1,13 @@ package org.dromara.bigscreen.manager; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.dromara.bigscreen.service.impl.InitOnStartWebSocketServer; import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.drone.domain.DroProjectDrone; import org.dromara.drone.service.IDroProjectDroneService; import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.listener.PatternTopic; @@ -49,7 +53,7 @@ public class RedisSubscribeManager { public void initSubscribe() { log.info("项目启动,初始化Redis订阅..."); // 步骤1:从数据库获取最新的主题列表(原逻辑:getTopicsByKeyPrefix) - List latestKeys = droProjectDroneService.getTopicsByKeyPrefix(); + List latestKeys = droProjectDroneService.getBaseMapper().selectList(new LambdaQueryWrapper().groupBy(DroProjectDrone::getDroneSn)); if (latestKeys == null || latestKeys.isEmpty()) { log.warn("未获取到任何主题,将取消所有现有订阅"); cancelAllSubscribes(); @@ -58,11 +62,13 @@ public class RedisSubscribeManager { // 步骤2:构建最新的完整主题(格式:wrj:key) Set latestFullTopics = new HashSet<>(); - for (String key : latestKeys) { - latestFullTopics.add("wrj:osd1:" + key); - latestFullTopics.add("wrj:osd2:" + key); - latestFullTopics.add("wrj:osd3:" + key); - latestFullTopics.add("wrj:osd4:" + key); + for (DroProjectDrone key : latestKeys) { + latestFullTopics.add("wrj:osd1:" + key.getDroneSn()); + latestFullTopics.add("wrj:osd2:" + key.getDroneSn()); + latestFullTopics.add("wrj:osd3:" + key.getDroneSn()); + if (key.getAirplaneSn() != null && !key.getAirplaneSn().isEmpty()) { + latestFullTopics.add("wrj:osd4:" + key.getAirplaneSn()); + } } @@ -77,7 +83,7 @@ public class RedisSubscribeManager { * 4. 定时任务:定期更新订阅(每5分钟执行一次,可调整cron表达式) * cron格式:秒 分 时 日 月 周 年(示例:0 0/5 * * * ? 表示每5分钟) */ - @Scheduled(cron = "0 0/6 * * * ?") +// @Scheduled(cron = "0 0/6 * * * ?") public void dynamicUpdateSubscribe() { try { Object object = RedisUtils.getCacheObject("xmjdap:ws"); @@ -94,7 +100,7 @@ public class RedisSubscribeManager { return; } // 步骤1:从数据库获取最新的主题列表(原逻辑:getTopicsByKeyPrefix) - List latestKeys = droProjectDroneService.getTopicsByKeyPrefix(); + List latestKeys = droProjectDroneService.getBaseMapper().selectList(new LambdaQueryWrapper().groupBy(DroProjectDrone::getDroneSn)); if (latestKeys == null || latestKeys.isEmpty()) { log.warn("定时任务未获取到任何主题,将取消所有现有订阅"); cancelAllSubscribes(); @@ -103,11 +109,67 @@ public class RedisSubscribeManager { // 步骤2:构建最新的完整主题(格式:wrj:key) Set latestFullTopics = new HashSet<>(); - for (String key : latestKeys) { - latestFullTopics.add("wrj:osd1:" + key); - latestFullTopics.add("wrj:osd2:" + key); - latestFullTopics.add("wrj:osd3:" + key); - latestFullTopics.add("wrj:osd4:" + key); + for (DroProjectDrone key : latestKeys) { + latestFullTopics.add("wrj:osd1:" + key.getDroneSn()); + latestFullTopics.add("wrj:osd2:" + key.getDroneSn()); + latestFullTopics.add("wrj:osd3:" + key.getDroneSn()); + if (key.getAirplaneSn() != null && !key.getAirplaneSn().isEmpty()) { + latestFullTopics.add("wrj:osd4:" + key.getAirplaneSn()); + } + } + + // 步骤3:对比现有订阅,删除过期主题 + cancelExpiredSubscribes(latestFullTopics); + + // 步骤4:对比现有订阅,新增未订阅的主题 + addNewSubscribes(latestFullTopics); + + log.info("Redis订阅更新完成,当前已订阅主题数:{}", subscribedTopics.size()); + } catch (Exception e) { + log.error("Redis订阅更新定时任务执行失败", e); + // 异常时不修改现有订阅,避免误删 + } + } + + +// @Scheduled(cron = "0/10 * * * * ?") + @JobExecutor(name = "ueWsConnect") + public void ueWsConnect() { + try { + int onlineCount = InitOnStartWebSocketServer.getOnlineCount(); + Object object = RedisUtils.getCacheObject("xmjdap:ws"); + log.info("开始执行Redis订阅更新定时任务..."); + if (onlineCount == 0 && object == null) { + cancelAllSubscribes(); + return; + } + if (object != null) { + long oldTime = Long.parseLong(String.valueOf(object)); + long now = System.currentTimeMillis(); + if (now-oldTime > 300000) { + RedisUtils.deleteObject("xmjdap:ws"); + cancelAllSubscribes(); + return; + } + log.info("Redis时间缓存更新定时任务"); + } + // 步骤1:从数据库获取最新的主题列表(原逻辑:getTopicsByKeyPrefix) + List latestKeys = droProjectDroneService.getBaseMapper().selectList(new LambdaQueryWrapper().groupBy(DroProjectDrone::getDroneSn)); + if (latestKeys == null || latestKeys.isEmpty()) { + log.warn("定时任务未获取到任何主题,将取消所有现有订阅"); + cancelAllSubscribes(); + return; + } + + // 步骤2:构建最新的完整主题(格式:wrj:key) + Set latestFullTopics = new HashSet<>(); + for (DroProjectDrone key : latestKeys) { + latestFullTopics.add("wrj:osd1:" + key.getDroneSn()); + latestFullTopics.add("wrj:osd2:" + key.getDroneSn()); + latestFullTopics.add("wrj:osd3:" + key.getDroneSn()); + if (key.getAirplaneSn() != null && !key.getAirplaneSn().isEmpty()) { + latestFullTopics.add("wrj:osd4:" + key.getAirplaneSn()); + } } // 步骤3:对比现有订阅,删除过期主题 @@ -126,7 +188,7 @@ public class RedisSubscribeManager { /** * 取消过期的订阅(现有订阅不在最新列表中的主题) */ - private void cancelExpiredSubscribes(Set latestFullTopics) { + public void cancelExpiredSubscribes(Set latestFullTopics) { // 遍历现有订阅,找出需要删除的主题 Set expiredTopics = subscribedTopics.keySet().stream() .filter(topic -> !latestFullTopics.contains(topic)) @@ -152,7 +214,7 @@ public class RedisSubscribeManager { /** * 新增未订阅的主题(最新列表中有但现有订阅没有的主题) */ - private void addNewSubscribes(Set latestFullTopics) { + public void addNewSubscribes(Set latestFullTopics) { // 遍历最新主题,找出需要新增的主题 Set newTopics = latestFullTopics.stream() .filter(topic -> !subscribedTopics.containsKey(topic)) diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/IAsyncMessageHandlerService.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/IAsyncMessageHandlerService.java new file mode 100644 index 00000000..f820d06e --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/IAsyncMessageHandlerService.java @@ -0,0 +1,14 @@ +package org.dromara.bigscreen.service; + +import org.springframework.data.redis.connection.Message; + +/** + * 异步消息处理服务接口(扩展原有接口,增加Redis消息处理方法) + */ +public interface IAsyncMessageHandlerService { + /** + * 异步处理Redis消息(核心:接收Message对象,处理业务逻辑) + * @param message Redis消息对象 + */ + void handleRedisMessageAsync(Message message); +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/AsyncMessageHandlerServiceImpl.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/AsyncMessageHandlerServiceImpl.java new file mode 100644 index 00000000..2a2971d2 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/AsyncMessageHandlerServiceImpl.java @@ -0,0 +1,180 @@ +package org.dromara.bigscreen.service.impl; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.dromara.bigscreen.service.IAsyncMessageHandlerService; +import org.dromara.common.websocket.dto.WebSocketMessageDto; +import org.dromara.common.websocket.holder.WebSocketSessionHolder; +import org.dromara.common.websocket.utils.WebSocketUtils; +import org.dromara.drone.domain.DroProjectDrone; +import org.dromara.drone.mapper.DroProjectDroneMapper; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +/** + * 异步消息处理实现(核心:原onMessage逻辑迁移至此) + */ +@Slf4j +@Service +public class AsyncMessageHandlerServiceImpl implements IAsyncMessageHandlerService { + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private DroProjectDroneMapper droProjectDroneMapper; // 直接注入Mapper,减少Service层调用开销 + + /** + * 异步处理消息(指定线程池:messageAsyncExecutor) + */ + @Override + @Async("messageAsyncExecutor") + public void handleRedisMessageAsync(Message message) { + try { + // 1. 解析消息体(只解析一次,避免重复操作) + byte[] messageBody = message.getBody(); + if (messageBody == null || messageBody.length == 0) { + log.warn("【异步消息处理】消息体为空,忽略处理"); + return; + } + String messageContent = stringRedisTemplate.getStringSerializer().deserialize(messageBody); + if (messageContent == null) { + log.warn("【异步消息处理】消息反序列化失败,内容:{}", new String(messageBody)); + return; + } + + // 2. 解析JSON(使用框架兼容的JSONUtil,处理可能的解析异常) + JSONObject messageJson; + try { + messageJson = JSONUtil.parseObj(messageContent); + } catch (Exception e) { + log.error("【异步消息处理】JSON解析失败,内容:{}", messageContent, e); + return; + } + + // 3. 获取gateway字段(核心业务参数) + String gateway = messageJson.getStr("gateway"); + if (gateway == null) { + log.warn("【异步消息处理】消息缺少gateway字段,内容:{}", messageContent); + return; + } + + // 4. 解析data字段,判断key类型 + JSONObject dataJson = messageJson.getJSONObject("data"); + if (dataJson == null) { + log.warn("【异步消息处理】消息缺少data字段,gateway:{}", gateway); + return; + } + + String key = ""; + DroProjectDrone droProjectDrone = null; + boolean isOsd4 = false; + + // 5. 根据data字段内容判断key类型,并查询无人机信息 + if (dataJson.get("job_number") != null) { + key = "wrj:osd1:" + gateway; + } else if (dataJson.get("wireless_link") != null) { + key = "wrj:osd2:" + gateway; + } else if (dataJson.get("network_state") != null) { + key = "wrj:osd3:" + gateway; + // 查询无人机信息(使用LambdaQueryWrapper,框架规范) + droProjectDrone = droProjectDroneMapper.selectOne( + new LambdaQueryWrapper() + .eq(DroProjectDrone::getDroneSn, gateway) + ); + log.info("【异步消息处理】osd3类型消息,gateway:{}", gateway); + // 调用setWs方法(非osd4类型) + setWs(messageContent, gateway, droProjectDrone, false); + } else { + key = "wrj:osd4:" + gateway; + droProjectDrone = droProjectDroneMapper.selectOne( + new LambdaQueryWrapper() + .eq(DroProjectDrone::getAirplaneSn, gateway) + ); + log.info("【异步消息处理】osd4类型消息,gateway:{}", gateway); + isOsd4 = true; + // 调用setWs方法(osd4类型) + setWs(messageContent, gateway, droProjectDrone, true); + } + + // 6. 存储消息到Redis(使用Objects.requireNonNull避免空指针) + stringRedisTemplate.opsForValue().set(key, Objects.requireNonNull(messageContent)); + log.info("【异步消息处理】Redis存储成功,key:{}", key); + + } catch (Exception e) { + // 捕获所有异常,避免线程终止 + log.error("【异步消息处理】整体逻辑异常", e); + } + } + + private void setWs(String message, String gateway, DroProjectDrone droProjectDrone, boolean b) { + //判断是否有连接 + Set sessionsAll = WebSocketSessionHolder.getSessionsAll(); + if (!sessionsAll.isEmpty()) { + String pushContent = buildPushMessage(gateway, message, droProjectDrone.getProjectId()); + // 发送给指定用户(equipment.getUserId()) + WebSocketMessageDto messageDto = new WebSocketMessageDto(); + messageDto.setMessage(pushContent); + messageDto.setSessionKeys(Collections.singletonList(droProjectDrone.getProjectId())); + WebSocketUtils.publishMessage(messageDto); + System.out.println("大屏已推送消息"); + } + int onlineCount = InitOnStartWebSocketServer.getOnlineCount(); + if (onlineCount > 0){ + String modleId = b ? droProjectDrone.getAirplaneModleId() : droProjectDrone.getDroneModleId(); + if (modleId != null) { + String ued = ueStructureJsonMessage(message, modleId); + InitOnStartWebSocketServer.sendToAll(ued); + System.out.println("ue已推送消息"); + } + } + + } + + private String buildPushMessage(String key, String message, Long projectId) { + JSONObject messageObj = new JSONObject(); + messageObj.put("type", "wrj_DATA_UPDATE"); + messageObj.put("projectId",projectId.toString()); + messageObj.put("clientId",key); + // 位置信息 + JSONObject locationObj = new JSONObject(); + locationObj.put("latitude", JSONUtil.parseObj(message).getJSONObject("data").get("latitude").toString()); // 纬度 + locationObj.put("longitude", JSONUtil.parseObj(message).getJSONObject("data").get("longitude").toString()); // 经度 + messageObj.put("location", locationObj); + return messageObj.toString(); + } + + + /** + * 构建推送消息内容(String类型) + */ + private String ueStructureJsonMessage(String message, String modelId) { + // 构造消息对象(包含关键信息) + JSONObject messageObj = new JSONObject(); + messageObj.put("type", "location"); // 消息类型 + + JSONObject data = new JSONObject(); + data.put("id", modelId); + + // 位置信息 + JSONObject position = new JSONObject(); + position.put("lat", JSONUtil.parseObj(message).getJSONObject("data").get("latitude").toString()); // 纬度 + position.put("lng", JSONUtil.parseObj(message).getJSONObject("data").get("longitude").toString()); // 经度 + position.put("alt", JSONUtil.parseObj(message).getJSONObject("data").get("height").toString()); // 海拔 + + data.put("position", position); + messageObj.put("data", data); // 设备唯一标识 + + // 转换为String类型返回 + return messageObj.toString(); + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/InitOnStartWebSocketServer.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/InitOnStartWebSocketServer.java new file mode 100644 index 00000000..c4148da7 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/bigscreen/service/impl/InitOnStartWebSocketServer.java @@ -0,0 +1,110 @@ +package org.dromara.bigscreen.service.impl;// 路径:com.ruoyi.web.websocket.InitOnStartWebSocketServer +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import jakarta.annotation.Resource; +import jakarta.websocket.*; +import jakarta.websocket.server.ServerEndpoint; +import lombok.extern.slf4j.Slf4j; +import org.dromara.bigscreen.manager.RedisSubscribeManager; +import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.drone.domain.DroProjectDrone; +import org.dromara.drone.service.IDroProjectDroneService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 项目启动即自动启动的 WebSocket 服务端 + * 端点路径:/websocket/init-on-start(可自定义) + */ +@Slf4j +@ServerEndpoint("/websocket/ue") // 定义 WebSocket 端点路径 +@Component // 交给 Spring 管理,确保启动时被扫描 +public class InitOnStartWebSocketServer { + + // 2. 静态会话存储(线程安全,项目启动时即初始化) + private static final Map ONLINE_SESSIONS = new ConcurrentHashMap<>(); + + // 3. 静态代码块:项目启动时执行(初始化资源、打印启动日志) + static { + // 此处可添加启动时的初始化逻辑(如加载配置、连接外部资源等) + log.info("✅ WebSocket 服务端已随项目启动初始化!端点路径:/websocket/ue"); + } + + /** + * 客户端连接时触发(无需手动启动,有客户端连接时自动调用) + */ + @OnOpen + public void onOpen(Session session) { + // 存储新会话 + ONLINE_SESSIONS.put(session.getId(), session); +// RedisUtils.setCacheObject("xmjdap:ws",System.currentTimeMillis() ); + log.info("📌 客户端连接成功!会话ID:{},当前在线数:{}", session.getId(), ONLINE_SESSIONS.size()); + } + + /** + * 接收客户端消息 + */ + @OnMessage + public void onMessage(String message, Session session) { + log.info("📥 收到会话[{}]消息:{}", session.getId(), message); + // 可选:回复客户端(示例) + try { + session.getBasicRemote().sendText("服务端已收到消息:" + message); + } catch (IOException e) { + log.error("📤 回复会话[{}]失败:{}", session.getId(), e.getMessage()); + } + } + + /** + * 客户端断开连接 + */ + @OnClose + public void onClose(Session session, CloseReason reason) { + ONLINE_SESSIONS.remove(session.getId()); + log.info("🔌 客户端断开连接!会话ID:{},原因:{},当前在线数:{}", + session.getId(), reason.getReasonPhrase(), ONLINE_SESSIONS.size()); + } + + /** + * 连接异常 + */ + @OnError + public void onError(Session session, Throwable error) { + log.error("⚠️ 会话[{}]异常:{}", session.getId(), error.getMessage(), error); + } + + // ------------------------------ 工具方法(可选,供其他服务调用) ------------------------------ + /** + * 向所有在线客户端发送消息(项目启动后,其他服务可直接调用) + */ + public static void sendToAll(String message) { + if (ONLINE_SESSIONS.isEmpty()) { + log.warn("⚠️ 无在线客户端,无需发送消息"); + return; + } + ONLINE_SESSIONS.values().forEach(session -> { + if (session.isOpen()) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + log.error("📤 向会话[{}]发送消息失败:{}", session.getId(), e.getMessage()); + } + } + }); + } + + /** + * 获取当前在线数(供外部查询) + */ + public static int getOnlineCount() { + return ONLINE_SESSIONS.size(); + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/DroProjectDrone.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/DroProjectDrone.java index 8d1b8197..5adb5be2 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/DroProjectDrone.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/DroProjectDrone.java @@ -40,6 +40,18 @@ public class DroProjectDrone implements Serializable { * 备注 */ private String remark; + /** + * 飞机sn + */ + private String airplaneSn; + /** + * 无人机机场模型id + */ + private String droneModleId; + /** + * 飞机模型id + */ + private String airplaneModleId; } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/bo/DroProjectDroneBo.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/bo/DroProjectDroneBo.java index 2e024892..0aa329d5 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/bo/DroProjectDroneBo.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/bo/DroProjectDroneBo.java @@ -43,5 +43,18 @@ public class DroProjectDroneBo extends BaseEntity { */ private String remark; + /** + * 飞机sn + */ + private String airplaneSn; + /** + * 无人机机场模型id + */ + private String droneModleId; + /** + * 飞机模型id + */ + private String airplaneModleId; + } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/vo/DroProjectDroneVo.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/vo/DroProjectDroneVo.java index 4d770816..b86b54eb 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/vo/DroProjectDroneVo.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/drone/domain/vo/DroProjectDroneVo.java @@ -51,6 +51,17 @@ public class DroProjectDroneVo implements Serializable { */ @ExcelProperty(value = "备注") private String remark; - + /** + * 飞机sn + */ + private String airplaneSn; + /** + * 无人机机场模型id + */ + private String droneModleId; + /** + * 飞机模型id + */ + private String airplaneModleId; } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/GpsEquipment.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/GpsEquipment.java index e2c06703..e2e72882 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/GpsEquipment.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/GpsEquipment.java @@ -88,5 +88,10 @@ public class GpsEquipment extends BaseEntity { */ private Integer gpsType; + /** + * 模型id + */ + private String modelId; + } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/bo/GpsEquipmentBo.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/bo/GpsEquipmentBo.java index 10b7c178..3b0bd5ba 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/bo/GpsEquipmentBo.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/bo/GpsEquipmentBo.java @@ -87,4 +87,9 @@ public class GpsEquipmentBo extends BaseEntity { */ private Integer gpsType; + /** + * 模型id + */ + private String modelId; + } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/vo/GpsEquipmentVo.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/vo/GpsEquipmentVo.java index 0430b240..bd4d835b 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/vo/GpsEquipmentVo.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/domain/vo/GpsEquipmentVo.java @@ -106,5 +106,10 @@ public class GpsEquipmentVo implements Serializable { */ private Integer gpsType; + /** + * 模型id + */ + private String modelId; + } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/service/impl/GpsEquipmentServiceImpl.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/service/impl/GpsEquipmentServiceImpl.java index 98e54505..5ffc9937 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/service/impl/GpsEquipmentServiceImpl.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/gps/service/impl/GpsEquipmentServiceImpl.java @@ -6,6 +6,7 @@ import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; +import org.dromara.bigscreen.service.impl.InitOnStartWebSocketServer; import org.dromara.common.core.exception.ServiceException; import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.core.utils.StringUtils; @@ -17,6 +18,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.RequiredArgsConstructor; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.websocket.dto.WebSocketMessageDto; +import org.dromara.common.websocket.holder.WebSocketSessionHolder; import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.gps.domain.GpsManmachine; import org.dromara.gps.domain.bo.GpsEquipmentSonBo; @@ -103,7 +105,7 @@ public class GpsEquipmentServiceImpl extends ServiceImpl buildQueryWrapper(GpsEquipmentBo bo) { Map params = bo.getParams(); LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); - lqw.orderByDesc(GpsEquipment::getId); + lqw.orderByDesc(GpsEquipment::getCreateTime); lqw.eq(bo.getGpsType() != null, GpsEquipment::getGpsType, bo.getGpsType()); lqw.eq(bo.getProjectId() != null, GpsEquipment::getProjectId, bo.getProjectId()); lqw.eq(bo.getUserId() != null, GpsEquipment::getUserId, bo.getUserId()); @@ -245,19 +247,30 @@ public class GpsEquipmentServiceImpl extends ServiceImpl 0){ + if (equipment != null && StringUtils.isNotEmpty(equipment.getModelId())) { + String ued = ueStructureJsonMessage(gpsEquipmentSon, equipment.getModelId()); + InitOnStartWebSocketServer.sendToAll(ued); + } + } + Set sessionsAll = WebSocketSessionHolder.getSessionsAll(); + if (!sessionsAll.isEmpty()) { + String pushContent = buildPushMessage(gpsEquipmentSon); // WebSocketUtils.publishAll(pushContent); - // 2. 发布消息(根据是否有用户ID决定发送给指定用户或广播) - if (equipment != null && equipment.getProjectId() != null) { - // 发送给指定用户(equipment.getUserId()) - WebSocketMessageDto messageDto = new WebSocketMessageDto(); - messageDto.setMessage(pushContent); - messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId())); - WebSocketUtils.publishMessage(messageDto); - } else { - // 无用户ID则广播给所有在线客户端 - WebSocketUtils.publishAll(pushContent); + // 2. 发布消息(根据是否有用户ID决定发送给指定用户或广播) + if (equipment != null && equipment.getProjectId() != null) { + // 发送给指定用户(equipment.getUserId()) + WebSocketMessageDto messageDto = new WebSocketMessageDto(); + messageDto.setMessage(pushContent); + messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId())); + WebSocketUtils.publishMessage(messageDto); + } else { + // 无用户ID则广播给所有在线客户端 + WebSocketUtils.publishAll(pushContent); + } } } // 保存到Redis并设置过期监听 @@ -286,6 +299,30 @@ public class GpsEquipmentServiceImpl extends ServiceImpl 0){ + if (equipment != null && StringUtils.isNotEmpty(equipment.getModelId())) { + String ued = ueStructureJsonMessage(gpsEquipmentSonBo, equipment.getModelId()); + InitOnStartWebSocketServer.sendToAll(ued); + } + } + Set sessionsAll = WebSocketSessionHolder.getSessionsAll(); + if (!sessionsAll.isEmpty()) { + String pushContent = buildPushMessage(gpsEquipmentSonBo); // WebSocketUtils.publishAll(pushContent); - // 2. 发布消息(根据是否有用户ID决定发送给指定用户或广播) - if (equipment != null && equipment.getProjectId() != null) { - // 发送给指定用户(equipment.getUserId()) - WebSocketMessageDto messageDto = new WebSocketMessageDto(); - messageDto.setMessage(pushContent); - messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId())); - WebSocketUtils.publishMessage(messageDto); - } else { - // 无用户ID则广播给所有在线客户端 - WebSocketUtils.publishAll(pushContent); + // 2. 发布消息(根据是否有用户ID决定发送给指定用户或广播) + if (equipment != null && equipment.getProjectId() != null) { + // 发送给指定用户(equipment.getUserId()) + WebSocketMessageDto messageDto = new WebSocketMessageDto(); + messageDto.setMessage(pushContent); + messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId())); + WebSocketUtils.publishMessage(messageDto); + } else { + // 无用户ID则广播给所有在线客户端 + WebSocketUtils.publishAll(pushContent); + } } // 保存到Redis并设置过期监听 updateDeviceAliveStatus(gpsEquipment.getClientId());