diff --git a/ruoyi-modules/xny-ops/pom.xml b/ruoyi-modules/xny-ops/pom.xml index 6d19103..e8ae5e9 100644 --- a/ruoyi-modules/xny-ops/pom.xml +++ b/ruoyi-modules/xny-ops/pom.xml @@ -98,6 +98,11 @@ + + org.redisson + redisson-spring-boot-starter + + diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/controller/OpsSbDataController.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/controller/OpsSbDataController.java new file mode 100644 index 0000000..3d42239 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/controller/OpsSbDataController.java @@ -0,0 +1,106 @@ +package org.dromara.shebei.controller; + +import java.util.List; + +import lombok.RequiredArgsConstructor; +import jakarta.servlet.http.HttpServletResponse; +import jakarta.validation.constraints.*; +import cn.dev33.satoken.annotation.SaCheckPermission; +import org.springframework.web.bind.annotation.*; +import org.springframework.validation.annotation.Validated; +import org.dromara.common.idempotent.annotation.RepeatSubmit; +import org.dromara.common.log.annotation.Log; +import org.dromara.common.web.core.BaseController; +import org.dromara.common.mybatis.core.page.PageQuery; +import org.dromara.common.core.domain.R; +import org.dromara.common.core.validate.AddGroup; +import org.dromara.common.core.validate.EditGroup; +import org.dromara.common.log.enums.BusinessType; +import org.dromara.common.excel.utils.ExcelUtil; +import org.dromara.shebei.domain.vo.OpsSbDataVo; +import org.dromara.shebei.domain.bo.OpsSbDataBo; +import org.dromara.shebei.service.IOpsSbDataService; +import org.dromara.common.mybatis.core.page.TableDataInfo; + +/** + * 运维-设备管理-设备数据 + * 前端访问路由地址为:/shebei/sbData + * + * @author LionLi + * @date 2025-11-13 + */ +@Validated +@RequiredArgsConstructor +@RestController +@RequestMapping("/sbData") +public class OpsSbDataController extends BaseController { + + private final IOpsSbDataService opsSbDataService; + + /** + * 查询运维-设备管理-设备数据列表 + */ + @SaCheckPermission("shebei:sbData:list") + @GetMapping("/list") + public TableDataInfo list(OpsSbDataBo bo, PageQuery pageQuery) { + return opsSbDataService.queryPageList(bo, pageQuery); + } + + /** + * 导出运维-设备管理-设备数据列表 + */ + @SaCheckPermission("shebei:sbData:export") + @Log(title = "运维-设备管理-设备数据", businessType = BusinessType.EXPORT) + @PostMapping("/export") + public void export(OpsSbDataBo bo, HttpServletResponse response) { + List list = opsSbDataService.queryList(bo); + ExcelUtil.exportExcel(list, "运维-设备管理-设备数据", OpsSbDataVo.class, response); + } + + /** + * 获取运维-设备管理-设备数据详细信息 + * + * @param id 主键 + */ + @SaCheckPermission("shebei:sbData:query") + @GetMapping("/{id}") + public R getInfo(@NotNull(message = "主键不能为空") + @PathVariable("id") Long id) { + return R.ok(opsSbDataService.queryById(id)); + } + + /** + * 新增运维-设备管理-设备数据 + */ + @SaCheckPermission("shebei:sbData:add") + @Log(title = "运维-设备管理-设备数据", businessType = BusinessType.INSERT) + @RepeatSubmit() + @PostMapping() + public R add(@Validated(AddGroup.class) @RequestBody OpsSbDataBo bo) { + return toAjax(opsSbDataService.insertByBo(bo)); + } + + /** + * 修改运维-设备管理-设备数据 + */ + @SaCheckPermission("shebei:sbData:edit") + @Log(title = "运维-设备管理-设备数据", businessType = BusinessType.UPDATE) + @RepeatSubmit() + @PutMapping() + public R edit(@Validated(EditGroup.class) @RequestBody OpsSbDataBo bo) { + return toAjax(opsSbDataService.updateByBo(bo)); + } + + /** + * 删除运维-设备管理-设备数据 + * + * @param ids 主键串 + */ + @SaCheckPermission("shebei:sbData:remove") + @Log(title = "运维-设备管理-设备数据", businessType = BusinessType.DELETE) + @DeleteMapping("/{ids}") + public R remove(@NotEmpty(message = "主键不能为空") + @PathVariable("ids") Long[] ids) { + return toAjax(opsSbDataService.deleteWithValidByIds(List.of(ids), true)); + } +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/OpsSbData.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/OpsSbData.java new file mode 100644 index 0000000..e82e8f6 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/OpsSbData.java @@ -0,0 +1,62 @@ +package org.dromara.shebei.domain; + +import org.dromara.common.mybatis.core.domain.BaseEntity; +import com.baomidou.mybatisplus.annotation.*; +import lombok.Data; +import lombok.EqualsAndHashCode; +import java.math.BigDecimal; + +import java.io.Serial; + +/** + * 运维-设备管理-设备数据对象 ops_sb_data + * + * @author LionLi + * @date 2025-11-13 + */ +@Data +@EqualsAndHashCode(callSuper = true) +@TableName("ops_sb_data") +public class OpsSbData extends BaseEntity { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * id + */ + @TableId(value = "id") + private Long id; + + /** + * 项目id + */ + private Long projectId; + + /** + * 设备sn码 + */ + private String sn; + + /** + * 功能码 + */ + private String functionCode; + + /** + * 变量名 + */ + private String variableName; + + /** + * 数量 + */ + private Double count; + + /** + * 单位 + */ + private String unit; + + +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/bo/OpsSbDataBo.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/bo/OpsSbDataBo.java new file mode 100644 index 0000000..20a2578 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/bo/OpsSbDataBo.java @@ -0,0 +1,66 @@ +package org.dromara.shebei.domain.bo; + +import org.dromara.shebei.domain.OpsSbData; +import org.dromara.common.mybatis.core.domain.BaseEntity; +import org.dromara.common.core.validate.AddGroup; +import org.dromara.common.core.validate.EditGroup; +import io.github.linpeilie.annotations.AutoMapper; +import lombok.Data; +import lombok.EqualsAndHashCode; +import jakarta.validation.constraints.*; +import java.math.BigDecimal; + +/** + * 运维-设备管理-设备数据业务对象 ops_sb_data + * + * @author LionLi + * @date 2025-11-13 + */ +@Data +@EqualsAndHashCode(callSuper = true) +@AutoMapper(target = OpsSbData.class, reverseConvertGenerate = false) +public class OpsSbDataBo extends BaseEntity { + + /** + * id + */ + private Long id; + + /** + * 项目id + */ + @NotNull(message = "项目id不能为空", groups = { AddGroup.class, EditGroup.class }) + private Long projectId; + + /** + * 设备sn码 + */ + @NotBlank(message = "设备sn码不能为空", groups = { AddGroup.class, EditGroup.class }) + private String sn; + + /** + * 功能码 + */ + @NotBlank(message = "功能码不能为空", groups = { AddGroup.class, EditGroup.class }) + private String functionCode; + + /** + * 变量名 + */ + @NotBlank(message = "变量名不能为空", groups = { AddGroup.class, EditGroup.class }) + private String variableName; + + /** + * 数量 + */ + @NotNull(message = "数量不能为空", groups = { AddGroup.class, EditGroup.class }) + private Double count; + + /** + * 单位 + */ + @NotBlank(message = "单位不能为空", groups = { AddGroup.class, EditGroup.class }) + private String unit; + + +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/vo/OpsSbDataVo.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/vo/OpsSbDataVo.java new file mode 100644 index 0000000..647d9c2 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/domain/vo/OpsSbDataVo.java @@ -0,0 +1,75 @@ +package org.dromara.shebei.domain.vo; + +import java.math.BigDecimal; +import org.dromara.shebei.domain.OpsSbData; +import cn.idev.excel.annotation.ExcelIgnoreUnannotated; +import cn.idev.excel.annotation.ExcelProperty; +import org.dromara.common.excel.annotation.ExcelDictFormat; +import org.dromara.common.excel.convert.ExcelDictConvert; +import io.github.linpeilie.annotations.AutoMapper; +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; +import java.util.Date; + + + +/** + * 运维-设备管理-设备数据视图对象 ops_sb_data + * + * @author LionLi + * @date 2025-11-13 + */ +@Data +@ExcelIgnoreUnannotated +@AutoMapper(target = OpsSbData.class) +public class OpsSbDataVo implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * id + */ + @ExcelProperty(value = "id") + private Long id; + + /** + * 项目id + */ + @ExcelProperty(value = "项目id") + private Long projectId; + + /** + * 设备sn码 + */ + @ExcelProperty(value = "设备sn码") + private String sn; + + /** + * 功能码 + */ + @ExcelProperty(value = "功能码") + private String functionCode; + + /** + * 变量名 + */ + @ExcelProperty(value = "变量名") + private String variableName; + + /** + * 数量 + */ + @ExcelProperty(value = "数量") + private Double count; + + /** + * 单位 + */ + @ExcelProperty(value = "单位") + private String unit; + + +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/mapper/OpsSbDataMapper.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/mapper/OpsSbDataMapper.java new file mode 100644 index 0000000..ee4dadf --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/mapper/OpsSbDataMapper.java @@ -0,0 +1,15 @@ +package org.dromara.shebei.mapper; + +import org.dromara.shebei.domain.OpsSbData; +import org.dromara.shebei.domain.vo.OpsSbDataVo; +import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; + +/** + * 运维-设备管理-设备数据Mapper接口 + * + * @author LionLi + * @date 2025-11-13 + */ +public interface OpsSbDataMapper extends BaseMapperPlus { + +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/service/IOpsSbDataService.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/service/IOpsSbDataService.java new file mode 100644 index 0000000..41a2405 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/service/IOpsSbDataService.java @@ -0,0 +1,70 @@ +package org.dromara.shebei.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import org.dromara.shebei.domain.OpsSbData; +import org.dromara.shebei.domain.vo.OpsSbDataVo; +import org.dromara.shebei.domain.bo.OpsSbDataBo; +import org.dromara.common.mybatis.core.page.TableDataInfo; +import org.dromara.common.mybatis.core.page.PageQuery; + +import java.util.Collection; +import java.util.List; + +/** + * 运维-设备管理-设备数据Service接口 + * + * @author LionLi + * @date 2025-11-13 + */ +public interface IOpsSbDataService extends IService { + + /** + * 查询运维-设备管理-设备数据 + * + * @param id 主键 + * @return 运维-设备管理-设备数据 + */ + OpsSbDataVo queryById(Long id); + + /** + * 分页查询运维-设备管理-设备数据列表 + * + * @param bo 查询条件 + * @param pageQuery 分页参数 + * @return 运维-设备管理-设备数据分页列表 + */ + TableDataInfo queryPageList(OpsSbDataBo bo, PageQuery pageQuery); + + /** + * 查询符合条件的运维-设备管理-设备数据列表 + * + * @param bo 查询条件 + * @return 运维-设备管理-设备数据列表 + */ + List queryList(OpsSbDataBo bo); + + /** + * 新增运维-设备管理-设备数据 + * + * @param bo 运维-设备管理-设备数据 + * @return 是否新增成功 + */ + Boolean insertByBo(OpsSbDataBo bo); + + /** + * 修改运维-设备管理-设备数据 + * + * @param bo 运维-设备管理-设备数据 + * @return 是否修改成功 + */ + Boolean updateByBo(OpsSbDataBo bo); + + /** + * 校验并批量删除运维-设备管理-设备数据信息 + * + * @param ids 待删除的主键集合 + * @param isValid 是否进行有效性校验 + * @return 是否删除成功 + */ + Boolean deleteWithValidByIds(Collection ids, Boolean isValid); +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/service/impl/OpsSbDataServiceImpl.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/service/impl/OpsSbDataServiceImpl.java new file mode 100644 index 0000000..44e22ee --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/shebei/service/impl/OpsSbDataServiceImpl.java @@ -0,0 +1,140 @@ +package org.dromara.shebei.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.dromara.common.core.utils.MapstructUtils; +import org.dromara.common.core.utils.StringUtils; +import org.dromara.common.mybatis.core.page.TableDataInfo; +import org.dromara.common.mybatis.core.page.PageQuery; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.shebei.domain.OpsSbLiebiao; +import org.dromara.shebei.mapper.OpsSbLiebiaoMapper; +import org.springframework.stereotype.Service; +import org.dromara.shebei.domain.bo.OpsSbDataBo; +import org.dromara.shebei.domain.vo.OpsSbDataVo; +import org.dromara.shebei.domain.OpsSbData; +import org.dromara.shebei.mapper.OpsSbDataMapper; +import org.dromara.shebei.service.IOpsSbDataService; + +import java.util.List; +import java.util.Map; +import java.util.Collection; + +/** + * 运维-设备管理-设备数据Service业务层处理 + * + * @author LionLi + * @date 2025-11-13 + */ +@Slf4j +@RequiredArgsConstructor +@Service +public class OpsSbDataServiceImpl extends ServiceImpl implements IOpsSbDataService { + + private final OpsSbDataMapper baseMapper; + + /** + * 查询运维-设备管理-设备数据 + * + * @param id 主键 + * @return 运维-设备管理-设备数据 + */ + @Override + public OpsSbDataVo queryById(Long id){ + return baseMapper.selectVoById(id); + } + + /** + * 分页查询运维-设备管理-设备数据列表 + * + * @param bo 查询条件 + * @param pageQuery 分页参数 + * @return 运维-设备管理-设备数据分页列表 + */ + @Override + public TableDataInfo queryPageList(OpsSbDataBo bo, PageQuery pageQuery) { + LambdaQueryWrapper lqw = buildQueryWrapper(bo); + Page result = baseMapper.selectVoPage(pageQuery.build(), lqw); + return TableDataInfo.build(result); + } + + /** + * 查询符合条件的运维-设备管理-设备数据列表 + * + * @param bo 查询条件 + * @return 运维-设备管理-设备数据列表 + */ + @Override + public List queryList(OpsSbDataBo bo) { + LambdaQueryWrapper lqw = buildQueryWrapper(bo); + return baseMapper.selectVoList(lqw); + } + + private LambdaQueryWrapper buildQueryWrapper(OpsSbDataBo bo) { + Map params = bo.getParams(); + LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); + lqw.orderByAsc(OpsSbData::getId); + lqw.eq(bo.getProjectId() != null, OpsSbData::getProjectId, bo.getProjectId()); + lqw.eq(StringUtils.isNotBlank(bo.getSn()), OpsSbData::getSn, bo.getSn()); + lqw.eq(StringUtils.isNotBlank(bo.getFunctionCode()), OpsSbData::getFunctionCode, bo.getFunctionCode()); + lqw.like(StringUtils.isNotBlank(bo.getVariableName()), OpsSbData::getVariableName, bo.getVariableName()); + lqw.eq(bo.getCount() != null, OpsSbData::getCount, bo.getCount()); + lqw.eq(StringUtils.isNotBlank(bo.getUnit()), OpsSbData::getUnit, bo.getUnit()); + return lqw; + } + + /** + * 新增运维-设备管理-设备数据 + * + * @param bo 运维-设备管理-设备数据 + * @return 是否新增成功 + */ + @Override + public Boolean insertByBo(OpsSbDataBo bo) { + OpsSbData add = MapstructUtils.convert(bo, OpsSbData.class); + validEntityBeforeSave(add); + boolean flag = baseMapper.insert(add) > 0; + if (flag) { + bo.setId(add.getId()); + } + return flag; + } + + /** + * 修改运维-设备管理-设备数据 + * + * @param bo 运维-设备管理-设备数据 + * @return 是否修改成功 + */ + @Override + public Boolean updateByBo(OpsSbDataBo bo) { + OpsSbData update = MapstructUtils.convert(bo, OpsSbData.class); + validEntityBeforeSave(update); + return baseMapper.updateById(update) > 0; + } + + /** + * 保存前的数据校验 + */ + private void validEntityBeforeSave(OpsSbData entity){ + //TODO 做一些数据校验,如唯一约束 + } + + /** + * 校验并批量删除运维-设备管理-设备数据信息 + * + * @param ids 待删除的主键集合 + * @param isValid 是否进行有效性校验 + * @return 是否删除成功 + */ + @Override + public Boolean deleteWithValidByIds(Collection ids, Boolean isValid) { + if(isValid){ + //TODO 做一些业务上的校验,判断是否需要校验 + } + return baseMapper.deleteByIds(ids) > 0; + } +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java index eed0502..b337dc2 100644 --- a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/constant/ModbusConstant.java @@ -41,4 +41,6 @@ public class ModbusConstant { 0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 }; + public static final int SOCKET_TIMEOUT = 30000; + public static final int PARAM_REFRESH_INTERVAL = 10; } diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java index d7c4a08..3caf8f7 100644 --- a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/DeviceCache.java @@ -1,3 +1,4 @@ +// DeviceCache.java(设备缓存类) package org.dromara.tcpfuwu.domain; import lombok.Data; @@ -7,10 +8,9 @@ import java.util.concurrent.ConcurrentHashMap; @Data public class DeviceCache { - private String snCode; // 设备SN码 - private long createTime; // 连接创建时间(毫秒) - private long lastHeartbeatTime; // 最后心跳时间(毫秒) - private boolean isExpired; // 是否过期 - // 变量值缓存:key=变量名,value=解析后的值(带倍率) - private Map variableValues = new ConcurrentHashMap<>(); + private String deviceAddr; // 设备地址(IP:PORT) + private String snCode; // 设备SN码 + private long lastHeartbeatTime; // 最后心跳时间 + private Map variableValues = new ConcurrentHashMap<>(); // 变量值缓存 + } diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusRequestContext.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusRequestContext.java new file mode 100644 index 0000000..74147da --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusRequestContext.java @@ -0,0 +1,27 @@ +package org.dromara.tcpfuwu.domain; + +import lombok.Data; + +import java.util.UUID; + +/** + * 记录发送的请求信息,用于匹配响应 + */ +@Data +public class ModbusRequestContext { + private String requestId; // 唯一标识(如UUID) + private String sn; // 设备SN + private String variable; // 变量名称(如“有功功率”“总发电量”) + private String unit; // 单位 + private Long id; + private int slaveAddr; // 从站地址 + private int funcCode; // 功能码 + private int registerCount; // 读取寄存器数量 + private int startRegister; // 起始寄存器地址(十进制,用于精准匹配) + private long sendTime; // 发送时间(用于超时判断) + + // 构造时自动生成唯一ID + public ModbusRequestContext() { + this.requestId = UUID.randomUUID().toString().replace("-", ""); + } +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java index ff85f43..ce78315 100644 --- a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/domain/ModbusVariable.java @@ -6,10 +6,10 @@ import lombok.Data; public class ModbusVariable { private Long id; private String snCode; // 设备SN码 - private Integer slaveId; // 从机地址 - private Integer funcCode; // 功能码 - private Integer startRegAddr; // 起始寄存器地址 - private Integer regQuantity; // 寄存器数量 + private int slaveId; // 从机地址 + private int funcCode; // 功能码 + private int startRegAddr; // 起始寄存器地址 + private int regQuantity; // 寄存器数量 private String variableName; // 变量名 private String dataType; // 数据类型(S16/U16/S32/U32/FLOAT) private Double multiplier; // 数据倍率 diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java index 6325c8b..185c461 100644 --- a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/handler/DeviceHandler.java @@ -1,430 +1,858 @@ package org.dromara.tcpfuwu.handler; -import org.apache.ibatis.session.SqlSession; -import org.apache.ibatis.session.SqlSessionFactory; +import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.shebei.domain.OpsSbData; import org.dromara.shebei.domain.dto.OpsSbLiebiaoDto; import org.dromara.shebei.domain.vo.OpsSbMbBianliangVo; +import org.dromara.shebei.service.IOpsSbDataService; import org.dromara.shebei.service.IOpsSbLiebiaoService; import org.dromara.tcpfuwu.constant.ModbusConstant; import org.dromara.tcpfuwu.domain.DeviceCache; +import org.dromara.tcpfuwu.domain.ModbusRequestContext; import org.dromara.tcpfuwu.domain.ModbusVariable; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.dromara.tcpfuwu.util.ThreadPoolUtil; +import org.springframework.scheduling.annotation.Scheduled; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.math.BigDecimal; import java.net.Socket; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** - * 设备连接处理类:处理单设备的心跳和Modbus通信 + * 设备处理类:共用单线程池+单线程读流分逻辑(心跳/Modbus) */ -//@Component +@Slf4j public class DeviceHandler { // 配置参数 - private static final long HEARTBEAT_EXPIRE_MS = 30 * 1000; - private static final int HEARTBEAT_LEN = 20; - private static final int SN_CODE_LEN = 20; - private static final int MODBUS_REQUEST_CYCLE = 10; // 请求周期(秒) + private static final long HEARTBEAT_EXPIRE_MS = 120 * 1000; // 心跳超时60秒 + private static final int HEARTBEAT_LEN = 20; // 心跳包固定20字节(纯数字) + private static final int MODBUS_REQUEST_CYCLE = 20; // Modbus请求周期20秒 + private static final int MODBUS_RECEIVE_TIMEOUT = 30000; // Socket读超时30秒 - // 依赖注入 -// @Autowired + // 依赖服务 private final IOpsSbLiebiaoService sbLiebiaoService; + private final IOpsSbDataService sbDataService; - // 设备信息 + // 设备基础信息 private final Socket clientSocket; private final String deviceAddr; private final DeviceCache deviceCache; private final Map globalCache; - // 状态控制 + // 状态控制(原子变量确保线程安全) private final AtomicBoolean isRunning = new AtomicBoolean(true); + private final AtomicBoolean isReadThreadStarted = new AtomicBoolean(false); // 读流线程启动标记 + + // IO流 private InputStream in; private OutputStream out; - private ScheduledExecutorService modbusScheduler; + // 任务句柄(用于停止定时任务) + private ScheduledFuture modbusSendFuture; // Modbus定时发送任务 + private Future readThreadFuture; // 单线程读流任务 + + // 定义线程安全的请求队列(存储未匹配响应的请求) + private final Queue pendingRequests = new ConcurrentLinkedQueue<>(); + // 定义队列锁(与pendingRequests绑定) + private final Object queueLock = new Object(); + + // 构造函数 public DeviceHandler( Socket clientSocket, String deviceAddr, DeviceCache deviceCache, Map globalCache, - IOpsSbLiebiaoService sbLiebiaoService // 新增:手动传递Spring服务 + IOpsSbLiebiaoService sbLiebiaoService, + IOpsSbDataService sbDataService ) { this.clientSocket = clientSocket; this.deviceAddr = deviceAddr; this.deviceCache = deviceCache; this.globalCache = globalCache; - this.sbLiebiaoService = sbLiebiaoService; // 赋值 + this.sbLiebiaoService = sbLiebiaoService; + this.sbDataService = sbDataService; + + // 配置Socket读超时(避免读流永久阻塞) + try { + this.clientSocket.setSoTimeout(MODBUS_RECEIVE_TIMEOUT); + } catch (IOException e) { + System.err.printf("【Socket配置异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + this.isRunning.set(false); + } } /** - * 处理设备通信的主方法 + * 主处理逻辑:初始化资源+启动单线程读流+Modbus定时发送 */ public void handle() { try { - // 初始化IO流 + // 1. 初始化IO流(必须在启动读流线程前) in = clientSocket.getInputStream(); out = clientSocket.getOutputStream(); + System.out.printf("【IO流初始化成功】设备:%s%n", deviceAddr); - // 启动心跳接收线程 - startHeartbeatReceiver(); + // 2. 启动【单线程读流】(全局线程池,唯一读流线程,避免多线程冲突) +// startSingleReadThread(); + startMultiThreadProcess(); - // 启动Modbus定时请求线程 - startModbusScheduler(); + // 3. 启动【Modbus定时发送任务】(提交到全局线程池) + startModbusSendTask(); - // 阻塞等待线程结束 + // 4. 阻塞等待设备离线(避免主线程提前退出) synchronized (this) { - wait(); + while (isRunning.get()) { + wait(HEARTBEAT_EXPIRE_MS); + // 心跳超时检查:超过60秒无心跳则强制离线 + if (System.currentTimeMillis() - deviceCache.getLastHeartbeatTime() > HEARTBEAT_EXPIRE_MS) { + System.err.printf("【心跳超时】设备:%s,强制离线%n", deviceAddr); + isRunning.set(false); + } + } } } catch (Exception e) { - System.err.printf("【设备处理异常】地址:%s,原因:%s%n", deviceAddr, e.getMessage()); + if (isRunning.get()) { + System.err.printf("【设备主逻辑异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + } } finally { - // 清理资源 - stop(); + // 5. 全量资源清理 + stopAll(); } } + // ------------------------------ 核心:单线程读流+心跳/Modbus分逻辑处理 ------------------------------ +// /** +// * 启动单线程读流:唯一线程读取输入流,先判断是否为心跳,再分逻辑处理 +// */ +// private void startSingleReadThread() { +// if (!isRunning.get() || isReadThreadStarted.get()) return; +// +// // 提交读流任务到全局线程池(ScheduledExecutorService支持普通任务) +// readThreadFuture = ThreadPoolUtil.DEVICE_CONN_POOL.submit(() -> { +// isReadThreadStarted.set(true); +// byte[] readBuffer = new byte[1024]; // 读流缓冲区(足够容纳心跳/Modbus响应) +// +// while (isRunning.get()) { +// try { +// // 阻塞读流:Socket已设超时,超时抛SocketTimeoutException +// int readLen = in.read(readBuffer); +// if (readLen <= 0) { +// System.err.printf("【读流失败】设备:%s,连接断开(readLen=%d)%n", deviceAddr, readLen); +// break; +// } +// +// // 截取有效数据(排除缓冲区冗余) +// byte[] receivedData = Arrays.copyOf(readBuffer, readLen); +// System.out.printf("【收到数据】设备:%s,长度:%d字节,内容:%s%n", +// deviceAddr, readLen, bytesToHex(receivedData)); +// +// // 核心:先判断是否为心跳,再分逻辑处理 +// if (isHeartbeatData(receivedData)) { +// handleHeartbeat(receivedData); // 处理心跳 +// } else { +// handleModbusResponse(receivedData); // 处理Modbus响应 +// } +// +// } catch (SocketTimeoutException e) { +// // 读流超时:忽略(继续循环监听,避免线程退出) +// continue; +// } catch (Exception e) { +// if (isRunning.get()) { +// System.err.printf("【读流线程异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); +// } +// break; +// } +// } +// +// // 读流线程退出:标记状态+唤醒主等待线程 +// isReadThreadStarted.set(false); +// isRunning.set(false); +// synchronized (DeviceHandler.this) { +// DeviceHandler.this.notify(); +// } +// System.out.printf("【读流线程退出】设备:%s%n", deviceAddr); +// }); +// } + // 1. 定义处理用的线程池(负责并行处理心跳和Modbus响应) +// 注意:线程池应全局共享或随DeviceHandler生命周期管理 + private final ExecutorService processThreadPool = new ThreadPoolExecutor( + 2, // 核心线程数(根据处理逻辑复杂度调整) + 4, // 最大线程数 + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(100), // 任务队列 + new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "device-process-" + counter.incrementAndGet()); + } + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 任务满时,由提交线程(读流线程)执行,避免任务丢失 + ); + /** - * 启动心跳接收线程 + * 启动单线程读流(保持读取唯一性)+ 多线程处理(并行处理数据) */ - private void startHeartbeatReceiver() { - new Thread(() -> { - try { - byte[] buffer = new byte[1024]; - while (isRunning.get()) { - int len = in.read(buffer); - if (len <= 0) { - System.out.printf("【心跳接收失败】设备:%s,连接断开%n", deviceAddr); + private void startMultiThreadProcess() { + if (!isRunning.get() || isReadThreadStarted.get()) return; + + // 读流线程:唯一线程负责读取数据(保持单线程,避免流读取混乱) + readThreadFuture = ThreadPoolUtil.DEVICE_CONN_POOL.submit(() -> { + isReadThreadStarted.set(true); + byte[] readBuffer = new byte[1024]; + + while (isRunning.get()) { + try { + // 1. 单线程阻塞读流(核心:流读取必须串行) + int readLen = in.read(readBuffer); + if (readLen <= 0) { + System.err.printf("【读流失败】设备:%s,连接断开(readLen=%d)%n", deviceAddr, readLen); break; } - byte[] data = Arrays.copyOf(buffer, len); - if (isHeartbeatData(data)) { - // 解析SN码 - String snCode = new String(Arrays.copyOfRange(data, 0, data.length)).trim(); - // 更新缓存 - deviceCache.setSnCode(snCode); - deviceCache.setLastHeartbeatTime(System.currentTimeMillis()); - System.out.printf("【心跳更新】设备:%s,SN:%s%n", deviceAddr, snCode); - } - } - } catch (Exception e) { - System.err.printf("【心跳线程异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); - } finally { - // 心跳线程结束,标记设备离线 - isRunning.set(false); - synchronized (DeviceHandler.this) { - DeviceHandler.this.notify(); - } - } - }, "heartbeat-" + deviceAddr).start(); - } + // 2. 截取有效数据(复制一份,避免缓冲区被覆盖) + byte[] receivedData = Arrays.copyOf(readBuffer, readLen); + System.out.printf("【收到数据】设备:%s,长度:%d字节,内容:%s%n", + deviceAddr, readLen, bytesToHex(receivedData)); - /** - * 启动Modbus定时请求调度器 - */ - private void startModbusScheduler() { - modbusScheduler = Executors.newSingleThreadScheduledExecutor( - r -> new Thread(r, "modbus-" + deviceAddr) - ); + // 3. 分发数据到处理线程池(多线程并行处理) + dispatchToProcess(receivedData); - modbusScheduler.scheduleAtFixedRate(() -> { - if (!isRunning.get() || !clientSocket.isConnected() || clientSocket.isClosed()) { - stop(); - return; - } - - // 未获取SN码则等待 - if (deviceCache.getSnCode() == null) { - System.out.printf("【等待心跳】设备:%s%n", deviceAddr); - return; - } - - // 查询变量并处理 - List variables = queryVariables(deviceCache.getSnCode()); - if (variables.isEmpty()) { - System.out.printf("【无变量配置】SN:%s%n", deviceCache.getSnCode()); - return; - } - - // 遍历变量发送请求 - for (ModbusVariable var : variables) { - try { - handleVariable(var); + } catch (SocketTimeoutException e) { + // 读超时:继续循环监听 + continue; } catch (Exception e) { - System.err.printf("【变量处理失败】SN:%s,变量:%s,原因:%s%n", - deviceCache.getSnCode(), var.getVariableName(), e.getMessage()); + if (isRunning.get()) { + System.err.printf("【读流线程异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + } + break; } } - }, 0, MODBUS_REQUEST_CYCLE, TimeUnit.SECONDS); + // 4. 资源清理:读流线程退出时,关闭处理线程池 + cleanup(); + System.out.printf("【读流线程退出】设备:%s%n", deviceAddr); + }); } /** - * 处理单个变量的请求与解析 + * 分发数据到处理线程池(并行处理) */ -// private void handleVariable(ModbusVariable var) throws Exception { -// // 生成请求帧 -// byte[] request = generateModbusFrame(var); -// System.out.printf("【发送请求】SN:%s,变量:%s,帧:%s%n", -// deviceCache.getSnCode(), var.getVariableName(), bytesToHex(request)); -// -// // 发送请求 -// out.write(request); -// out.flush(); -// -// // 接收响应(超时3秒) -// byte[] response = receiveResponse(10000); -// if (response == null) { -// throw new Exception("响应超时"); -// } -// -// // 验证响应 -// if (!validateResponse(response, var)) { -// throw new Exception("响应验证失败"); -// } -// -// // 解析数据 -// Object value = parseValue(response, var); -// System.out.printf("【解析成功】SN:%s,变量:%s,值:%s %s%n", -// deviceCache.getSnCode(), var.getVariableName(), value, var.getUnit()); -// -// // 更新缓存 -// deviceCache.getVariableValues().put(var.getVariableName(), value); -// } - private void handleVariable(ModbusVariable var) throws Exception { - // 1. 校验设备是否在线(避免无效通信) - if (!isDeviceOnline()) { - throw new Exception("设备离线,无法通信"); - } - - // 2. 生成请求帧并打印详细信息 - byte[] request = generateModbusFrame(var); - String requestHex = bytesToHex(request); - System.out.printf("【发送请求】SN:%s,变量:%s,帧:%s,长度:%d字节%n", - deviceCache.getSnCode(), var.getVariableName(), requestHex, request.length); - - // 3. 发送请求(确保输出流正确) - try { - out.write(request); - out.flush(); - System.out.println("【发送成功】请求已提交到串口"); - } catch (IOException e) { - throw new Exception("发送请求失败:" + e.getMessage(), e); - } - - // 4. 接收响应(增加重试机制,最多3次) - byte[] response = null; - int retryCount = 0; - while (retryCount < 3 && response == null) { - try { - response = receiveResponse(10000); // 每次超时10秒 - if (response == null) { - retryCount++; - System.out.printf("【响应超时】第%d次重试...%n", retryCount); - // 重试前短暂延迟,避免设备繁忙 - Thread.sleep(500); + private void dispatchToProcess(byte[] data) { + // 判断数据类型,提交对应的处理任务 + if (isHeartbeatData(data)) { + // 提交心跳处理任务 + processThreadPool.submit(() -> { + try { + handleHeartbeat(data); + } catch (Exception e) { + System.err.printf("【心跳处理异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); } - } catch (IOException e) { - throw new Exception("接收响应失败:" + e.getMessage(), e); - } - } - if (response == null) { - throw new Exception("多次重试后仍无响应,可能设备离线或参数错误"); - } - - // 5. 打印响应原始数据(关键调试信息) - String responseHex = bytesToHex(response); - System.out.printf("【收到响应】长度:%d字节,帧:%s%n", response.length, responseHex); - - // 6. 验证响应 - if (!validateResponse(response, var)) { - throw new Exception("响应验证失败,原始帧:" + responseHex); - } - - // 7. 解析数据(后续步骤不变) - Object value = parseValue(response, var); - System.out.printf("【解析成功】SN:%s,变量:%s,值:%s %s%n", - deviceCache.getSnCode(), var.getVariableName(), value, var.getUnit()); - - // 8. 更新缓存 - deviceCache.getVariableValues().put(var.getVariableName(), value); - } - - // 辅助方法:判断设备是否在线(如通过ping或前次通信时间) - private boolean isDeviceOnline() { - // 实现逻辑:如检查最近一次成功通信时间是否在有效期内 - return true; - } - - /** - * 从数据库查询变量列表 - */ - private List queryVariables(String snCode) { - ArrayList modbusVariables = new ArrayList<>(); - OpsSbLiebiaoDto opsSbLiebiaoDto = sbLiebiaoService.getLiebiaoBianliangList(snCode); - if (!opsSbLiebiaoDto.getSbMbBianliangVos().isEmpty()) { - for (OpsSbMbBianliangVo v : opsSbLiebiaoDto.getSbMbBianliangVos()) { - ModbusVariable modbusVariable = new ModbusVariable(); - modbusVariable.setDataType(v.getShujvGeshi()); - modbusVariable.setVariableName(v.getBlName()); - modbusVariable.setUnit(v.getBlDanwei()); - modbusVariable.setSnCode(snCode); - modbusVariable.setSlaveId(opsSbLiebiaoDto.getSlaveId().intValue()); - String pureHex = v.getJicunqiGnm().substring(2); - modbusVariable.setFuncCode(Integer.parseInt(pureHex,16)); - modbusVariable.setStartRegAddr(Integer.parseInt(v.getJicunqiAdd())); - switch (v.getShujvGeshi()) { - case "S16": modbusVariable.setRegQuantity(1); break; - case "U16": modbusVariable.setRegQuantity(1); break; - case "S32": modbusVariable.setRegQuantity(2); break; - case "U32": modbusVariable.setRegQuantity(2); break; + }); + } else { + // 提交Modbus响应处理任务 + processThreadPool.submit(() -> { + try { + handleModbusResponse(data); + } catch (Exception e) { + System.err.printf("【Modbus处理异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); } - modbusVariables.add(modbusVariable); - } + }); } - return modbusVariables; } /** - * 停止所有资源 + * 资源清理:关闭处理线程池,标记状态 */ - public void stop() { + private void cleanup() { + isReadThreadStarted.set(false); isRunning.set(false); - - // 关闭Modbus调度器 - if (modbusScheduler != null) { - modbusScheduler.shutdown(); - } - - // 关闭Socket和流 + // 优雅关闭处理线程池(等待已提交任务完成,不再接收新任务) + processThreadPool.shutdown(); try { - if (in != null) in.close(); - if (out != null) out.close(); - if (clientSocket != null && !clientSocket.isClosed()) { - clientSocket.close(); + if (!processThreadPool.awaitTermination(5, TimeUnit.SECONDS)) { + processThreadPool.shutdownNow(); // 超时强制关闭 } - } catch (Exception e) { - System.err.printf("【资源关闭异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + } catch (InterruptedException e) { + processThreadPool.shutdownNow(); + } + // 唤醒等待线程 + synchronized (DeviceHandler.this) { + DeviceHandler.this.notify(); } - - // 从全局缓存移除 - globalCache.remove(deviceAddr); - System.out.printf("【设备下线】地址:%s%n", deviceAddr); } - // ------------------------------ 工具方法 ------------------------------ + + /** + * 判断是否为心跳包(20字节纯数字字符串) + */ private boolean isHeartbeatData(byte[] data) { if (data.length != HEARTBEAT_LEN) return false; + // 校验是否为纯数字(ASCII码范围:'0'-'9') for (byte b : data) { if (b < '0' || b > '9') return false; } return true; } + /** + * 处理心跳数据:更新设备缓存(SN+最后心跳时间) + */ + private void handleHeartbeat(byte[] heartbeatData) { + try { + String snCode = new String(heartbeatData, "UTF-8").trim(); + deviceCache.setSnCode(snCode); + deviceCache.setLastHeartbeatTime(System.currentTimeMillis()); + System.out.printf("【心跳处理成功】设备:%s,SN:%s,最后心跳时间:%d%n", + deviceAddr, snCode, deviceCache.getLastHeartbeatTime()); + } catch (Exception e) { + System.err.printf("【心跳解析失败】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + } + } + + /** + * 处理Modbus响应:提交到全局线程池异步解析(避免阻塞读流线程) + */ + private void handleModbusResponse(byte[] responseData) { +// // 响应处理异步化:读流线程只负责转发,不阻塞 +// ThreadPoolUtil.RESPONSE_HANDLING.submit(() -> { +// if (!isRunning.get()) return; +// +// // 1. 先校验响应是否合法(最小长度+CRC初步校验) +// if (!isValidModbusResponse(responseData)) { +// System.err.printf("【无效Modbus响应】设备:%s,内容:%s%n", deviceAddr, bytesToHex(responseData)); +// return; +// } +// +// // 2. 获取设备SN(未获取则忽略响应) +// String snCode = deviceCache.getSnCode(); +// if (snCode == null || snCode.isEmpty()) { +// System.err.printf("【Modbus响应忽略】设备:%s,未获取SN码%n", deviceAddr); +// return; +// } +// +// // 3. 查询设备变量,匹配响应对应的变量 +// List variables = queryModbusVariables(snCode); +// if (variables.isEmpty()) { +// System.err.printf("【Modbus无变量】设备:%s,SN:%s%n", deviceAddr, snCode); +// return; +// } +// +// // 3. 匹配响应对应的变量(获取起始寄存器地址等信息) +// ModbusVariable matchedVar = matchResponseToVariable(responseData, variables); +// if (matchedVar == null) { +// System.err.printf("【无匹配变量】响应:%s%n", bytesToHex(responseData)); +// return; +// } +// +// // 5. 从请求队列中精准匹配请求(加锁+多条件匹配) +// ModbusRequestContext matchedRequest; +// synchronized (queueLock) { // 保证并发安全 +// matchedRequest = findMatchedRequest(responseData, matchedVar); +// } +// if (matchedRequest == null) { +// System.err.printf("【无匹配请求】响应:%s%n", bytesToHex(responseData)); +// return; +// } +// +// // 6. 解析并更新数据 +// try { +// Object parsedValue = parseModbusValue(responseData, matchedVar); +// deviceCache.getVariableValues().put(matchedRequest.getVariable(), parsedValue); +// System.out.printf("【解析成功】SN:%s,变量:%s,值:%s %s(请求ID:%s)%n", +// snCode, matchedRequest.getVariable(), parsedValue, matchedRequest.getUnit(), matchedRequest.getRequestId()); +// } catch (Exception e) { +// System.err.printf("【解析失败】变量:%s,原因:%s%n", matchedRequest.getVariable(), e.getMessage()); +// } +// }); + + // 同步处理:直接在当前线程执行,不再异步提交 + if (!isRunning.get()) return; + + // 1. 先校验响应是否合法(最小长度+CRC初步校验) + if (!isValidModbusResponse(responseData)) { + System.err.printf("【无效Modbus响应】设备:%s,内容:%s%n", deviceAddr, bytesToHex(responseData)); + return; + } + + // 2. 获取设备SN(未获取则忽略响应) + String snCode = deviceCache.getSnCode(); + if (snCode == null || snCode.isEmpty()) { + System.err.printf("【Modbus响应忽略】设备:%s,未获取SN码%n", deviceAddr); + return; + } + + // 3. 查询设备变量,匹配响应对应的变量 + List variables = queryModbusVariables(snCode); + if (variables.isEmpty()) { + System.err.printf("【Modbus无变量】设备:%s,SN:%s%n", deviceAddr, snCode); + return; + } + + // 4. 匹配响应对应的变量(获取起始寄存器地址等信息) + ModbusVariable matchedVar = matchResponseToVariable(responseData, variables); + if (matchedVar == null) { + System.err.printf("【无匹配变量】响应:%s%n", bytesToHex(responseData)); + return; + } + + // 5. 从请求队列中精准匹配请求(加锁+多条件匹配) + ModbusRequestContext matchedRequest; + synchronized (queueLock) { // 保证并发安全(同步处理仍需锁保护队列操作) + matchedRequest = findMatchedRequest(responseData, matchedVar); + } + if (matchedRequest == null) { + System.err.printf("【无匹配请求】响应:%s%n", bytesToHex(responseData)); + return; + } + + // 6. 解析并更新数据 + try { + Object parsedValue = parseModbusValue(responseData, matchedVar); + deviceCache.getVariableValues().put(matchedRequest.getVariable(), parsedValue); + OpsSbData opsSbData = new OpsSbData(); + opsSbData.setSn(snCode); + opsSbData.setUnit(matchedRequest.getUnit()); + opsSbData.setVariableName(matchedRequest.getVariable()); + opsSbData.setCount((Double) parsedValue); + opsSbData.setFunctionCode(String.valueOf(matchedRequest.getStartRegister())); + // 核心修改:将保存操作提交到线程池异步执行 + ThreadPoolUtil.DATA_SAVE_POOL.execute(() -> { + try { + sbDataService.save(opsSbData); + System.out.printf("【数据存储成功】SN:%s,变量:%s%n", snCode, matchedRequest.getVariable()); + } catch (Exception e) { + System.err.printf("【数据存储失败】SN:%s,变量:%s,原因:%s%n", + snCode, matchedRequest.getVariable(), e.getMessage()); + } + }); + System.out.printf("【解析成功】SN:%s,变量:%s,值:%s %s(请求ID:%s)%n", + snCode, matchedRequest.getVariable(), parsedValue, matchedRequest.getUnit(), matchedRequest.getRequestId()); + } catch (Exception e) { + System.err.printf("【解析失败】变量:%s,原因:%s%n", matchedRequest.getVariable(), e.getMessage()); + } + } + + /** + * 从响应帧匹配请求,间接获取起始寄存器地址 + */ + private Integer getStartRegAddrFromResponse(byte[] response) { + // 1. 解析响应帧的基础信息 + int slaveId = response[0] & 0xFF; + int funcCode = response[1] & 0xFF; + int dataLen = response[2] & 0xFF; // 数据长度(字节) + int regQuantity = dataLen / 2; // 寄存器数量(每个寄存器2字节) + + // 2. 遍历请求队列,匹配对应的请求 + ModbusRequestContext matchedRequest = null; + Iterator iterator = pendingRequests.iterator(); + while (iterator.hasNext()) { + ModbusRequestContext req = iterator.next(); + + // 匹配条件:从站地址+功能码+寄存器数量一致 + if (req.getSlaveAddr() == slaveId + && req.getFuncCode() == funcCode + && req.getRegisterCount() == regQuantity) { + + matchedRequest = req; +// iterator.remove(); // 移除已匹配的请求 + break; + } + } + + // 3. 从匹配的请求中提取起始地址 + if (matchedRequest != null) { + log.info("【响应匹配成功】起始地址:{},请求ID:{}", + matchedRequest.getSlaveAddr(), matchedRequest.getRequestId()); + return matchedRequest.getStartRegister(); + } else { + log.error("【无匹配请求】无法获取起始地址,响应:{}", bytesToHex(response)); + return null; + } + } + + + // 修改响应处理中的匹配逻辑 + private ModbusRequestContext findMatchedRequest(byte[] responseData, ModbusVariable matchedVar) { + synchronized (queueLock) { // 加锁保证迭代和修改的原子性 + int slaveAddr = responseData[0] & 0xFF; + int funcCode = responseData[1] & 0xFF; + int dataLen = responseData[2] & 0xFF; + int registerCount = dataLen / 2; + + System.out.printf("【匹配调试】响应目标:从站=%d,功能码=%d,寄存器数=%d,起始地址=%d%n", + slaveAddr, funcCode, registerCount, matchedVar.getStartRegAddr()); + + long now = System.currentTimeMillis(); + ModbusRequestContext matched = null; + + // 迭代队列,寻找匹配的请求(使用迭代器保证并发安全) + Iterator iterator = pendingRequests.iterator(); + while (iterator.hasNext()) { + ModbusRequestContext req = iterator.next(); + + System.out.printf("【队列请求】ID=%s,变量=%s,从站=%d,功能码=%d,寄存器数=%d,起始地址=%d%n", + req.getRequestId(), req.getVariable(), req.getSlaveAddr(), req.getFuncCode(), + req.getRegisterCount(), req.getStartRegister()); + + // 过滤超时请求(如超过5秒未响应,视为失效) + if (now - req.getSendTime() > 10000) { + iterator.remove(); // 移除超时请求 + continue; + } + + // 精准匹配条件:从站地址+功能码+寄存器数量+起始寄存器地址 + if (req.getSlaveAddr() == slaveAddr + && req.getFuncCode() == funcCode + && req.getRegisterCount() == registerCount + && req.getStartRegister() == matchedVar.getStartRegAddr()) { // 关键:匹配起始地址 + matched = req; + iterator.remove(); // 找到后立即移除,避免被其他响应匹配 + break; + } + } + return matched; + } + + } + // ------------------------------ Modbus定时发送任务(提交到全局线程池) ------------------------------ + /** + * 启动Modbus定时发送任务(全局线程池调度) + */ +// private void startModbusSendTask() { +// modbusSendFuture = ThreadPoolUtil.MODBUS_SCHEDULER.scheduleAtFixedRate(() -> { +// if (!isRunning.get() || !clientSocket.isConnected() || clientSocket.isClosed()) { +// stopAll(); +// return; +// } +// +// // 未获取SN码:等待心跳 +// String snCode = deviceCache.getSnCode(); +// if (snCode == null || snCode.isEmpty()) { +// System.out.printf("【Modbus发送等待】设备:%s,未获取SN码%n", deviceAddr); +// return; +// } +// +// // 查询变量并发送请求 +// List variables = queryModbusVariables(snCode); +// if (variables.isEmpty()) { +// System.out.printf("【Modbus无变量】设备:%s,SN:%s%n", deviceAddr, snCode); +// return; +// } +// +// for (ModbusVariable var : variables) { +// try { +// sendModbusRequest(var); +// } catch (Exception e) { +// System.err.printf("【Modbus发送失败】SN:%s,变量:%s,原因:%s%n", +// snCode, var.getVariableName(), e.getMessage()); +// } +// } +// +// }, 0, MODBUS_REQUEST_CYCLE, TimeUnit.SECONDS); +// } + + + // 新增:保存发送线程引用,用于停止时中断线程 + private Thread modbusSendThread; + + /** + * 启动Modbus发送任务(单线程版:使用独立线程循环执行,固定周期发送) + */ + private void startModbusSendTask() { + // 创建单线程,负责定时发送请求 + modbusSendThread = new Thread(() -> { + // 线程命名,方便调试 + Thread.currentThread().setName("modbus-send-" + deviceAddr.replace("/", "")); + + while (true) { + // 核心退出条件:若已停止运行或连接关闭,退出循环 + if (!isRunning.get() || (clientSocket != null && (clientSocket.isClosed() || !clientSocket.isConnected()))) { + System.out.printf("【Modbus发送线程退出】设备:%s%n", deviceAddr); + return; + } + + try { + // 1. 未获取SN码:等待心跳(本次循环不发送) + String snCode = deviceCache.getSnCode(); + if (snCode == null || snCode.isEmpty()) { + System.out.printf("【Modbus发送等待】设备:%s,未获取SN码%n", deviceAddr); + // 等待一个周期后再重试 + Thread.sleep(MODBUS_REQUEST_CYCLE * 1000L); + continue; + } + + // 2. 查询设备变量配置 + List variables = queryModbusVariables(snCode); + if (variables.isEmpty()) { + System.out.printf("【Modbus无变量】设备:%s,SN:%s%n", deviceAddr, snCode); + // 等待一个周期后再重试 + Thread.sleep(MODBUS_REQUEST_CYCLE * 1000L); + continue; + } + + // 3. 逐个发送变量请求 + for (ModbusVariable var : variables) { + // 发送前再次检查状态(避免发送过程中连接关闭) + if (!isRunning.get() || clientSocket.isClosed() || !clientSocket.isConnected()) { + stopAll(); + return; + } + sendModbusRequest(var); + // 变量间发送间隔(避免设备过载) + Thread.sleep(500); // 500ms间隔 + } + + // 4. 完成一轮发送后,休眠固定周期(MODBUS_REQUEST_CYCLE秒) + Thread.sleep(MODBUS_REQUEST_CYCLE * 1000L); + + } catch (InterruptedException e) { + // 线程被中断(通常是stopAll调用),退出循环 + System.out.printf("【Modbus发送线程被中断】设备:%s%n", deviceAddr); + Thread.currentThread().interrupt(); // 保留中断状态 + return; + } catch (Exception e) { + // 其他异常:打印日志,继续下一轮循环 + System.err.printf("【Modbus发送线程异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + try { + // 异常后等待一个周期再重试 + Thread.sleep(MODBUS_REQUEST_CYCLE * 1000L); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + }); + + // 启动单线程 + modbusSendThread.start(); + } + + + + // 定时任务:每30秒清理超时请求(超时时间可自定义,如2秒) + @Scheduled(fixedRate = 30000) + public void cleanTimeoutRequests() { + long now = System.currentTimeMillis(); + Iterator iterator = pendingRequests.iterator(); + while (iterator.hasNext()) { + ModbusRequestContext req = iterator.next(); + if (now - req.getSendTime() > 2000) { // 超过2秒未响应视为超时 + log.warn("【请求超时】SN:{},变量:{},未收到响应", req.getSn(), req.getVariable()); + iterator.remove(); + } + } + } + + /** + * 发送Modbus请求(同步发送,发送后立即返回,响应由读流线程处理) + */ + private void sendModbusRequest(ModbusVariable var) throws Exception { + byte[] requestFrame = generateModbusFrame(var); + // 解析请求帧的关键信息(从请求帧中提取) + int slaveAddr = requestFrame[0] & 0xFF; + int funcCode = requestFrame[1] & 0xFF; + // 解析起始寄存器地址(请求帧第2-3字节) + int startRegister = ((requestFrame[2] & 0xFF) << 8) | (requestFrame[3] & 0xFF); + // 解析读取寄存器数量(请求帧第4-5字节) + int registerCount = ((requestFrame[4] & 0xFF) << 8) | (requestFrame[5] & 0xFF); + + // 创建请求上下文并加入队列 + ModbusRequestContext context = new ModbusRequestContext(); + context.setSn(var.getSnCode()); + context.setVariable(var.getVariableName()); + context.setUnit(var.getUnit()); + context.setId(var.getId()); + context.setSlaveAddr(slaveAddr); + context.setFuncCode(funcCode); + context.setStartRegister(startRegister); + context.setRegisterCount(registerCount); + context.setSendTime(System.currentTimeMillis()); + pendingRequests.add(context); + + + String requestHex = bytesToHex(requestFrame); + // 发送请求(确保输出流有效) + if (out == null) { + throw new Exception("输出流已关闭"); + } + out.write(requestFrame); + out.flush(); + System.out.printf("【Modbus发送成功】SN:%s,变量:%s,请求:%s,长度:%d字节%n", + deviceCache.getSnCode(), var.getVariableName(), requestHex, requestFrame.length); + Thread.sleep(400); // 延迟100ms + } + + // ------------------------------ 原有工具方法(适配优化逻辑) ------------------------------ + /** + * 从数据库查询Modbus变量 + */ + private List queryModbusVariables(String snCode) { + List variables = new ArrayList<>(); + try { + Object object = RedisUtils.getCacheObject("shebei:"+snCode); + if (object == null) { + OpsSbLiebiaoDto deviceDto = sbLiebiaoService.getLiebiaoBianliangList(snCode); + + if (deviceDto == null || deviceDto.getSbMbBianliangVos().isEmpty()) { + return variables; + } + for (OpsSbMbBianliangVo config : deviceDto.getSbMbBianliangVos()) { + ModbusVariable var = new ModbusVariable(); + var.setVariableName(config.getBlName()); + var.setDataType(config.getShujvGeshi()); + var.setUnit(config.getBlDanwei()); + var.setSnCode(snCode); + var.setSlaveId(deviceDto.getSlaveId().intValue()); + // 解析功能码(去除"0x"前缀) + String funcHex = config.getJicunqiGnm().replace("0x", "").trim(); + var.setFuncCode(Integer.parseInt(funcHex, 16)); + // 解析起始寄存器地址 + var.setStartRegAddr(Integer.parseInt(config.getJicunqiAdd().trim())); + // 配置寄存器数量 + switch (config.getShujvGeshi().toUpperCase()) { + case "S16": + case "U16": + var.setRegQuantity(1); + break; + case "S32": + case "U32": + var.setRegQuantity(2); + break; + default: + System.err.printf("【不支持类型】SN:%s,类型:%s%n", snCode, config.getShujvGeshi()); + continue; + } + variables.add(var); + RedisUtils.setCacheObject("shebei:"+snCode, variables); + } + }else { + return convertObjectToList(object, ModbusVariable.class); + } + + } catch (Exception e) { + System.err.printf("【变量查询异常】SN:%s,原因:%s%n", snCode, e.getMessage()); + } + return variables; + } + + /** + * 生成Modbus RTU请求帧 + */ private byte[] generateModbusFrame(ModbusVariable var) { ByteBuffer buffer = ByteBuffer.allocate(6).order(ByteOrder.BIG_ENDIAN); - buffer.put(var.getSlaveId().byteValue()) - .put(var.getFuncCode().byteValue()) - .putShort(var.getStartRegAddr().shortValue()) - .putShort(var.getRegQuantity().shortValue()); + buffer.put((byte) var.getSlaveId()) + .put((byte) var.getFuncCode()) + .putShort((short) var.getStartRegAddr()) + .putShort((short) var.getRegQuantity()); byte[] body = buffer.array(); byte[] crc = calculateCrc16(body); - - byte[] frame = new byte[body.length + crc.length]; - System.arraycopy(body, 0, frame, 0, body.length); - System.arraycopy(crc, 0, frame, body.length, crc.length); - return frame; + byte[] fullFrame = new byte[body.length + crc.length]; + System.arraycopy(body, 0, fullFrame, 0, body.length); + System.arraycopy(crc, 0, fullFrame, body.length, crc.length); + return fullFrame; } - private byte[] receiveResponse(int timeoutMs) throws Exception { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeoutMs) { - if (in.available() > 0) { - byte[] buffer = new byte[1024]; - int len = in.read(buffer); - return Arrays.copyOf(buffer, len); + /** + * 初步校验Modbus响应(最小长度+CRC) + */ + private boolean isValidModbusResponse(byte[] data) { + // 最小长度:从站(1)+功能码(1)+数据长度(1)+CRC(2) = 5字节 + if (data.length < 5) return false; + // CRC初步校验(避免无效数据) + byte[] body = Arrays.copyOf(data, data.length - 2); + byte[] receivedCrc = Arrays.copyOfRange(data, data.length - 2, data.length); + return Arrays.equals(receivedCrc, calculateCrc16(body)); + } + + /** + * 匹配响应对应的Modbus变量(按从站地址+功能码+数据长度) + */ + private ModbusVariable matchResponseToVariable(byte[] response, List variables) { + int slaveId = response[0] & 0xFF; + int funcCode = response[1] & 0xFF; + int dataLen = response[2] & 0xFF; + Integer startRegAddr = getStartRegAddrFromResponse(response); + for (ModbusVariable var : variables) { + // 匹配条件:从站地址一致+功能码一致+数据长度=寄存器数量×2 + if (var.getSlaveId() == slaveId + && var.getFuncCode() == funcCode + && dataLen == var.getRegQuantity() * 2) { + if (startRegAddr.equals(var.getStartRegAddr())) { + return var; + } } - Thread.sleep(10); } return null; } - private boolean validateResponse(byte[] response, ModbusVariable var) { - if (response.length < 5) return false; - if (response[0] != var.getSlaveId() || response[1] != var.getFuncCode()) return false; - if (response[2] != var.getRegQuantity() * 2) return false; - + /** + * 验证Modbus响应合法性(详细校验) + */ + private boolean validateModbusResponse(byte[] response, ModbusVariable var) { + // 从站地址校验 + if (response[0] != var.getSlaveId()) return false; + // 功能码校验(排除错误码:0x80+功能码) + if (response[1] == (var.getFuncCode() | 0x80)) { + int errorCode = response[2] & 0xFF; + System.err.printf("【设备错误】SN:%s,变量:%s,错误码:%d%n", + var.getSnCode(), var.getVariableName(), errorCode); + return false; + } + if (response[1] != var.getFuncCode()) return false; + // 数据长度校验 + if ((response[2] & 0xFF) != var.getRegQuantity() * 2) return false; + // CRC校验 byte[] body = Arrays.copyOf(response, response.length - 2); byte[] receivedCrc = Arrays.copyOfRange(response, response.length - 2, response.length); return Arrays.equals(receivedCrc, calculateCrc16(body)); } -// private Object parseValue(byte[] response, ModbusVariable var) { -// ByteBuffer buffer = ByteBuffer.wrap(response, 3, response[2]).order(ByteOrder.BIG_ENDIAN); -// double rawValue; -// -// switch (var.getDataType().toUpperCase()) { -// case "S16": rawValue = buffer.getShort(); break; -// case "U16": rawValue = buffer.getShort() & 0xFFFF; break; -// case "S32": rawValue = buffer.getInt(); break; -// case "U32": rawValue = buffer.getLong() & 0xFFFFFFFFL; break; -// case "FLOAT": rawValue = buffer.getFloat(); break; -// default: throw new IllegalArgumentException("不支持的数据类型:" + var.getDataType()); -// } -// -// return rawValue * var.getMultiplier(); -// } - - private Object parseValue(byte[] response, ModbusVariable var) { - // 1. 初始化缓冲区:从响应报文的"数据部分"开始读取(跳过前3字节:从站地址、功能码、字节计数) - // 字节序设置为BIG_ENDIAN(大端序),符合"高字节在前、高字在前"的规则 + /** + * 解析Modbus响应数据 + */ + private Object parseModbusValue(byte[] response, ModbusVariable var) { ByteBuffer buffer = ByteBuffer.wrap(response, 3, response[2]).order(ByteOrder.BIG_ENDIAN); - double rawValue; // 用double统一存储原始值,兼容整数和后续倍率计算 + double rawValue; + String type = var.getDataType().toUpperCase(); - switch (var.getDataType().toUpperCase()) { + switch (type) { case "U16": - // 读取2字节(高字节在前),转换为无符号整数 - // getShort()返回有符号short,&0xFFFF转换为无符号(0~65535) rawValue = buffer.getShort() & 0xFFFF; break; case "S16": - // 读取2字节(高字节在前),直接按有符号short解析(-32768~32767) rawValue = buffer.getShort(); break; case "U32": - // 读取4字节(高字在前,每个字内高字节在前),转换为无符号整数 - // getInt()返回有符号int,转换为long后&0xFFFFFFFFL得到无符号值(0~4294967295) rawValue = (long) buffer.getInt() & 0xFFFFFFFFL; break; case "S32": - // 读取4字节(高字在前,每个字内高字节在前),直接按有符号int解析 rawValue = buffer.getInt(); break; - case "FLOAT": - // 单精度浮点型(32位),大端序解析(符合IEEE 754标准) - rawValue = buffer.getFloat(); - break; default: - throw new IllegalArgumentException("不支持的数据类型:" + var.getDataType()); + throw new IllegalArgumentException("不支持的数据类型:" + type); } - // 原始值 × 倍率 = 实际物理量(如温度、压力等) - return rawValue * var.getMultiplier(); + // 倍率计算(默认1.0) + double multiplier = var.getMultiplier() == null ? 1.0 : var.getMultiplier(); + return rawValue * multiplier; } + /** + * 计算CRC16校验 + */ private byte[] calculateCrc16(byte[] data) { int crc = 0xFFFF; for (byte b : data) { @@ -433,6 +861,9 @@ public class DeviceHandler { return new byte[]{(byte) (crc & 0xFF), (byte) ((crc >> 8) & 0xFF)}; } + /** + * 字节数组转16进制字符串 + */ private String bytesToHex(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) { @@ -440,4 +871,84 @@ public class DeviceHandler { } return sb.toString().trim(); } + + // ------------------------------ 全量资源清理 ------------------------------ + /** + * 停止所有任务+关闭资源 + */ + private void stopAll() { + // 1. 标记设备离线 + isRunning.set(false); + + // 2. 停止定时任务和读流任务 + if (modbusSendFuture != null && !modbusSendFuture.isDone()) { + modbusSendFuture.cancel(true); + System.out.printf("【Modbus发送任务停止】设备:%s%n", deviceAddr); + } + if (readThreadFuture != null && !readThreadFuture.isDone()) { + readThreadFuture.cancel(true); + System.out.printf("【读流任务停止】设备:%s%n", deviceAddr); + } + + // 3. 关闭IO流和Socket + try { + if (in != null) { + in.close(); + in = null; + } + if (out != null) { + out.close(); + out = null; + } + if (clientSocket != null && !clientSocket.isClosed()) { + clientSocket.close(); + System.out.printf("【Socket关闭】设备:%s%n", deviceAddr); + } + } catch (IOException e) { + System.err.printf("【资源关闭异常】设备:%s,原因:%s%n", deviceAddr, e.getMessage()); + } + + // 4. 从全局缓存移除 + if (globalCache != null) { + globalCache.remove(deviceAddr); + } + System.out.printf("【设备下线完成】地址:%s,SN:%s%n", deviceAddr, deviceCache.getSnCode()); + } + + /** + * 将Object转换为指定元素类型的List + * @param obj 待转换对象 + * @param elementType 元素的Class(如String.class) + * @param 元素类型 + * @return 转换后的List + */ + public static List convertObjectToList(Object obj, Class elementType) { + // 1. 先转换为基础List + List rawList = convertObjectToList(obj); + + // 2. 校验元素类型(可选,若需严格保证元素类型) + for (Object element : rawList) { + if (element != null && !elementType.isInstance(element)) { + throw new ClassCastException("List中存在非" + elementType.getName() + "类型的元素:" + element.getClass().getName()); + } + } + + // 3. 转换为带泛型的List(编译器会警告,可忽略或抑制) + @SuppressWarnings("unchecked") + List typedList = (List) rawList; + return typedList; + } + + public static List convertObjectToList(Object obj) { + // 1. 检查对象是否为null + if (obj == null) { + throw new IllegalArgumentException("待转换对象为null"); + } + // 2. 检查是否为List类型 + if (!(obj instanceof List)) { + throw new ClassCastException("对象不是List类型,实际类型:" + obj.getClass().getName()); + } + // 3. 强制转换 + return (List) obj; + } } diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/server/ModbusTcpServer.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/server/ModbusTcpServer.java new file mode 100644 index 0000000..9940c59 --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/server/ModbusTcpServer.java @@ -0,0 +1,76 @@ +package org.dromara.tcpfuwu.server; + +import org.dromara.shebei.service.IOpsSbDataService; +import org.dromara.shebei.service.IOpsSbLiebiaoService; +import org.dromara.tcpfuwu.domain.DeviceCache; +import org.dromara.tcpfuwu.handler.DeviceHandler; +import org.dromara.tcpfuwu.util.ThreadPoolUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Modbus TCP 主服务:监听端口、接收设备连接、分发到DeviceHandler + * 注:若用SpringBoot,需加@Component并通过@Autowired注入IOpsSbLiebiaoService + */ +@Component +public class ModbusTcpServer { + // 全局设备缓存:key=设备地址(IP:PORT),value=设备缓存信息(线程安全) + private final Map globalDeviceCache = new ConcurrentHashMap<>(); + + // 依赖注入:查询设备变量的Spring服务 + @Autowired + private IOpsSbLiebiaoService sbLiebiaoService; + @Autowired + private IOpsSbDataService sbDataService; + + // 服务端口(可配置在application.yml中) + private static final int LISTEN_PORT = 12345; + + /** + * 启动TCP服务 + */ + public void start() { + try (ServerSocket serverSocket = new ServerSocket(LISTEN_PORT)) { + System.out.printf("【Modbus TCP服务启动成功】监听端口:%d%n", LISTEN_PORT); + + // 循环接收设备连接(主线程阻塞) + while (!Thread.currentThread().isInterrupted()) { + Socket clientSocket = serverSocket.accept(); // 阻塞等待连接 + String deviceAddr = clientSocket.getInetAddress() + ":" + clientSocket.getPort(); + System.out.printf("%n【设备上线】地址:%s,Socket:%s%n", deviceAddr, clientSocket); + + // 初始化设备缓存 + DeviceCache deviceCache = new DeviceCache(); + deviceCache.setDeviceAddr(deviceAddr); + deviceCache.setLastHeartbeatTime(System.currentTimeMillis()); + globalDeviceCache.put(deviceAddr, deviceCache); + + // 线程池提交设备处理任务(替代new Thread()) + ThreadPoolUtil.DEVICE_CONN_POOL.submit(() -> { + // 创建DeviceHandler处理单个设备 + DeviceHandler deviceHandler = new DeviceHandler( + clientSocket, + deviceAddr, + deviceCache, + globalDeviceCache, + sbLiebiaoService, // 传递Spring服务 + sbDataService + ); + deviceHandler.handle(); // 启动设备通信逻辑 + }); + } + } catch (Exception e) { + System.err.printf("【Modbus TCP服务异常】启动失败:%s%n", e.getMessage()); + e.printStackTrace(); + } finally { + // 服务停止时关闭所有线程池 + ThreadPoolUtil.shutdownAll(); + System.out.println("【Modbus TCP服务停止】已关闭所有线程池"); + } + } +} diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java index a74a028..db17973 100644 --- a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/starter/TcpServersStarter.java @@ -2,12 +2,11 @@ package org.dromara.tcpfuwu.starter; import lombok.extern.slf4j.Slf4j; -import org.dromara.tcpfuwu.server.UnifiedTcpServer; +//import org.dromara.tcpfuwu.server.UnifiedTcpServer; +import org.dromara.tcpfuwu.server.ModbusTcpServer; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; -import javax.annotation.PreDestroy; - /** * Spring启动时自动初始化TCP服务器,关闭时释放资源 */ @@ -15,11 +14,18 @@ import javax.annotation.PreDestroy; @Slf4j public class TcpServersStarter implements CommandLineRunner { - private final UnifiedTcpServer unifiedTcpServer; + // private final UnifiedTcpServer unifiedTcpServer; + private final ModbusTcpServer modbusTcpServer; + // 构造注入两个TCP服务器Bean - public TcpServersStarter( UnifiedTcpServer unifiedTcpServer) { - this.unifiedTcpServer = unifiedTcpServer; +// public TcpServersStarter( UnifiedTcpServer unifiedTcpServer) { +// this.unifiedTcpServer = unifiedTcpServer; +// } + + + public TcpServersStarter(ModbusTcpServer modbusTcpServer) { + this.modbusTcpServer = modbusTcpServer; } /** @@ -29,17 +35,19 @@ public class TcpServersStarter implements CommandLineRunner { public void run(String... args) throws Exception { log.info("【TCP服务器启动器】开始初始化Modbus和逆变器心跳服务器..."); // 启动两个服务器(独立线程,不阻塞Spring主线程) - unifiedTcpServer.start(); +// unifiedTcpServer.start(); + modbusTcpServer.start(); log.info("【TCP服务器启动器】所有TCP服务器初始化完成"); } - /** - * Spring容器销毁前执行(释放资源) - */ - @PreDestroy - public void stopTcpServers() { - log.info("【TCP服务器启动器】开始关闭所有TCP服务器..."); - unifiedTcpServer.stop(); - log.info("【TCP服务器启动器】所有TCP服务器已关闭"); - } +// /** +// * Spring容器销毁前执行(释放资源) +// */ +// @PreDestroy +// public void stopTcpServers() { +// log.info("【TCP服务器启动器】开始关闭所有TCP服务器..."); +//// unifiedTcpServer.stop(); +//// modbusTcpServer.stop(); +// log.info("【TCP服务器启动器】所有TCP服务器已关闭"); +// } } diff --git a/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/util/ThreadPoolUtil.java b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/util/ThreadPoolUtil.java new file mode 100644 index 0000000..76b952b --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/java/org/dromara/tcpfuwu/util/ThreadPoolUtil.java @@ -0,0 +1,64 @@ +package org.dromara.tcpfuwu.util; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 线程池工具类:统一管理设备连接、Modbus发送线程 + */ +public class ThreadPoolUtil { + // 设备连接线程池:处理新设备连接(核心线程2,最大5) + public static final ExecutorService DEVICE_CONN_POOL = new ThreadPoolExecutor( + 5, 12, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(100), + r -> new Thread(r, "device-conn-" + System.currentTimeMillis()), + new ThreadPoolExecutor.AbortPolicy() + ); + + + // 设备连接线程池:处理新设备连接(核心线程2,最大5) + public static final ExecutorService RESPONSE_HANDLING = new ThreadPoolExecutor( + 5, 12, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(100), + r -> new Thread(r, "response-handling" + System.currentTimeMillis()), + new ThreadPoolExecutor.AbortPolicy() + ); + + // Modbus定时发送线程池:每个设备一个定时任务(核心线程5,最大10) + public static final ScheduledExecutorService MODBUS_SCHEDULER = new ScheduledThreadPoolExecutor( + 5, + r -> new Thread(r, "modbus-schedule-" + System.currentTimeMillis()), + new ThreadPoolExecutor.AbortPolicy() + ); + + // 数据存储专用线程池(处理数据库保存操作) + public static final ExecutorService DATA_SAVE_POOL = new ThreadPoolExecutor( + 5, 10, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1000), + new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(1); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "data-save-" + counter.getAndIncrement()); + } + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时让提交线程执行(避免任务丢失) + ); + + // 关闭线程池(JVM退出时调用) + public static void shutdownAll() { + DEVICE_CONN_POOL.shutdown(); + MODBUS_SCHEDULER.shutdown(); + try { + if (!DEVICE_CONN_POOL.awaitTermination(3, TimeUnit.SECONDS)) { + DEVICE_CONN_POOL.shutdownNow(); + } + if (!MODBUS_SCHEDULER.awaitTermination(3, TimeUnit.SECONDS)) { + MODBUS_SCHEDULER.shutdownNow(); + } + } catch (InterruptedException e) { + DEVICE_CONN_POOL.shutdownNow(); + MODBUS_SCHEDULER.shutdownNow(); + } + } +} diff --git a/ruoyi-modules/xny-ops/src/main/resources/mapper/shebei/OpsSbDataMapper.xml b/ruoyi-modules/xny-ops/src/main/resources/mapper/shebei/OpsSbDataMapper.xml new file mode 100644 index 0000000..2d2f52c --- /dev/null +++ b/ruoyi-modules/xny-ops/src/main/resources/mapper/shebei/OpsSbDataMapper.xml @@ -0,0 +1,7 @@ + + + + +