对ue开发接口

This commit is contained in:
2025-10-31 20:08:44 +08:00
parent fb72063369
commit b88a92b7e1
15 changed files with 586 additions and 92 deletions

View File

@ -134,6 +134,7 @@ security:
# todo 仅测试
- /facility/matrix/**
- /hat/device/data
- /websocket/ue
# 多租户配置
tenant:

View File

@ -0,0 +1,43 @@
package org.dromara.bigscreen.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 无人机消息redis订阅专用线程池
*/
@Configuration // 标记为配置类让Spring扫描加载
@EnableAsync // 启用Spring异步功能若项目已在启动类加过此处可省略但加上更稳妥
public class AsyncConfig {
/**
* 定义名为 "messageAsyncExecutor" 的线程池 Bean与 @Async("messageAsyncExecutor") 对应)
*/
@Bean(name = "messageAsyncExecutor") // Bean名称必须是 "messageAsyncExecutor",大小写敏感
public Executor messageAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 1. 核心线程数根据CPU核心数或业务量调整示例5个
executor.setCorePoolSize(5);
// 2. 最大线程数避免线程过多导致资源耗尽示例10个
executor.setMaxPoolSize(10);
// 3. 任务队列容量缓冲待处理的异步任务示例50个
executor.setQueueCapacity(50);
// 4. 线程名前缀便于日志排查格式message-async-1, message-async-2...
executor.setThreadNamePrefix("message-async-");
// 5. 线程空闲时间超过60秒空闲则销毁释放资源
executor.setKeepAliveSeconds(60);
// 6. 任务拒绝策略:队列满时,由提交任务的线程临时执行(避免消息丢失)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 7. 初始化线程池(必须调用,否则线程池不会生效)
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,20 @@
package org.dromara.bigscreen.config;// 路径com.ruoyi.framework.config.WebSocketConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* WebSocket 配置类:确保 @ServerEndpoint 端点随项目启动注册
*/
@Configuration
public class WebSocketConfig {
/**
* 注册 WebSocket 端点处理器
* 作用Spring 启动时自动扫描并初始化 @ServerEndpoint 注解的类
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

View File

@ -5,7 +5,11 @@ import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.bigscreen.service.IAsyncMessageHandlerService;
import org.dromara.bigscreen.service.impl.InitOnStartWebSocketServer;
import org.dromara.common.websocket.dto.WebSocketMessageDto;
import org.dromara.common.websocket.holder.WebSocketSessionHolder;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.drone.domain.DroProjectDrone;
import org.dromara.drone.service.IDroProjectDroneService;
@ -17,25 +21,18 @@ import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
/**
* Redis消息监听器用于处理订阅频道收到的消息
*/
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
@Lazy
private final StringRedisTemplate stringRedisTemplate;
// 注入异步消息处理服务
@Resource
@Lazy
private IDroProjectDroneService droProjectDroneService;
// 构造函数注入StringRedisTemplate
public RedisMessageListener(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
private IAsyncMessageHandlerService asyncMessageHandlerService;
/**
* 处理接收到的消息
* @param message 消息对象
@ -43,50 +40,19 @@ public class RedisMessageListener implements MessageListener {
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 处理消息
// System.out.println("返回:"+stringRedisTemplate.getStringSerializer().deserialize(message.getBody()));
String gateway = JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getStr("gateway");
String key = "";
if (JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getJSONObject("data").get("job_number") != null) {
key = "wrj:osd1:"+gateway;
}else if (JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getJSONObject("data").get("wireless_link") != null) {
key = "wrj:osd2:"+gateway;
}else if (JSONUtil.parseObj(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())).getJSONObject("data").get("network_state") != null) {
key = "wrj:osd3:"+gateway;
DroProjectDrone droProjectDrone = droProjectDroneService.getBaseMapper().selectOne(new LambdaQueryWrapper<DroProjectDrone>().eq(DroProjectDrone::getDroneSn, gateway));
setWs(message, gateway, droProjectDrone);
}else{
key = "wrj:osd4:"+gateway;
DroProjectDrone droProjectDrone = droProjectDroneService.getBaseMapper().selectOne(new LambdaQueryWrapper<DroProjectDrone>().eq(DroProjectDrone::getDroneSn, gateway));
setWs(message, gateway, droProjectDrone);
try {
// 1. 快速日志记录(证明监听到消息)
log.info("【Redis消息监听】收到消息长度{}字节,提交异步处理", message.getBody().length);
// 2. 提交给异步服务处理(核心:线程分离,监听线程立即返回)
asyncMessageHandlerService.handleRedisMessageAsync(message);
} catch (Exception e) {
// 捕获提交过程中的异常(如异步服务注入失败)
log.error("【Redis消息监听】提交异步处理失败", e);
}
stringRedisTemplate
.opsForValue()
.set(key
, Objects.requireNonNull(stringRedisTemplate.getStringSerializer().deserialize(message.getBody())));
}
private void setWs(Message message, String gateway, DroProjectDrone droProjectDrone) {
String pushContent = buildPushMessage(gateway,stringRedisTemplate.getStringSerializer().deserialize(message.getBody()), droProjectDrone.getProjectId());
// 发送给指定用户equipment.getUserId()
WebSocketMessageDto messageDto = new WebSocketMessageDto();
messageDto.setMessage(pushContent);
messageDto.setSessionKeys(Collections.singletonList(droProjectDrone.getProjectId()));
WebSocketUtils.publishMessage(messageDto);
}
private String buildPushMessage(String key, String message, Long projectId) {
JSONObject messageObj = new JSONObject();
messageObj.put("type", "wrj_DATA_UPDATE");
messageObj.put("projectId",projectId.toString());
messageObj.put("clientId",key);
// 位置信息
JSONObject locationObj = new JSONObject();
locationObj.put("latitude", JSONUtil.parseObj(message).getJSONObject("data").get("latitude").toString()); // 纬度
locationObj.put("longitude", JSONUtil.parseObj(message).getJSONObject("data").get("longitude").toString()); // 经度
messageObj.put("location", locationObj);
return messageObj.toString();
}
}

View File

@ -1,9 +1,13 @@
package org.dromara.bigscreen.manager;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.bigscreen.service.impl.InitOnStartWebSocketServer;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.drone.domain.DroProjectDrone;
import org.dromara.drone.service.IDroProjectDroneService;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.listener.PatternTopic;
@ -49,7 +53,7 @@ public class RedisSubscribeManager {
public void initSubscribe() {
log.info("项目启动初始化Redis订阅...");
// 步骤1从数据库获取最新的主题列表原逻辑getTopicsByKeyPrefix
List<String> latestKeys = droProjectDroneService.getTopicsByKeyPrefix();
List<DroProjectDrone> latestKeys = droProjectDroneService.getBaseMapper().selectList(new LambdaQueryWrapper<DroProjectDrone>().groupBy(DroProjectDrone::getDroneSn));
if (latestKeys == null || latestKeys.isEmpty()) {
log.warn("未获取到任何主题,将取消所有现有订阅");
cancelAllSubscribes();
@ -58,11 +62,13 @@ public class RedisSubscribeManager {
// 步骤2构建最新的完整主题格式wrj:key
Set<String> latestFullTopics = new HashSet<>();
for (String key : latestKeys) {
latestFullTopics.add("wrj:osd1:" + key);
latestFullTopics.add("wrj:osd2:" + key);
latestFullTopics.add("wrj:osd3:" + key);
latestFullTopics.add("wrj:osd4:" + key);
for (DroProjectDrone key : latestKeys) {
latestFullTopics.add("wrj:osd1:" + key.getDroneSn());
latestFullTopics.add("wrj:osd2:" + key.getDroneSn());
latestFullTopics.add("wrj:osd3:" + key.getDroneSn());
if (key.getAirplaneSn() != null && !key.getAirplaneSn().isEmpty()) {
latestFullTopics.add("wrj:osd4:" + key.getAirplaneSn());
}
}
@ -77,7 +83,7 @@ public class RedisSubscribeManager {
* 4. 定时任务定期更新订阅每5分钟执行一次可调整cron表达式
* cron格式秒 分 时 日 月 周 年示例0 0/5 * * * ? 表示每5分钟
*/
@Scheduled(cron = "0 0/6 * * * ?")
// @Scheduled(cron = "0 0/6 * * * ?")
public void dynamicUpdateSubscribe() {
try {
Object object = RedisUtils.getCacheObject("xmjdap:ws");
@ -94,7 +100,7 @@ public class RedisSubscribeManager {
return;
}
// 步骤1从数据库获取最新的主题列表原逻辑getTopicsByKeyPrefix
List<String> latestKeys = droProjectDroneService.getTopicsByKeyPrefix();
List<DroProjectDrone> latestKeys = droProjectDroneService.getBaseMapper().selectList(new LambdaQueryWrapper<DroProjectDrone>().groupBy(DroProjectDrone::getDroneSn));
if (latestKeys == null || latestKeys.isEmpty()) {
log.warn("定时任务未获取到任何主题,将取消所有现有订阅");
cancelAllSubscribes();
@ -103,11 +109,67 @@ public class RedisSubscribeManager {
// 步骤2构建最新的完整主题格式wrj:key
Set<String> latestFullTopics = new HashSet<>();
for (String key : latestKeys) {
latestFullTopics.add("wrj:osd1:" + key);
latestFullTopics.add("wrj:osd2:" + key);
latestFullTopics.add("wrj:osd3:" + key);
latestFullTopics.add("wrj:osd4:" + key);
for (DroProjectDrone key : latestKeys) {
latestFullTopics.add("wrj:osd1:" + key.getDroneSn());
latestFullTopics.add("wrj:osd2:" + key.getDroneSn());
latestFullTopics.add("wrj:osd3:" + key.getDroneSn());
if (key.getAirplaneSn() != null && !key.getAirplaneSn().isEmpty()) {
latestFullTopics.add("wrj:osd4:" + key.getAirplaneSn());
}
}
// 步骤3对比现有订阅删除过期主题
cancelExpiredSubscribes(latestFullTopics);
// 步骤4对比现有订阅新增未订阅的主题
addNewSubscribes(latestFullTopics);
log.info("Redis订阅更新完成当前已订阅主题数{}", subscribedTopics.size());
} catch (Exception e) {
log.error("Redis订阅更新定时任务执行失败", e);
// 异常时不修改现有订阅,避免误删
}
}
// @Scheduled(cron = "0/10 * * * * ?")
@JobExecutor(name = "ueWsConnect")
public void ueWsConnect() {
try {
int onlineCount = InitOnStartWebSocketServer.getOnlineCount();
Object object = RedisUtils.getCacheObject("xmjdap:ws");
log.info("开始执行Redis订阅更新定时任务...");
if (onlineCount == 0 && object == null) {
cancelAllSubscribes();
return;
}
if (object != null) {
long oldTime = Long.parseLong(String.valueOf(object));
long now = System.currentTimeMillis();
if (now-oldTime > 300000) {
RedisUtils.deleteObject("xmjdap:ws");
cancelAllSubscribes();
return;
}
log.info("Redis时间缓存更新定时任务");
}
// 步骤1从数据库获取最新的主题列表原逻辑getTopicsByKeyPrefix
List<DroProjectDrone> latestKeys = droProjectDroneService.getBaseMapper().selectList(new LambdaQueryWrapper<DroProjectDrone>().groupBy(DroProjectDrone::getDroneSn));
if (latestKeys == null || latestKeys.isEmpty()) {
log.warn("定时任务未获取到任何主题,将取消所有现有订阅");
cancelAllSubscribes();
return;
}
// 步骤2构建最新的完整主题格式wrj:key
Set<String> latestFullTopics = new HashSet<>();
for (DroProjectDrone key : latestKeys) {
latestFullTopics.add("wrj:osd1:" + key.getDroneSn());
latestFullTopics.add("wrj:osd2:" + key.getDroneSn());
latestFullTopics.add("wrj:osd3:" + key.getDroneSn());
if (key.getAirplaneSn() != null && !key.getAirplaneSn().isEmpty()) {
latestFullTopics.add("wrj:osd4:" + key.getAirplaneSn());
}
}
// 步骤3对比现有订阅删除过期主题
@ -126,7 +188,7 @@ public class RedisSubscribeManager {
/**
* 取消过期的订阅(现有订阅不在最新列表中的主题)
*/
private void cancelExpiredSubscribes(Set<String> latestFullTopics) {
public void cancelExpiredSubscribes(Set<String> latestFullTopics) {
// 遍历现有订阅,找出需要删除的主题
Set<String> expiredTopics = subscribedTopics.keySet().stream()
.filter(topic -> !latestFullTopics.contains(topic))
@ -152,7 +214,7 @@ public class RedisSubscribeManager {
/**
* 新增未订阅的主题(最新列表中有但现有订阅没有的主题)
*/
private void addNewSubscribes(Set<String> latestFullTopics) {
public void addNewSubscribes(Set<String> latestFullTopics) {
// 遍历最新主题,找出需要新增的主题
Set<String> newTopics = latestFullTopics.stream()
.filter(topic -> !subscribedTopics.containsKey(topic))

View File

@ -0,0 +1,14 @@
package org.dromara.bigscreen.service;
import org.springframework.data.redis.connection.Message;
/**
* 异步消息处理服务接口扩展原有接口增加Redis消息处理方法
*/
public interface IAsyncMessageHandlerService {
/**
* 异步处理Redis消息核心接收Message对象处理业务逻辑
* @param message Redis消息对象
*/
void handleRedisMessageAsync(Message message);
}

View File

@ -0,0 +1,180 @@
package org.dromara.bigscreen.service.impl;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.bigscreen.service.IAsyncMessageHandlerService;
import org.dromara.common.websocket.dto.WebSocketMessageDto;
import org.dromara.common.websocket.holder.WebSocketSessionHolder;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.drone.domain.DroProjectDrone;
import org.dromara.drone.mapper.DroProjectDroneMapper;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
/**
* 异步消息处理实现核心原onMessage逻辑迁移至此
*/
@Slf4j
@Service
public class AsyncMessageHandlerServiceImpl implements IAsyncMessageHandlerService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private DroProjectDroneMapper droProjectDroneMapper; // 直接注入Mapper减少Service层调用开销
/**
* 异步处理消息指定线程池messageAsyncExecutor
*/
@Override
@Async("messageAsyncExecutor")
public void handleRedisMessageAsync(Message message) {
try {
// 1. 解析消息体(只解析一次,避免重复操作)
byte[] messageBody = message.getBody();
if (messageBody == null || messageBody.length == 0) {
log.warn("【异步消息处理】消息体为空,忽略处理");
return;
}
String messageContent = stringRedisTemplate.getStringSerializer().deserialize(messageBody);
if (messageContent == null) {
log.warn("【异步消息处理】消息反序列化失败,内容:{}", new String(messageBody));
return;
}
// 2. 解析JSON使用框架兼容的JSONUtil处理可能的解析异常
JSONObject messageJson;
try {
messageJson = JSONUtil.parseObj(messageContent);
} catch (Exception e) {
log.error("【异步消息处理】JSON解析失败内容{}", messageContent, e);
return;
}
// 3. 获取gateway字段核心业务参数
String gateway = messageJson.getStr("gateway");
if (gateway == null) {
log.warn("【异步消息处理】消息缺少gateway字段内容{}", messageContent);
return;
}
// 4. 解析data字段判断key类型
JSONObject dataJson = messageJson.getJSONObject("data");
if (dataJson == null) {
log.warn("【异步消息处理】消息缺少data字段gateway{}", gateway);
return;
}
String key = "";
DroProjectDrone droProjectDrone = null;
boolean isOsd4 = false;
// 5. 根据data字段内容判断key类型并查询无人机信息
if (dataJson.get("job_number") != null) {
key = "wrj:osd1:" + gateway;
} else if (dataJson.get("wireless_link") != null) {
key = "wrj:osd2:" + gateway;
} else if (dataJson.get("network_state") != null) {
key = "wrj:osd3:" + gateway;
// 查询无人机信息使用LambdaQueryWrapper框架规范
droProjectDrone = droProjectDroneMapper.selectOne(
new LambdaQueryWrapper<DroProjectDrone>()
.eq(DroProjectDrone::getDroneSn, gateway)
);
log.info("【异步消息处理】osd3类型消息gateway{}", gateway);
// 调用setWs方法非osd4类型
setWs(messageContent, gateway, droProjectDrone, false);
} else {
key = "wrj:osd4:" + gateway;
droProjectDrone = droProjectDroneMapper.selectOne(
new LambdaQueryWrapper<DroProjectDrone>()
.eq(DroProjectDrone::getAirplaneSn, gateway)
);
log.info("【异步消息处理】osd4类型消息gateway{}", gateway);
isOsd4 = true;
// 调用setWs方法osd4类型
setWs(messageContent, gateway, droProjectDrone, true);
}
// 6. 存储消息到Redis使用Objects.requireNonNull避免空指针
stringRedisTemplate.opsForValue().set(key, Objects.requireNonNull(messageContent));
log.info("【异步消息处理】Redis存储成功key{}", key);
} catch (Exception e) {
// 捕获所有异常,避免线程终止
log.error("【异步消息处理】整体逻辑异常", e);
}
}
private void setWs(String message, String gateway, DroProjectDrone droProjectDrone, boolean b) {
//判断是否有连接
Set<Long> sessionsAll = WebSocketSessionHolder.getSessionsAll();
if (!sessionsAll.isEmpty()) {
String pushContent = buildPushMessage(gateway, message, droProjectDrone.getProjectId());
// 发送给指定用户equipment.getUserId()
WebSocketMessageDto messageDto = new WebSocketMessageDto();
messageDto.setMessage(pushContent);
messageDto.setSessionKeys(Collections.singletonList(droProjectDrone.getProjectId()));
WebSocketUtils.publishMessage(messageDto);
System.out.println("大屏已推送消息");
}
int onlineCount = InitOnStartWebSocketServer.getOnlineCount();
if (onlineCount > 0){
String modleId = b ? droProjectDrone.getAirplaneModleId() : droProjectDrone.getDroneModleId();
if (modleId != null) {
String ued = ueStructureJsonMessage(message, modleId);
InitOnStartWebSocketServer.sendToAll(ued);
System.out.println("ue已推送消息");
}
}
}
private String buildPushMessage(String key, String message, Long projectId) {
JSONObject messageObj = new JSONObject();
messageObj.put("type", "wrj_DATA_UPDATE");
messageObj.put("projectId",projectId.toString());
messageObj.put("clientId",key);
// 位置信息
JSONObject locationObj = new JSONObject();
locationObj.put("latitude", JSONUtil.parseObj(message).getJSONObject("data").get("latitude").toString()); // 纬度
locationObj.put("longitude", JSONUtil.parseObj(message).getJSONObject("data").get("longitude").toString()); // 经度
messageObj.put("location", locationObj);
return messageObj.toString();
}
/**
* 构建推送消息内容String类型
*/
private String ueStructureJsonMessage(String message, String modelId) {
// 构造消息对象(包含关键信息)
JSONObject messageObj = new JSONObject();
messageObj.put("type", "location"); // 消息类型
JSONObject data = new JSONObject();
data.put("id", modelId);
// 位置信息
JSONObject position = new JSONObject();
position.put("lat", JSONUtil.parseObj(message).getJSONObject("data").get("latitude").toString()); // 纬度
position.put("lng", JSONUtil.parseObj(message).getJSONObject("data").get("longitude").toString()); // 经度
position.put("alt", JSONUtil.parseObj(message).getJSONObject("data").get("height").toString()); // 海拔
data.put("position", position);
messageObj.put("data", data); // 设备唯一标识
// 转换为String类型返回
return messageObj.toString();
}
}

View File

@ -0,0 +1,110 @@
package org.dromara.bigscreen.service.impl;// 路径com.ruoyi.web.websocket.InitOnStartWebSocketServer
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.dromara.bigscreen.manager.RedisSubscribeManager;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.drone.domain.DroProjectDrone;
import org.dromara.drone.service.IDroProjectDroneService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 项目启动即自动启动的 WebSocket 服务端
* 端点路径:/websocket/init-on-start可自定义
*/
@Slf4j
@ServerEndpoint("/websocket/ue") // 定义 WebSocket 端点路径
@Component // 交给 Spring 管理,确保启动时被扫描
public class InitOnStartWebSocketServer {
// 2. 静态会话存储(线程安全,项目启动时即初始化)
private static final Map<String, Session> ONLINE_SESSIONS = new ConcurrentHashMap<>();
// 3. 静态代码块:项目启动时执行(初始化资源、打印启动日志)
static {
// 此处可添加启动时的初始化逻辑(如加载配置、连接外部资源等)
log.info("✅ WebSocket 服务端已随项目启动初始化!端点路径:/websocket/ue");
}
/**
* 客户端连接时触发(无需手动启动,有客户端连接时自动调用)
*/
@OnOpen
public void onOpen(Session session) {
// 存储新会话
ONLINE_SESSIONS.put(session.getId(), session);
// RedisUtils.setCacheObject("xmjdap:ws",System.currentTimeMillis() );
log.info("📌 客户端连接成功会话ID{},当前在线数:{}", session.getId(), ONLINE_SESSIONS.size());
}
/**
* 接收客户端消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("📥 收到会话[{}]消息:{}", session.getId(), message);
// 可选:回复客户端(示例)
try {
session.getBasicRemote().sendText("服务端已收到消息:" + message);
} catch (IOException e) {
log.error("📤 回复会话[{}]失败:{}", session.getId(), e.getMessage());
}
}
/**
* 客户端断开连接
*/
@OnClose
public void onClose(Session session, CloseReason reason) {
ONLINE_SESSIONS.remove(session.getId());
log.info("🔌 客户端断开连接会话ID{},原因:{},当前在线数:{}",
session.getId(), reason.getReasonPhrase(), ONLINE_SESSIONS.size());
}
/**
* 连接异常
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("⚠️ 会话[{}]异常:{}", session.getId(), error.getMessage(), error);
}
// ------------------------------ 工具方法(可选,供其他服务调用) ------------------------------
/**
* 向所有在线客户端发送消息(项目启动后,其他服务可直接调用)
*/
public static void sendToAll(String message) {
if (ONLINE_SESSIONS.isEmpty()) {
log.warn("⚠️ 无在线客户端,无需发送消息");
return;
}
ONLINE_SESSIONS.values().forEach(session -> {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("📤 向会话[{}]发送消息失败:{}", session.getId(), e.getMessage());
}
}
});
}
/**
* 获取当前在线数(供外部查询)
*/
public static int getOnlineCount() {
return ONLINE_SESSIONS.size();
}
}

View File

@ -40,6 +40,18 @@ public class DroProjectDrone implements Serializable {
* 备注
*/
private String remark;
/**
* 飞机sn
*/
private String airplaneSn;
/**
* 无人机机场模型id
*/
private String droneModleId;
/**
* 飞机模型id
*/
private String airplaneModleId;
}

View File

@ -43,5 +43,18 @@ public class DroProjectDroneBo extends BaseEntity {
*/
private String remark;
/**
* 飞机sn
*/
private String airplaneSn;
/**
* 无人机机场模型id
*/
private String droneModleId;
/**
* 飞机模型id
*/
private String airplaneModleId;
}

View File

@ -51,6 +51,17 @@ public class DroProjectDroneVo implements Serializable {
*/
@ExcelProperty(value = "备注")
private String remark;
/**
* 飞机sn
*/
private String airplaneSn;
/**
* 无人机机场模型id
*/
private String droneModleId;
/**
* 飞机模型id
*/
private String airplaneModleId;
}

View File

@ -88,5 +88,10 @@ public class GpsEquipment extends BaseEntity {
*/
private Integer gpsType;
/**
* 模型id
*/
private String modelId;
}

View File

@ -87,4 +87,9 @@ public class GpsEquipmentBo extends BaseEntity {
*/
private Integer gpsType;
/**
* 模型id
*/
private String modelId;
}

View File

@ -106,5 +106,10 @@ public class GpsEquipmentVo implements Serializable {
*/
private Integer gpsType;
/**
* 模型id
*/
private String modelId;
}

View File

@ -6,6 +6,7 @@ import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.dromara.bigscreen.service.impl.InitOnStartWebSocketServer;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
@ -17,6 +18,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.websocket.dto.WebSocketMessageDto;
import org.dromara.common.websocket.holder.WebSocketSessionHolder;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.gps.domain.GpsManmachine;
import org.dromara.gps.domain.bo.GpsEquipmentSonBo;
@ -103,7 +105,7 @@ public class GpsEquipmentServiceImpl extends ServiceImpl<GpsEquipmentMapper, Gps
}
SysUserVo sysUserVo = userService.queryById(item.getUserId());
if (sysUserVo != null) {
item.setUserName(sysUserVo.getUserName());
item.setUserName(sysUserVo.getNickName());
}
BusProjectVo busProjectVo = projectService.selectById(item.getProjectId());
if (busProjectVo != null) {
@ -143,7 +145,7 @@ public class GpsEquipmentServiceImpl extends ServiceImpl<GpsEquipmentMapper, Gps
private LambdaQueryWrapper<GpsEquipment> buildQueryWrapper(GpsEquipmentBo bo) {
Map<String, Object> params = bo.getParams();
LambdaQueryWrapper<GpsEquipment> lqw = Wrappers.lambdaQuery();
lqw.orderByDesc(GpsEquipment::getId);
lqw.orderByDesc(GpsEquipment::getCreateTime);
lqw.eq(bo.getGpsType() != null, GpsEquipment::getGpsType, bo.getGpsType());
lqw.eq(bo.getProjectId() != null, GpsEquipment::getProjectId, bo.getProjectId());
lqw.eq(bo.getUserId() != null, GpsEquipment::getUserId, bo.getUserId());
@ -245,19 +247,30 @@ public class GpsEquipmentServiceImpl extends ServiceImpl<GpsEquipmentMapper, Gps
// 发布String类型订阅消息
// --------------------------
// 1. 构造需要推送的消息内容String类型
String pushContent = buildPushMessage(gpsEquipmentSon);
//判断是否有连接
int onlineCount = InitOnStartWebSocketServer.getOnlineCount();
if (onlineCount > 0){
if (equipment != null && StringUtils.isNotEmpty(equipment.getModelId())) {
String ued = ueStructureJsonMessage(gpsEquipmentSon, equipment.getModelId());
InitOnStartWebSocketServer.sendToAll(ued);
}
}
Set<Long> sessionsAll = WebSocketSessionHolder.getSessionsAll();
if (!sessionsAll.isEmpty()) {
String pushContent = buildPushMessage(gpsEquipmentSon);
// WebSocketUtils.publishAll(pushContent);
// 2. 发布消息根据是否有用户ID决定发送给指定用户或广播
if (equipment != null && equipment.getProjectId() != null) {
// 发送给指定用户equipment.getUserId()
WebSocketMessageDto messageDto = new WebSocketMessageDto();
messageDto.setMessage(pushContent);
messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId()));
WebSocketUtils.publishMessage(messageDto);
} else {
// 无用户ID则广播给所有在线客户端
WebSocketUtils.publishAll(pushContent);
// 2. 发布消息根据是否有用户ID决定发送给指定用户或广播
if (equipment != null && equipment.getProjectId() != null) {
// 发送给指定用户equipment.getUserId()
WebSocketMessageDto messageDto = new WebSocketMessageDto();
messageDto.setMessage(pushContent);
messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId()));
WebSocketUtils.publishMessage(messageDto);
} else {
// 无用户ID则广播给所有在线客户端
WebSocketUtils.publishAll(pushContent);
}
}
}
// 保存到Redis并设置过期监听
@ -286,6 +299,30 @@ public class GpsEquipmentServiceImpl extends ServiceImpl<GpsEquipmentMapper, Gps
return messageObj.toString();
}
/**
* 构建推送消息内容String类型
*/
private String ueStructureJsonMessage(GpsEquipmentSonBo sonBo,String modelId) {
// 构造消息对象(包含关键信息)
JSONObject messageObj = new JSONObject();
messageObj.put("type", "location"); // 消息类型
JSONObject data = new JSONObject();
data.put("id", modelId);
// 位置信息
JSONObject position = new JSONObject();
position.put("lat", sonBo.getLocLatitude().toString()); // 纬度
position.put("lng", sonBo.getLocLongitude().toString()); // 经度
position.put("alt", sonBo.getLocAltitude().toString()); // 海拔
data.put("position", position);
messageObj.put("data", data); // 设备唯一标识
// 转换为String类型返回
return messageObj.toString();
}
private static final int DEVICE_ALIVE_TIMEOUT = 120; // 5分钟
/**
@ -459,19 +496,29 @@ public class GpsEquipmentServiceImpl extends ServiceImpl<GpsEquipmentMapper, Gps
gpsEquipmentSonService.insertByBo(gpsEquipmentSonBo);
String pushContent = buildPushMessage(gpsEquipmentSonBo);
//判断是否有连接
int onlineCount = InitOnStartWebSocketServer.getOnlineCount();
if (onlineCount > 0){
if (equipment != null && StringUtils.isNotEmpty(equipment.getModelId())) {
String ued = ueStructureJsonMessage(gpsEquipmentSonBo, equipment.getModelId());
InitOnStartWebSocketServer.sendToAll(ued);
}
}
Set<Long> sessionsAll = WebSocketSessionHolder.getSessionsAll();
if (!sessionsAll.isEmpty()) {
String pushContent = buildPushMessage(gpsEquipmentSonBo);
// WebSocketUtils.publishAll(pushContent);
// 2. 发布消息根据是否有用户ID决定发送给指定用户或广播
if (equipment != null && equipment.getProjectId() != null) {
// 发送给指定用户equipment.getUserId()
WebSocketMessageDto messageDto = new WebSocketMessageDto();
messageDto.setMessage(pushContent);
messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId()));
WebSocketUtils.publishMessage(messageDto);
} else {
// 无用户ID则广播给所有在线客户端
WebSocketUtils.publishAll(pushContent);
// 2. 发布消息根据是否有用户ID决定发送给指定用户或广播
if (equipment != null && equipment.getProjectId() != null) {
// 发送给指定用户equipment.getUserId()
WebSocketMessageDto messageDto = new WebSocketMessageDto();
messageDto.setMessage(pushContent);
messageDto.setSessionKeys(Collections.singletonList(equipment.getProjectId()));
WebSocketUtils.publishMessage(messageDto);
} else {
// 无用户ID则广播给所有在线客户端
WebSocketUtils.publishAll(pushContent);
}
}
// 保存到Redis并设置过期监听
updateDeviceAliveStatus(gpsEquipment.getClientId());