大屏websocket初始化

This commit is contained in:
2025-12-15 16:23:18 +08:00
parent 1263b4812d
commit 834601daa8

View File

@ -0,0 +1,229 @@
package org.dromara.websocket.websocket.service;
import cn.hutool.json.JSONUtil;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.gps.domain.bo.GpsEquipmentSonBo;
import org.dromara.gps.domain.vo.GpsEquipmentSonVo;
import org.dromara.gps.service.IGpsEquipmentSonService;
import org.dromara.websocket.websocket.domain.vo.VehicleVo;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* 大屏 WebSocket 服务端(支持订阅消息)
* 端点路径:/websocket/bigScreen
*/
@Slf4j
@ServerEndpoint("/websocket/bigScreen") // 客户端连接时需携带订阅ID参数ws://xxx/websocket/vehicle?subscriptionId=xxx
@Component
public class BigScreenWebSocketServer {
// 1. 存储所有在线会话sessionId -> Session
private static final Map<String, Session> ONLINE_SESSIONS = new ConcurrentHashMap<>();
// 2. 核心订阅ID与会话的映射subscriptionId -> Session
private static final Map<String, Session> SUBSCRIPTION_SESSIONS = new ConcurrentHashMap<>();
// 3. 反向映射会话ID与订阅ID的映射用于断开连接时清理订阅关系
private static final Map<String, String> SESSION_TO_SUBSCRIPTION = new ConcurrentHashMap<>();
// 当前会话对应的订阅ID每个连接实例的专属变量
private String currentSubscriptionId;
static {
log.info("✅ 大屏 WebSocket 服务端已随项目启动初始化!端点路径:/websocket/bigScreen");
}
/**
* 客户端连接时触发解析订阅ID并建立映射关系
*/
@OnOpen
public void onOpen(Session session) {
// 从连接URL的查询参数中获取订阅ID客户端连接格式ws://xxx/websocket/bigScreen?subscriptionId=123
Map<String, List<String>> params = session.getRequestParameterMap();
List<String> subscriptionIds = params.get("subscriptionId");
if (subscriptionIds != null && !subscriptionIds.isEmpty()) {
this.currentSubscriptionId = subscriptionIds.get(0); // 取第一个订阅ID
// 建立映射关系
SUBSCRIPTION_SESSIONS.put(currentSubscriptionId, session);
SESSION_TO_SUBSCRIPTION.put(session.getId(), currentSubscriptionId);
log.info("📌 客户端订阅成功订阅ID{}会话ID{},当前订阅数:{}",
currentSubscriptionId, session.getId(), SUBSCRIPTION_SESSIONS.size());
} else {
log.warn("📌 客户端连接未携带订阅ID会话ID{}", session.getId());
}
// 存储会话到在线列表
ONLINE_SESSIONS.put(session.getId(), session);
log.info("📌 客户端连接成功会话ID{},当前在线数:{}", session.getId(), ONLINE_SESSIONS.size());
// 异步推送初始化数据(原有逻辑保留)
CompletableFuture.runAsync(() -> {
try {
String[] split = currentSubscriptionId.split("-");
//todo 填充不同类型大屏获取基础数据的方法判断
IGpsEquipmentSonService service = SpringUtils.getBean(IGpsEquipmentSonService.class);
GpsEquipmentSonBo bo = new GpsEquipmentSonBo();
bo.setUserId(Long.parseLong(split[0]));
bo.setTripId(Long.parseLong(split[1]));
List<GpsEquipmentSonVo> list = service.getNewVehicleList(bo);
if (list == null || list.isEmpty()) {
session.getBasicRemote().sendText("初始化数据为空");
log.warn("会话[{}]未获取到初始化数据", session.getId());
return;
}
List<VehicleVo> vehicleVos = new ArrayList<>();
for (GpsEquipmentSonVo ueClient : list) {
VehicleVo vo = new VehicleVo();
vo.setLocLatitude(ueClient.getLocLatitude());
vo.setLocLongitude(ueClient.getLocLongitude());
vehicleVos.add(vo);
}
session.getBasicRemote().sendText(JSONUtil.toJsonStr(vehicleVos));
log.info("📤 已向会话[{}]推送初始化数据,长度:{}字节", session.getId(), vehicleVos.size());
} catch (Exception e) {
log.error("会话[{}]初始化数据处理失败", session.getId(), e);
try {
if (session.isOpen()) {
session.getBasicRemote().sendText("初始化失败:" + e.getMessage());
}
} catch (IOException ex) {
log.error("会话[{}]推送错误信息失败", session.getId(), ex);
}
}
});
}
/**
* 接收客户端消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("📥 收到会话[{}]订阅ID{})消息:{}", session.getId(), currentSubscriptionId, message);
// 可选:回复客户端
//todo 填充不同类型大屏获取数据的方法判断
try {
session.getBasicRemote().sendText("服务端已收到消息:" + message);
} catch (IOException e) {
log.error("📤 回复会话[{}]失败:{}", session.getId(), e.getMessage());
}
}
/**
* 客户端断开连接(清理订阅关系)
*/
@OnClose
public void onClose(Session session, CloseReason reason) {
// 1. 移除在线会话
ONLINE_SESSIONS.remove(session.getId());
// 2. 清理订阅关系
String subscriptionId = SESSION_TO_SUBSCRIPTION.get(session.getId());
if (subscriptionId != null) {
SUBSCRIPTION_SESSIONS.remove(subscriptionId);
SESSION_TO_SUBSCRIPTION.remove(session.getId());
log.info("🔌 客户端订阅关系已清除订阅ID{}会话ID{}", subscriptionId, session.getId());
}
log.info("🔌 客户端断开连接会话ID{},原因:{},当前在线数:{},当前订阅数:{}",
session.getId(), reason.getReasonPhrase(),
ONLINE_SESSIONS.size(), SUBSCRIPTION_SESSIONS.size());
}
/**
* 连接异常
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("⚠️ 会话[{}]订阅ID{})异常:{}", session.getId(), currentSubscriptionId, error.getMessage(), error);
}
// ------------------------------ 订阅消息发送工具方法(供外部调用) ------------------------------
/**
* 向指定订阅ID的客户端发送消息
* @param subscriptionId 订阅ID
* @param message 消息内容
* @return 是否发送成功
*/
public static boolean sendToSubscription(String subscriptionId, String message) {
if (subscriptionId == null || message == null) {
log.warn("⚠️ 订阅ID或消息为空发送失败");
return false;
}
// 从订阅映射中获取目标会话
Session session = SUBSCRIPTION_SESSIONS.get(subscriptionId);
if (session == null || !session.isOpen()) {
log.warn("⚠️ 订阅ID[{}]对应的客户端未连接或已断开", subscriptionId);
return false;
}
// 发送消息
try {
session.getBasicRemote().sendText(message);
log.info("📤 已向订阅ID[{}]发送消息:{}", subscriptionId, message);
return true;
} catch (IOException e) {
log.error("📤 向订阅ID[{}]发送消息失败", subscriptionId, e);
return false;
}
}
/**
* 向所有订阅客户端广播消息
* @param message 消息内容
*/
public static void broadcastToAllSubscriptions(String message) {
if (SUBSCRIPTION_SESSIONS.isEmpty()) {
log.warn("⚠️ 无订阅客户端,无需广播消息");
return;
}
SUBSCRIPTION_SESSIONS.forEach((subscriptionId, session) -> {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
log.info("📤 已向订阅ID[{}]广播消息", subscriptionId);
} catch (IOException e) {
log.error("📤 向订阅ID[{}]广播消息失败", subscriptionId, e);
}
}
});
}
/**
* 获取当前订阅数
*/
public static int getSubscriptionCount() {
return SUBSCRIPTION_SESSIONS.size();
}
// 原有工具方法保留
public static void sendToAll(String message) {
if (ONLINE_SESSIONS.isEmpty()) {
log.warn("⚠️ 无在线客户端,无需发送消息");
return;
}
ONLINE_SESSIONS.values().forEach(session -> {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("📤 向会话[{}]发送消息失败:{}", session.getId(), e.getMessage());
}
}
});
}
public static int getOnlineCount() {
return ONLINE_SESSIONS.size();
}
}