websocket优化以及传递初始值
This commit is contained in:
@ -8,6 +8,7 @@ import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.listener.PatternTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
|
||||
|
||||
|
||||
@ -1,23 +1,20 @@
|
||||
package org.dromara.bigscreen.manager;
|
||||
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.dromara.websocket.websocket.service.InitOnStartWebSocketServer;
|
||||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.drone.domain.DroProjectDrone;
|
||||
import org.dromara.drone.service.IDroProjectDroneService;
|
||||
import org.dromara.websocket.websocket.service.InitOnStartWebSocketServer;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.data.redis.listener.PatternTopic;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -44,13 +41,20 @@ public class RedisSubscribeManager {
|
||||
private IDroProjectDroneService droProjectDroneService;
|
||||
|
||||
// 3. 维护已订阅的主题关系:key=完整主题(如wrj:8UUXN4P00A06NK),value=对应的PatternTopic
|
||||
private final Map<String, PatternTopic> subscribedTopics = new ConcurrentHashMap<>();
|
||||
private final Map<String, ChannelTopic> subscribedTopics = new ConcurrentHashMap<>();
|
||||
|
||||
// 定义锁对象
|
||||
private final Object lock = new Object();
|
||||
|
||||
// 注入 RedisConnectionFactory
|
||||
@Resource
|
||||
private RedisConnectionFactory redisConnectionFactory;
|
||||
|
||||
|
||||
/**
|
||||
* 项目启动后立即执行一次订阅(避免等待定时任务首次执行)
|
||||
*/
|
||||
@PostConstruct
|
||||
// @PostConstruct
|
||||
public void initSubscribe() {
|
||||
log.info("项目启动,初始化Redis订阅...");
|
||||
// 步骤1:从数据库获取最新的主题列表(原逻辑:getTopicsByKeyPrefix)
|
||||
@ -136,6 +140,7 @@ public class RedisSubscribeManager {
|
||||
@Scheduled(cron = "0/10 * * * * ?")
|
||||
// @JobExecutor(name = "ueWsConnect")
|
||||
public void ueWsConnect() {
|
||||
synchronized (lock) { // 确保同一时间只有一个线程修改
|
||||
try {
|
||||
int onlineCount = InitOnStartWebSocketServer.getOnlineCount();
|
||||
Object object = RedisUtils.getCacheObject("xmjdap:ws");
|
||||
@ -147,7 +152,7 @@ public class RedisSubscribeManager {
|
||||
if (object != null) {
|
||||
long oldTime = Long.parseLong(String.valueOf(object));
|
||||
long now = System.currentTimeMillis();
|
||||
if (now-oldTime > 300000) {
|
||||
if (now - oldTime > 300000) {
|
||||
RedisUtils.deleteObject("xmjdap:ws");
|
||||
cancelAllSubscribes();
|
||||
return;
|
||||
@ -185,11 +190,17 @@ public class RedisSubscribeManager {
|
||||
// 异常时不修改现有订阅,避免误删
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消过期的订阅(现有订阅不在最新列表中的主题)
|
||||
*/
|
||||
public void cancelExpiredSubscribes(Set<String> latestFullTopics) {
|
||||
synchronized (lock) { // 确保同一时间只有一个线程修改
|
||||
if (!redisMessageListenerContainer.isRunning()) {
|
||||
log.error("RedisMessageListenerContainer 未运行,无法取消订阅");
|
||||
return;
|
||||
}
|
||||
// 遍历现有订阅,找出需要删除的主题
|
||||
Set<String> expiredTopics = subscribedTopics.keySet().stream()
|
||||
.filter(topic -> !latestFullTopics.contains(topic))
|
||||
@ -202,7 +213,7 @@ public class RedisSubscribeManager {
|
||||
|
||||
// 取消每个过期主题的订阅
|
||||
for (String expiredTopic : expiredTopics) {
|
||||
PatternTopic topic = subscribedTopics.get(expiredTopic);
|
||||
ChannelTopic topic = subscribedTopics.get(expiredTopic);
|
||||
if (topic != null) {
|
||||
// 从容器中移除监听器(取消订阅)
|
||||
redisMessageListenerContainer.removeMessageListener(redisMessageListenerAdapter, topic);
|
||||
@ -211,11 +222,25 @@ public class RedisSubscribeManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 新增未订阅的主题(最新列表中有但现有订阅没有的主题)
|
||||
*/
|
||||
public void addNewSubscribes(Set<String> latestFullTopics) {
|
||||
synchronized (lock) { // 确保同一时间只有一个线程修改
|
||||
if (!redisMessageListenerContainer.isRunning()) {
|
||||
log.error("RedisMessageListenerContainer 未运行,无法添加订阅");
|
||||
return;
|
||||
}
|
||||
// // 测试 Redis 连接
|
||||
// try (RedisConnection conn = redisConnectionFactory.getConnection()) {
|
||||
// conn.ping(); // 发送 ping 命令,验证连接
|
||||
// log.info("Redis 连接正常,可执行命令");
|
||||
// } catch (Exception e) {
|
||||
// log.error("Redis 连接失败,无法订阅主题", e);
|
||||
// return; // 连接失败,终止订阅
|
||||
// }
|
||||
// 遍历最新主题,找出需要新增的主题
|
||||
Set<String> newTopics = latestFullTopics.stream()
|
||||
.filter(topic -> !subscribedTopics.containsKey(topic))
|
||||
@ -225,16 +250,21 @@ public class RedisSubscribeManager {
|
||||
log.info("无新增订阅主题,无需添加");
|
||||
return;
|
||||
}
|
||||
// Object delegate = redisMessageListenerAdapter.getDelegate();
|
||||
// log.info("适配器绑定的监听器实例:{}",
|
||||
// delegate.getClass().getName());
|
||||
|
||||
// 为每个新主题添加订阅
|
||||
for (String newTopic : newTopics) {
|
||||
PatternTopic topic = new PatternTopic(newTopic);
|
||||
ChannelTopic topic = new ChannelTopic(newTopic);
|
||||
// 向容器中添加监听器(新增订阅)
|
||||
// log.info("添加订阅前,容器状态:{}", redisMessageListenerContainer.isRunning() ? "运行中" : "已停止");
|
||||
redisMessageListenerContainer.addMessageListener(redisMessageListenerAdapter, topic);
|
||||
subscribedTopics.put(newTopic, topic);
|
||||
log.info("已新增订阅:{}", newTopic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消所有现有订阅(兜底方法)
|
||||
@ -244,7 +274,7 @@ public class RedisSubscribeManager {
|
||||
return;
|
||||
}
|
||||
// 遍历所有已订阅主题,取消订阅
|
||||
for (Map.Entry<String, PatternTopic> entry : subscribedTopics.entrySet()) {
|
||||
for (Map.Entry<String, ChannelTopic> entry : subscribedTopics.entrySet()) {
|
||||
redisMessageListenerContainer.removeMessageListener(redisMessageListenerAdapter, entry.getValue());
|
||||
log.info("已取消订阅:{}", entry.getKey());
|
||||
}
|
||||
|
||||
@ -0,0 +1,28 @@
|
||||
package org.dromara.bigscreen.manager;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 项目启动完成后执行 Redis 订阅初始化
|
||||
*/
|
||||
@Component
|
||||
public class RedisSubscribeStartupRunner implements ApplicationRunner {
|
||||
|
||||
// 注入 Redis 订阅管理器
|
||||
@Resource
|
||||
private RedisSubscribeManager redisSubscribeManager;
|
||||
|
||||
/**
|
||||
* 项目完全启动后执行(替代 @PostConstruct)
|
||||
*/
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
// 此时 Spring 容器已完全启动,所有组件就绪
|
||||
redisSubscribeManager.initSubscribe();
|
||||
}
|
||||
}
|
||||
@ -89,6 +89,7 @@ public interface ProjectBigScreenService {
|
||||
void setList(GpsEquipmentBo bo);
|
||||
|
||||
List<Map<String, Object>> getClientList(Long projectId);
|
||||
List<String> getUeClientList();
|
||||
|
||||
/**
|
||||
* 更新无人机缓存
|
||||
|
||||
@ -92,7 +92,7 @@ public class AsyncMessageHandlerServiceImpl implements IAsyncMessageHandlerServi
|
||||
new LambdaQueryWrapper<DroProjectDrone>()
|
||||
.eq(DroProjectDrone::getDroneSn, gateway)
|
||||
);
|
||||
log.info("【异步消息处理】osd3类型消息,gateway:{}", gateway);
|
||||
// log.info("【异步消息处理】osd3类型消息,gateway:{}", gateway);
|
||||
// 调用setWs方法(非osd4类型)
|
||||
setWs(messageContent, gateway, droProjectDrone, false);
|
||||
} else {
|
||||
@ -101,7 +101,7 @@ public class AsyncMessageHandlerServiceImpl implements IAsyncMessageHandlerServi
|
||||
new LambdaQueryWrapper<DroProjectDrone>()
|
||||
.eq(DroProjectDrone::getAirplaneSn, gateway)
|
||||
);
|
||||
log.info("【异步消息处理】osd4类型消息,gateway:{}", gateway);
|
||||
// log.info("【异步消息处理】osd4类型消息,gateway:{}", gateway);
|
||||
isOsd4 = true;
|
||||
// 调用setWs方法(osd4类型)
|
||||
setWs(messageContent, gateway, droProjectDrone, true);
|
||||
@ -109,7 +109,7 @@ public class AsyncMessageHandlerServiceImpl implements IAsyncMessageHandlerServi
|
||||
|
||||
// 6. 存储消息到Redis(使用Objects.requireNonNull避免空指针)
|
||||
stringRedisTemplate.opsForValue().set(key, Objects.requireNonNull(messageContent));
|
||||
log.info("【异步消息处理】Redis存储成功,key:{}", key);
|
||||
// log.info("【异步消息处理】Redis存储成功,key:{}", key);
|
||||
|
||||
} catch (Exception e) {
|
||||
// 捕获所有异常,避免线程终止
|
||||
@ -168,9 +168,9 @@ public class AsyncMessageHandlerServiceImpl implements IAsyncMessageHandlerServi
|
||||
|
||||
// 位置信息
|
||||
JSONObject position = new JSONObject();
|
||||
position.put("lat", JSONUtil.parseObj(message).getJSONObject("data").get("latitude").toString()); // 纬度
|
||||
position.put("lng", JSONUtil.parseObj(message).getJSONObject("data").get("longitude").toString()); // 经度
|
||||
position.put("alt", JSONUtil.parseObj(message).getJSONObject("data").get("height").toString()); // 海拔
|
||||
position.put("lat", JSONUtil.parseObj(message).getJSONObject("data").get("latitude")); // 纬度
|
||||
position.put("lng", JSONUtil.parseObj(message).getJSONObject("data").get("longitude")); // 经度
|
||||
position.put("alt", JSONUtil.parseObj(message).getJSONObject("data").get("height")); // 海拔
|
||||
|
||||
data.put("position", position);
|
||||
messageObj.put("data", data); // 设备唯一标识
|
||||
|
||||
@ -16,6 +16,7 @@ import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.common.utils.BigDecimalUtil;
|
||||
import org.dromara.contractor.domain.SubConstructionUser;
|
||||
import org.dromara.contractor.service.ISubConstructionUserService;
|
||||
import org.dromara.drone.domain.vo.DroProjectDroneVo;
|
||||
import org.dromara.drone.service.IDroProjectDroneService;
|
||||
import org.dromara.gps.domain.bo.GpsEquipmentBo;
|
||||
import org.dromara.gps.domain.vo.GpsEquipmentSonVo;
|
||||
@ -702,6 +703,111 @@ public class ProjectBigScreenServiceImpl implements ProjectBigScreenService {
|
||||
return maps;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getUeClientList() {
|
||||
// 获取当天的开始时间(00:00:00)
|
||||
LocalDateTime startOfDay = LocalDateTime.now().with(LocalTime.MIN);
|
||||
|
||||
// 获取当前时间
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
List<GpsEquipmentSonVo> voList = gpsEquipmentService.getUeClientList(startOfDay, now);
|
||||
List<GpsEquipmentSonVo> appList = gpsEquipmentService.getUeUserListByProjectId( startOfDay, now);
|
||||
// List<LocationVo> anqmList = deviceService.getUeUserListByProjectId( startOfDay, now);
|
||||
// List<OthYs7Device> othYs7DeviceList = othYs7DeviceService.lambdaQuery()
|
||||
// .list();
|
||||
List<DroProjectDroneVo> wrjKeys = droProjectDroneService.getUeTopicsByProjectId();
|
||||
List<String> maps = new ArrayList<>();
|
||||
if (voList != null && !voList.isEmpty()) {
|
||||
for (GpsEquipmentSonVo item : voList) {
|
||||
JSONObject messageObj = new JSONObject();
|
||||
messageObj.put("type", "location"); // 消息类型
|
||||
|
||||
JSONObject data = new JSONObject();
|
||||
data.put("id", item.getModelId());
|
||||
|
||||
// 位置信息
|
||||
JSONObject position = new JSONObject();
|
||||
position.put("lat", item.getLocLatitude()); // 纬度
|
||||
position.put("lng", item.getLocLongitude()); // 经度
|
||||
position.put("alt", item.getLocAltitude()); // 海拔
|
||||
|
||||
data.put("position", position);
|
||||
messageObj.put("data", data); // 设备唯一标识
|
||||
maps.add(messageObj.toString());
|
||||
}
|
||||
}
|
||||
if (appList != null && !appList.isEmpty()) {
|
||||
for (GpsEquipmentSonVo item : appList) {
|
||||
JSONObject messageObj = new JSONObject();
|
||||
messageObj.put("type", "location"); // 消息类型
|
||||
|
||||
JSONObject data = new JSONObject();
|
||||
data.put("id", item.getModelId());
|
||||
|
||||
// 位置信息
|
||||
JSONObject position = new JSONObject();
|
||||
position.put("lat", item.getLocLatitude()); // 纬度
|
||||
position.put("lng", item.getLocLongitude()); // 经度
|
||||
position.put("alt", item.getLocAltitude()); // 海拔
|
||||
|
||||
data.put("position", position);
|
||||
messageObj.put("data", data); // 设备唯一标识
|
||||
maps.add(messageObj.toString());
|
||||
}
|
||||
}
|
||||
if (wrjKeys != null && !wrjKeys.isEmpty()) {
|
||||
for (DroProjectDroneVo key : wrjKeys) {
|
||||
Object object = stringRedisTemplate.opsForValue().get("wrj:osd4:" + key.getAirplaneSn());
|
||||
Object object6 = stringRedisTemplate.opsForValue().get("wrj:osd2:" + key.getDroneSn());
|
||||
String status = "";
|
||||
if (object6 != null) {
|
||||
JSONObject object1 = JSONUtil.parseObj(object6);
|
||||
status = object1.getJSONObject("data").get("flighttask_step_code").toString();
|
||||
}
|
||||
if (object != null) {
|
||||
JSONObject object1 = JSONUtil.parseObj(object);
|
||||
JSONObject messageObj = new JSONObject();
|
||||
messageObj.put("type", "location"); // 消息类型
|
||||
|
||||
JSONObject data = new JSONObject();
|
||||
data.put("id", key.getAirplaneModleId());
|
||||
|
||||
// 位置信息
|
||||
JSONObject position = new JSONObject();
|
||||
position.put("lat", object1.getJSONObject("data").get("latitude")); // 纬度
|
||||
position.put("lng", object1.getJSONObject("data").get("longitude")); // 经度
|
||||
position.put("alt", object1.getJSONObject("data").get("height")); // 海拔
|
||||
|
||||
data.put("position", position);
|
||||
messageObj.put("data", data); // 设备唯一标识
|
||||
maps.add(messageObj.toString());
|
||||
} else {
|
||||
Object object2 = stringRedisTemplate.opsForValue().get("wrj:osd3:" + key);
|
||||
if (object2 != null) {
|
||||
JSONObject object3 = JSONUtil.parseObj(object2);
|
||||
|
||||
JSONObject messageObj = new JSONObject();
|
||||
messageObj.put("type", "location"); // 消息类型
|
||||
|
||||
JSONObject data = new JSONObject();
|
||||
data.put("id", key.getDroneModleId());
|
||||
|
||||
// 位置信息
|
||||
JSONObject position = new JSONObject();
|
||||
position.put("lat", object3.getJSONObject("data").get("latitude")); // 纬度
|
||||
position.put("lng", object3.getJSONObject("data").get("longitude")); // 经度
|
||||
position.put("alt", object3.getJSONObject("data").get("height")); // 海拔
|
||||
|
||||
data.put("position", position);
|
||||
messageObj.put("data", data); // 设备唯一标识
|
||||
maps.add(messageObj.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return maps;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWrjHc() {
|
||||
RedisUtils.setCacheObject("xmjdap:ws", System.currentTimeMillis());
|
||||
|
||||
@ -18,4 +18,7 @@ public interface DroProjectDroneMapper extends BaseMapperPlus<DroProjectDrone, D
|
||||
List<String> getTopicsByKeyPrefix();
|
||||
|
||||
List<String> getTopicsByProjectId(@Param("projectId") Long projectId);
|
||||
|
||||
List<DroProjectDroneVo> getUeTopicsByProjectId();
|
||||
|
||||
}
|
||||
|
||||
@ -83,4 +83,7 @@ public interface IDroProjectDroneService extends IService<DroProjectDrone> {
|
||||
List<String> getTopicsByKeyPrefix();
|
||||
|
||||
List<String> getTopicsByProjectId(Long projectId);
|
||||
|
||||
List<DroProjectDroneVo> getUeTopicsByProjectId();
|
||||
|
||||
}
|
||||
|
||||
@ -167,4 +167,9 @@ public class DroProjectDroneServiceImpl extends ServiceImpl<DroProjectDroneMappe
|
||||
public List<String> getTopicsByProjectId(Long projectId) {
|
||||
return baseMapper.getTopicsByProjectId(projectId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DroProjectDroneVo> getUeTopicsByProjectId() {
|
||||
return baseMapper.getUeTopicsByProjectId();
|
||||
}
|
||||
}
|
||||
|
||||
@ -149,6 +149,10 @@ public class GpsEquipmentSonVo implements Serializable {
|
||||
*/
|
||||
@ExcelProperty(value = "备注")
|
||||
private String remark;
|
||||
/**
|
||||
* 模型id
|
||||
*/
|
||||
private String modelId;
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -92,4 +92,43 @@ public interface GpsEquipmentSonMapper extends BaseMapperPlus<GpsEquipmentSon, G
|
||||
" AND create_time BETWEEN #{bo.startTime} AND #{bo.endTime}\n" +
|
||||
" GROUP BY DATE(create_time)")
|
||||
List<GpsStatusVo> getRlList(@Param("bo") GpsEquipmentSonBo bo);
|
||||
|
||||
@Select("WITH RankedData AS (\n" +
|
||||
" SELECT\n" +
|
||||
" *,\n" +
|
||||
" ROW_NUMBER() OVER (PARTITION BY client_id ORDER BY create_time DESC) AS rn\n" +
|
||||
" FROM\n" +
|
||||
" gps_equipment_son \n" +
|
||||
"WHERE \n" +
|
||||
"create_time BETWEEN #{startTime} AND #{endTime}\n" +
|
||||
")\n" +
|
||||
"SELECT\n" +
|
||||
" r.*,su.nick_name as userName,ge.model_id as modelId\n" +
|
||||
"FROM\n" +
|
||||
" RankedData r\n" +
|
||||
"LEFT JOIN sys_user su ON r.user_id=su.user_id\n" +
|
||||
"LEFT JOIN gps_equipment ge ON r.client_id = ge.client_id \n" +
|
||||
"WHERE\n" +
|
||||
" rn = 1;")
|
||||
List<GpsEquipmentSonVo> getUeClientList(@Param("startTime") LocalDateTime startOfDay, @Param("endTime") LocalDateTime now);
|
||||
|
||||
@Select("WITH RankedData AS (\n" +
|
||||
" SELECT\n" +
|
||||
" *,\n" +
|
||||
" ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY create_time DESC) AS rn\n" +
|
||||
" FROM\n" +
|
||||
" gps_equipment_son \n " +
|
||||
"WHERE \n" +
|
||||
"client_id IS NULL \n" +
|
||||
"AND create_time BETWEEN #{startTime} AND #{endTime} \n" +
|
||||
")\n" +
|
||||
"SELECT\n" +
|
||||
" r.*,su.nick_name as userName,ge.model_id as modelId\n" +
|
||||
"FROM\n" +
|
||||
" RankedData r " +
|
||||
"LEFT JOIN sys_user su ON r.user_id=su.user_id\n" +
|
||||
"LEFT JOIN gps_equipment ge ON r.user_id = ge.user_id \n" +
|
||||
"WHERE\n" +
|
||||
" rn = 1;")
|
||||
List<GpsEquipmentSonVo> getUeUserListByProjectId(@Param("startTime") LocalDateTime startOfDay, @Param("endTime") LocalDateTime now);
|
||||
}
|
||||
|
||||
@ -102,4 +102,8 @@ public interface IGpsEquipmentService extends IService<GpsEquipment>{
|
||||
void setData(String jsonData);
|
||||
|
||||
List<GpsEquipmentSonVo> getUserListByProjectId(Long projectId, LocalDateTime startOfDay, LocalDateTime now);
|
||||
|
||||
List<GpsEquipmentSonVo> getUeClientList(LocalDateTime startOfDay, LocalDateTime now);
|
||||
|
||||
List<GpsEquipmentSonVo> getUeUserListByProjectId(LocalDateTime startOfDay, LocalDateTime now);
|
||||
}
|
||||
|
||||
@ -81,4 +81,8 @@ public interface IGpsEquipmentSonService extends IService<GpsEquipmentSon>{
|
||||
List<GpsEquipmentSonVo> getUserListByProjectId(Long projectId, LocalDateTime startOfDay, LocalDateTime now);
|
||||
|
||||
List<GpsStatusVo> getRlList(GpsEquipmentSonBo bo);
|
||||
|
||||
List<GpsEquipmentSonVo> getUeClientList(LocalDateTime startOfDay, LocalDateTime now);
|
||||
|
||||
List<GpsEquipmentSonVo> getUeUserListByProjectId(LocalDateTime startOfDay, LocalDateTime now);
|
||||
}
|
||||
|
||||
@ -529,5 +529,15 @@ public class GpsEquipmentServiceImpl extends ServiceImpl<GpsEquipmentMapper, Gps
|
||||
return gpsEquipmentSonService.getUserListByProjectId(projectId,startOfDay,now);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GpsEquipmentSonVo> getUeClientList(LocalDateTime startOfDay, LocalDateTime now) {
|
||||
return gpsEquipmentSonService.getUeClientList(startOfDay,now);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GpsEquipmentSonVo> getUeUserListByProjectId(LocalDateTime startOfDay, LocalDateTime now) {
|
||||
return gpsEquipmentSonService.getUeUserListByProjectId(startOfDay,now);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -211,4 +211,14 @@ public class GpsEquipmentSonServiceImpl extends ServiceImpl<GpsEquipmentSonMappe
|
||||
}
|
||||
return baseMapper.getRlList(bo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GpsEquipmentSonVo> getUeClientList(LocalDateTime startOfDay, LocalDateTime now) {
|
||||
return baseMapper.getUeClientList(startOfDay,now);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GpsEquipmentSonVo> getUeUserListByProjectId(LocalDateTime startOfDay, LocalDateTime now) {
|
||||
return baseMapper.getUeUserListByProjectId(startOfDay,now);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,10 +2,14 @@ package org.dromara.websocket.websocket.service;// 路径:com.ruoyi.web.websoc
|
||||
import jakarta.websocket.*;
|
||||
import jakarta.websocket.server.ServerEndpoint;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.bigscreen.service.ProjectBigScreenService;
|
||||
import org.dromara.common.core.utils.SpringUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@ -20,6 +24,9 @@ public class InitOnStartWebSocketServer {
|
||||
// 2. 静态会话存储(线程安全,项目启动时即初始化)
|
||||
private static final Map<String, Session> ONLINE_SESSIONS = new ConcurrentHashMap<>();
|
||||
|
||||
/* @Autowired
|
||||
private ProjectBigScreenService projectBigScreenService;*/
|
||||
|
||||
// 3. 静态代码块:项目启动时执行(初始化资源、打印启动日志)
|
||||
static {
|
||||
// 此处可添加启动时的初始化逻辑(如加载配置、连接外部资源等)
|
||||
@ -35,6 +42,35 @@ public class InitOnStartWebSocketServer {
|
||||
ONLINE_SESSIONS.put(session.getId(), session);
|
||||
// RedisUtils.setCacheObject("xmjdap:ws",System.currentTimeMillis() );
|
||||
log.info("📌 客户端连接成功!会话ID:{},当前在线数:{}", session.getId(), ONLINE_SESSIONS.size());
|
||||
// 2. 异步获取并推送初始化数据(避免阻塞连接)
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
// 2.1 获取数据
|
||||
ProjectBigScreenService service = SpringUtils.getBean(ProjectBigScreenService.class);
|
||||
List<String> ueClientList = service.getUeClientList();
|
||||
if (ueClientList == null || ueClientList.isEmpty()) {
|
||||
session.getBasicRemote().sendText("初始化数据为空");
|
||||
log.warn("会话[{}]未获取到初始化数据", session.getId());
|
||||
return;
|
||||
}
|
||||
for (String ueClient : ueClientList) {
|
||||
session.getBasicRemote().sendText(ueClient);
|
||||
log.info("📤 已向会话[{}]推送初始化数据,长度:{}字节", session.getId(), ueClient.length());
|
||||
}
|
||||
// 2.3 推送数据
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("会话[{}]初始化数据处理失败", session.getId(), e);
|
||||
// 推送错误信息
|
||||
try {
|
||||
if (session.isOpen()) { // 检查会话是否仍打开
|
||||
session.getBasicRemote().sendText("初始化失败:" + e.getMessage());
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
log.error("会话[{}]推送错误信息失败", session.getId(), ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -19,4 +19,11 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||
WHERE project_id = #{projectId}
|
||||
GROUP BY drone_sn
|
||||
</select>
|
||||
<select id="getUeTopicsByProjectId" resultType="org.dromara.drone.domain.vo.DroProjectDroneVo">
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
`dro_project_drone`
|
||||
GROUP BY drone_sn
|
||||
</select>
|
||||
</mapper>
|
||||
|
||||
Reference in New Issue
Block a user