对接逆变器

This commit is contained in:
2025-10-30 11:05:38 +08:00
parent 71dba8bb2d
commit 92f6a16106
7 changed files with 678 additions and 0 deletions

View File

@ -0,0 +1,44 @@
package org.dromara.tcpfuwu.constant;
/**
* Modbus TCP协议常量
*/
public class ModbusConstant {
// Modbus功能码读取输入寄存器0x04
public static final byte FUNC_READ_INPUT_REG = 0x04;
// CRC16校验表Modbus RTU标准
public static final Integer[] CRC16_TABLE = {
0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241,
0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440,
0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40,
0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841,
0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40,
0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41,
0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641,
0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040,
0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240,
0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441,
0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41,
0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840,
0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41,
0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40,
0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640,
0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041,
0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240,
0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441,
0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41,
0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840,
0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41,
0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40,
0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640,
0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041,
0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241,
0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440,
0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40,
0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841,
0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40,
0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41,
0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641,
0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040
};
}

View File

@ -0,0 +1,16 @@
package org.dromara.tcpfuwu.domain;
import lombok.Data;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Data
public class DeviceCache {
private String snCode; // 设备SN码
private long createTime; // 连接创建时间(毫秒)
private long lastHeartbeatTime; // 最后心跳时间(毫秒)
private boolean isExpired; // 是否过期
// 变量值缓存key=变量名value=解析后的值(带倍率)
private Map<String, Object> variableValues = new ConcurrentHashMap<>();
}

View File

@ -0,0 +1,18 @@
package org.dromara.tcpfuwu.domain;
import lombok.Data;
@Data
public class ModbusVariable {
private Long id;
private String snCode; // 设备SN码
private Integer slaveId; // 从机地址
private Integer funcCode; // 功能码
private Integer startRegAddr; // 起始寄存器地址
private Integer regQuantity; // 寄存器数量
private String variableName; // 变量名
private String dataType; // 数据类型S16/U16/S32/U32/FLOAT
private Double multiplier; // 数据倍率
private String unit; // 单位
private Byte isEnabled; // 是否启用1=启用)
}

View File

