12-01-修改

This commit is contained in:
2025-12-01 19:28:21 +08:00
parent 40b51e5e9d
commit be89a6fe9e

View File

@ -0,0 +1,149 @@
package org.dromara.websocket.websocket.service;// 路径com.ruoyi.web.websocket.InitOnStartWebSocketServer
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.contractor.domain.SubConstructionUser;
import org.dromara.contractor.service.impl.SubConstructionUserServiceImpl;
import org.dromara.gps.domain.GpsEquipmentSon;
import org.dromara.gps.service.impl.GpsEquipmentSonServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* 项目启动即自动启动的 WebSocket 服务端
* 端点路径:/websocket/init-on-start可自定义
*/
@Slf4j
@ServerEndpoint("/websocket/rydw") // 定义 WebSocket 端点路径
@Component
public class RydwWebSocketServer {
@Autowired
private SubConstructionUserServiceImpl subConstructionUserService;
@Autowired
private GpsEquipmentSonServiceImpl gpsEquipmentSonService;
// 2. 静态会话存储(线程安全,项目启动时即初始化)
private static final Map<String, Session> ONLINE_SESSIONS = new ConcurrentHashMap<>();
// 3. 静态代码块:项目启动时执行(初始化资源、打印启动日志)
static {
// 此处可添加启动时的初始化逻辑(如加载配置、连接外部资源等)
log.info("✅ WebSocket 服务端已随项目启动初始化!端点路径:/websocket/rydw");
}
/**
* 客户端连接时触发(无需手动启动,有客户端连接时自动调用)
*/
@OnOpen
public void onOpen(Session session) {
// 存储新会话
ONLINE_SESSIONS.put(session.getId(), session);
log.info("📌 客户端连接成功会话ID{},当前在线数:{}", session.getId(), ONLINE_SESSIONS.size());
// 2. 异步获取并推送初始化数据(避免阻塞连接)
CompletableFuture.runAsync(() -> {
try {
//连接成功过后 获取当前项目下所有成员最新坐标
Map<String, List<String>> params = session.getRequestParameterMap();
List<String> subscriptionIds = params.get("projectId");
if (subscriptionIds != null && !subscriptionIds.isEmpty()){
//拿到所有人员
LambdaQueryWrapper<SubConstructionUser> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(SubConstructionUser::getProjectId, subscriptionIds.getFirst());
List<SubConstructionUser> list = subConstructionUserService.list(queryWrapper);
//拿到所有人员最新坐标
if (list != null && !list.isEmpty()){
LambdaQueryWrapper<GpsEquipmentSon> lqw = new LambdaQueryWrapper<>();
List<GpsEquipmentSon> gpsList = new ArrayList<>();
for (SubConstructionUser constructionUser : list) {
lqw.clear();
lqw.eq(GpsEquipmentSon::getUserId, constructionUser.getSysUserId());
lqw.orderByDesc(GpsEquipmentSon::getCreateTime);
lqw.last("limit 1");
GpsEquipmentSon one = gpsEquipmentSonService.getOne(lqw);
if (one != null){
gpsList.add(one);
}
}
if (!gpsList.isEmpty()){
WebSocketUtils.sendMessage(Long.valueOf(session.getId()), gpsList.toString());
}
}
}
WebSocketUtils.sendMessage(Long.valueOf(session.getId()), "初始化数据为空");
} catch (Exception e) {
log.error("会话[{}]初始化数据处理失败", session.getId(), e);
}
});
}
/**
* 接收客户端消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("📥 收到会话[{}]消息:{}", session.getId(), message);
// 可选:回复客户端(示例)
try {
session.getBasicRemote().sendText("服务端已收到消息:" + message);
} catch (IOException e) {
log.error("📤 回复会话[{}]失败:{}", session.getId(), e.getMessage());
}
}
/**
* 客户端断开连接
*/
@OnClose
public void onClose(Session session, CloseReason reason) {
ONLINE_SESSIONS.remove(session.getId());
log.info("🔌 客户端断开连接会话ID{},原因:{},当前在线数:{}",
session.getId(), reason.getReasonPhrase(), ONLINE_SESSIONS.size());
}
/**
* 连接异常
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("⚠️ 会话[{}]异常:{}", session.getId(), error.getMessage(), error);
}
// ------------------------------ 工具方法(可选,供其他服务调用) ------------------------------
/**
* 向所有在线客户端发送消息(项目启动后,其他服务可直接调用)
*/
public static void sendToAll(String message) {
if (ONLINE_SESSIONS.isEmpty()) {
log.warn("⚠️ 无在线客户端,无需发送消息");
return;
}
ONLINE_SESSIONS.values().forEach(session -> {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("📤 向会话[{}]发送消息失败:{}", session.getId(), e.getMessage());
}
}
});
}
/**
* 获取当前在线数(供外部查询)
*/
public static int getOnlineCount() {
return ONLINE_SESSIONS.size();
}
}