@ -0,0 +1,340 @@
package org.dromara.tcpfuwu.handler;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.dromara.shebei.domain.dto.OpsSbLiebiaoDto;
import org.dromara.shebei.domain.vo.OpsSbMbBianliangVo;
import org.dromara.shebei.service.IOpsSbLiebiaoService;
import org.dromara.tcpfuwu.constant.ModbusConstant;
import org.dromara.tcpfuwu.domain.DeviceCache;
import org.dromara.tcpfuwu.domain.ModbusVariable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 设备连接处理类处理单设备的心跳和Modbus通信
*/
//@Component
public class DeviceHandler {
// 配置参数
private static final long HEARTBEAT_EXPIRE_MS = 30 * 1000;
private static final int HEARTBEAT_LEN = 20;
private static final int SN_CODE_LEN = 10;
private static final int MODBUS_REQUEST_CYCLE = 10; // 请求周期(秒)
// 依赖注入
// @Autowired
private final IOpsSbLiebiaoService sbLiebiaoService;
// 设备信息
private final Socket clientSocket;
private final String deviceAddr;
private final DeviceCache deviceCache;
private final Map<String, DeviceCache> globalCache;
// 状态控制
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private InputStream in;
private OutputStream out;
private ScheduledExecutorService modbusScheduler;
public DeviceHandler(
Socket clientSocket,
String deviceAddr,
DeviceCache deviceCache,
Map<String, DeviceCache> globalCache,
IOpsSbLiebiaoService sbLiebiaoService // 新增手动传递Spring服务
) {
this.clientSocket = clientSocket;
this.deviceAddr = deviceAddr;
this.deviceCache = deviceCache;
this.globalCache = globalCache;
this.sbLiebiaoService = sbLiebiaoService; // 赋值
}
/**
* 处理设备通信的主方法
*/
public void handle() {
try {
// 初始化IO流
in = clientSocket.getInputStream();
out = clientSocket.getOutputStream();
// 启动心跳接收线程
startHeartbeatReceiver();
// 启动Modbus定时请求线程
startModbusScheduler();
// 阻塞等待线程结束
synchronized (this) {
wait();
}
} catch (Exception e) {
System.err.printf("【设备处理异常】地址:%s原因%s%n", deviceAddr, e.getMessage());
} finally {
// 清理资源
stop();
}
}
/**
* 启动心跳接收线程
*/
private void startHeartbeatReceiver() {
new Thread(() -> {
try {
byte[] buffer = new byte[1024];
while (isRunning.get()) {
int len = in.read(buffer);
if (len <= 0) {
System.out.printf("【心跳接收失败】设备:%s连接断开%n", deviceAddr);
break;
}
byte[] data = Arrays.copyOf(buffer, len);
if (isHeartbeatData(data)) {
// 解析SN码
String snCode = new String(Arrays.copyOfRange(data, 0, SN_CODE_LEN)).trim();
// 更新缓存
deviceCache.setSnCode(snCode);
deviceCache.setLastHeartbeatTime(System.currentTimeMillis());
System.out.printf("【心跳更新】设备:%sSN%s%n", deviceAddr, snCode);
}
}
} catch (Exception e) {
System.err.printf("【心跳线程异常】设备:%s原因%s%n", deviceAddr, e.getMessage());
} finally {
// 心跳线程结束,标记设备离线
isRunning.set(false);
synchronized (DeviceHandler.this) {
DeviceHandler.this.notify();
}
}
}, "heartbeat-" + deviceAddr).start();
}
/**
* 启动Modbus定时请求调度器
*/
private void startModbusScheduler() {
modbusScheduler = Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "modbus-" + deviceAddr)
);
modbusScheduler.scheduleAtFixedRate(() -> {
if (!isRunning.get() || !clientSocket.isConnected() || clientSocket.isClosed()) {
stop();
return;
}
// 未获取SN码则等待
if (deviceCache.getSnCode() == null) {
System.out.printf("【等待心跳】设备:%s%n", deviceAddr);
return;
}
// 查询变量并处理
List<ModbusVariable> variables = queryVariables(deviceCache.getSnCode());
if (variables.isEmpty()) {
System.out.printf("【无变量配置】SN%s%n", deviceCache.getSnCode());
return;
}
// 遍历变量发送请求
for (ModbusVariable var : variables) {
try {
handleVariable(var);
} catch (Exception e) {
System.err.printf("【变量处理失败】SN%s变量%s原因%s%n",
deviceCache.getSnCode(), var.getVariableName(), e.getMessage());
}
}
}, 0, MODBUS_REQUEST_CYCLE, TimeUnit.SECONDS);
}
/**
* 处理单个变量的请求与解析
*/
private void handleVariable(ModbusVariable var) throws Exception {
// 生成请求帧
byte[] request = generateModbusFrame(var);
System.out.printf("【发送请求】SN%s变量%s%s%n",
deviceCache.getSnCode(), var.getVariableName(), bytesToHex(request));
// 发送请求
out.write(request);
out.flush();
// 接收响应超时3秒
byte[] response = receiveResponse(3000);
if (response == null) {
throw new Exception("响应超时");
}
// 验证响应
if (!validateResponse(response, var)) {
throw new Exception("响应验证失败");
}
// 解析数据
Object value = parseValue(response, var);
System.out.printf("【解析成功】SN%s变量%s%s %s%n",
deviceCache.getSnCode(), var.getVariableName(), value, var.getUnit());
// 更新缓存
deviceCache.getVariableValues().put(var.getVariableName(), value);
}
/**
* 从数据库查询变量列表
*/
private List<ModbusVariable> queryVariables(String snCode) {
ArrayList<ModbusVariable> modbusVariables = new ArrayList<>();
OpsSbLiebiaoDto opsSbLiebiaoDto = sbLiebiaoService.getLiebiaoBianliangList(snCode);
if (!opsSbLiebiaoDto.getSbMbBianliangVos().isEmpty()) {
for (OpsSbMbBianliangVo v : opsSbLiebiaoDto.getSbMbBianliangVos()) {
ModbusVariable modbusVariable = new ModbusVariable();
modbusVariable.setDataType(v.getShujvGeshi());
modbusVariable.setVariableName(v.getBlName());
modbusVariable.setUnit(v.getBlDanwei());
modbusVariable.setSnCode(snCode);
modbusVariable.setSlaveId(Math.toIntExact(opsSbLiebiaoDto.getSlaveId()));
modbusVariable.setFuncCode(Integer.parseInt(v.getJicunqiGnm()));
modbusVariable.setStartRegAddr(Integer.parseInt(v.getJicunqiAdd()));
switch (v.getShujvGeshi()) {
case "S16": modbusVariable.setRegQuantity(1); break;
case "U16": modbusVariable.setRegQuantity(1); break;
case "S32": modbusVariable.setRegQuantity(2); break;
case "U32": modbusVariable.setRegQuantity(2); break;
}
modbusVariables.add(modbusVariable);
}
}
return modbusVariables;
}
/**
* 停止所有资源
*/
public void stop() {
isRunning.set(false);
// 关闭Modbus调度器
if (modbusScheduler != null) {
modbusScheduler.shutdown();
}
// 关闭Socket和流
try {
if (in != null) in.close();
if (out != null) out.close();
if (clientSocket != null && !clientSocket.isClosed()) {
clientSocket.close();
}
} catch (Exception e) {
System.err.printf("【资源关闭异常】设备:%s原因%s%n", deviceAddr, e.getMessage());
}
// 从全局缓存移除
globalCache.remove(deviceAddr);
System.out.printf("【设备下线】地址:%s%n", deviceAddr);
}
// ------------------------------ 工具方法 ------------------------------
private boolean isHeartbeatData(byte[] data) {
if (data.length != HEARTBEAT_LEN) return false;
for (byte b : data) {
if (b < '0' || b > '9') return false;
}
return true;
}
private byte[] generateModbusFrame(ModbusVariable var) {
ByteBuffer buffer = ByteBuffer.allocate(6).order(ByteOrder.BIG_ENDIAN);
buffer.put(var.getSlaveId().byteValue())
.put(var.getFuncCode().byteValue())
.putShort(var.getStartRegAddr().shortValue())
.putShort(var.getRegQuantity().shortValue());
byte[] body = buffer.array();
byte[] crc = calculateCrc16(body);
byte[] frame = new byte[body.length + crc.length];
System.arraycopy(body, 0, frame, 0, body.length);
System.arraycopy(crc, 0, frame, body.length, crc.length);
return frame;
}
private byte[] receiveResponse(int timeoutMs) throws Exception {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeoutMs) {
if (in.available() > 0) {
byte[] buffer = new byte[1024];
int len = in.read(buffer);
return Arrays.copyOf(buffer, len);
}
Thread.sleep(10);
}
return null;
}
private boolean validateResponse(byte[] response, ModbusVariable var) {
if (response.length < 5) return false;
if (response[0] != var.getSlaveId() || response[1] != var.getFuncCode()) return false;
if (response[2] != var.getRegQuantity() * 2) return false;
byte[] body = Arrays.copyOf(response, response.length - 2);
byte[] receivedCrc = Arrays.copyOfRange(response, response.length - 2, response.length);
return Arrays.equals(receivedCrc, calculateCrc16(body));
}
private Object parseValue(byte[] response, ModbusVariable var) {
ByteBuffer buffer = ByteBuffer.wrap(response, 3, response[2]).order(ByteOrder.BIG_ENDIAN);
double rawValue;
switch (var.getDataType().toUpperCase()) {
case "S16": rawValue = buffer.getShort(); break;
case "U16": rawValue = buffer.getShort() & 0xFFFF; break;
case "S32": rawValue = buffer.getInt(); break;
case "U32": rawValue = buffer.getLong() & 0xFFFFFFFFL; break;
case "FLOAT": rawValue = buffer.getFloat(); break;
default: throw new IllegalArgumentException("不支持的数据类型:" + var.getDataType());
}
return rawValue * var.getMultiplier();
}
private byte[] calculateCrc16(byte[] data) {
int crc = 0xFFFF;
for (byte b : data) {
crc = (crc >> 8) ^ ModbusConstant.CRC16_TABLE[(crc ^ (b & 0xFF)) & 0xFF];
}
return new byte[]{(byte) (crc & 0xFF), (byte) ((crc >> 8) & 0xFF)};
}
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
}

View File

@ -0,0 +1,170 @@
package org.dromara.tcpfuwu.server;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.dromara.shebei.service.IOpsSbLiebiaoService;
import org.dromara.tcpfuwu.domain.DeviceCache;
import org.dromara.tcpfuwu.handler.DeviceHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;
/**
* 统一TCP服务器发送Modbus请求前必须验证心跳状态
*/
@Component
@Slf4j
public class UnifiedTcpServer {
@Value("${tcp.server.port:8888}")
private int tcpPort;
@Value("${tcp.server.heartbeat-timeout: 60000}")
private int heartbeatExpireMs;
// 线程池:处理设备连接
private ExecutorService clientExecutor;
// 服务器Socket
private ServerSocket serverSocket;
// 设备缓存
private final Map<String, DeviceCache> deviceCache = new ConcurrentHashMap<>();
// 缓存清理线程
private ScheduledExecutorService cacheCleaner;
// 服务运行状态
private volatile boolean isRunning = false;
@Autowired
private IOpsSbLiebiaoService sbLiebiaoService;
/**
* 初始化方法Spring容器启动后自动调用
*/
@PostConstruct
public void start() {
try {
// 初始化服务器Socket
serverSocket = new ServerSocket(tcpPort);
// 初始化线程池(处理设备连接)
clientExecutor = Executors.newCachedThreadPool(r -> {
Thread thread = new Thread(r);
thread.setName("tcp-client-handler-" + thread.getId());
thread.setDaemon(true); // 守护线程,随主线程退出
return thread;
});
// 初始化缓存清理线程
initCacheCleaner();
isRunning = true;
System.out.printf("【TCP服务启动成功】监听端口%d%n", tcpPort);
// 启动接受连接的线程
new Thread(this::acceptConnections, "tcp-acceptor").start();
} catch (Exception e) {
System.err.println("【TCP服务启动失败】" + e.getMessage());
e.printStackTrace();
}
}
/**
* 接受设备连接的循环
*/
private void acceptConnections() {
while (isRunning) {
try {
// 阻塞等待设备连接
Socket clientSocket = serverSocket.accept();
String deviceAddr = clientSocket.getInetAddress() + ":" + clientSocket.getPort();
System.out.printf("%n【设备上线】地址%s%n", deviceAddr);
// 初始化设备缓存
DeviceCache cache = new DeviceCache();
cache.setCreateTime(System.currentTimeMillis());
deviceCache.put(deviceAddr, cache);
// 提交设备处理任务
clientExecutor.submit(() ->
new DeviceHandler(clientSocket, deviceAddr, cache, deviceCache,sbLiebiaoService).handle()
);
} catch (Exception e) {
if (isRunning) { // 非关闭状态下的异常
System.err.println("【连接接受异常】" + e.getMessage());
}
}
}
}
/**
* 初始化缓存清理线程
*/
private void initCacheCleaner() {
cacheCleaner = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("cache-cleaner");
thread.setDaemon(true);
return thread;
});
// 每20秒清理一次过期设备
cacheCleaner.scheduleAtFixedRate(() -> {
long currentTime = System.currentTimeMillis();
for (Map.Entry<String, DeviceCache> e : deviceCache.entrySet()) {
String deviceAddr = e.getKey();
DeviceCache cache = e.getValue();
// 判断是否过期:未收到过心跳 或 超过过期时间
boolean isExpired = cache.getLastHeartbeatTime() == 0
? (currentTime - cache.getCreateTime() > heartbeatExpireMs)
: (currentTime - cache.getLastHeartbeatTime() > heartbeatExpireMs);
if (isExpired && !cache.isExpired()) {
cache.setExpired(true);
deviceCache.remove(deviceAddr);
System.out.printf("【设备过期】设备地址:%s已清理缓存%n", deviceAddr);
}
}
}, 0, 20, TimeUnit.SECONDS);
}
/**
* 销毁方法Spring容器关闭前自动调用
*/
@PreDestroy
public void stop() {
isRunning = false;
System.out.println("【TCP服务开始关闭】");
// 关闭服务器Socket
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
} catch (Exception e) {
System.err.println("【关闭ServerSocket异常】" + e.getMessage());
}
// 关闭线程池
if (clientExecutor != null) {
clientExecutor.shutdown();
}
// 关闭缓存清理线程
if (cacheCleaner != null) {
cacheCleaner.shutdown();
}
// 清理设备缓存
deviceCache.clear();
System.out.println("【TCP服务已关闭】");
}
}

View File

@ -0,0 +1,45 @@
package org.dromara.tcpfuwu.starter;
import lombok.extern.slf4j.Slf4j;
import org.dromara.tcpfuwu.server.UnifiedTcpServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* Spring启动时自动初始化TCP服务器关闭时释放资源
*/
@Component
@Slf4j
public class TcpServersStarter implements CommandLineRunner {
private final UnifiedTcpServer unifiedTcpServer;
// 构造注入两个TCP服务器Bean
public TcpServersStarter( UnifiedTcpServer unifiedTcpServer) {
this.unifiedTcpServer = unifiedTcpServer;
}
/**
* Spring Boot启动后执行CommandLineRunner接口方法
*/
@Override
public void run(String... args) throws Exception {
log.info("【TCP服务器启动器】开始初始化Modbus和逆变器心跳服务器...");
// 启动两个服务器独立线程不阻塞Spring主线程
unifiedTcpServer.start();
log.info("【TCP服务器启动器】所有TCP服务器初始化完成");
}
/**
* Spring容器销毁前执行释放资源
*/
@PreDestroy
public void stopTcpServers() {
log.info("【TCP服务器启动器】开始关闭所有TCP服务器...");
unifiedTcpServer.stop();
log.info("【TCP服务器启动器】所有TCP服务器已关闭");
}
}

View File

@ -0,0 +1,45 @@
package org.dromara.tcpfuwu.util;
import org.springframework.stereotype.Component;
/**
* 字节数组处理工具类
*/
@Component
public class ByteUtils {
/**
* 字节数组转16进制字符串空格分隔
*/
public String bytesToHex(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return "";
}
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
/**
* 字节数组转ASCII字符串不可打印字符用[0xXX]表示)
*/
public String bytesToAsciiString(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return "";
}
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
// 可打印ASCII范围32空格~126~
if (b >= 32 && b <= 126) {
sb.append((char) b);
} else {
sb.append("[0x").append(String.format("%02X", b)).append("]");
}
}
return sb.toString();
}
}