diff --git a/xinnengyuan/ruoyi-admin/src/main/resources/application-dev.yml b/xinnengyuan/ruoyi-admin/src/main/resources/application-dev.yml index be349c0a..953e8f06 100644 --- a/xinnengyuan/ruoyi-admin/src/main/resources/application-dev.yml +++ b/xinnengyuan/ruoyi-admin/src/main/resources/application-dev.yml @@ -368,3 +368,14 @@ drone: chat: server: port: 19099 +# rabbitmq 配置 +rabbitmq: + exchange-name: dev-normal-exchange + queue-name: dev-normal-queue + routing-key: dev.normal.routing.key + delay-exchange-name: dev-delay-queue + delay-queue-name: dev-delay-exchange + delay-routing-key: dev.delay.routing.key + dead-letter-exchange: dev-dlx-exchange + dead-letter-queue: dev-dlx-queue + dead-letter-routing-key: dev.dlx.routing.key diff --git a/xinnengyuan/ruoyi-admin/src/main/resources/application-local.yml b/xinnengyuan/ruoyi-admin/src/main/resources/application-local.yml index 64b0ccdc..38eebf83 100644 --- a/xinnengyuan/ruoyi-admin/src/main/resources/application-local.yml +++ b/xinnengyuan/ruoyi-admin/src/main/resources/application-local.yml @@ -343,3 +343,14 @@ drone: chat: server: port: 18088 +# rabbitmq 配置 +rabbitmq: + exchange-name: local-normal-exchange + queue-name: local-normal-queue + routing-key: local.normal.routing.key + delay-exchange-name: local-delay-queue + delay-queue-name: local-delay-exchange + delay-routing-key: local.delay.routing.key + dead-letter-exchange: local-dlx-exchange + dead-letter-queue: local-dlx-queue + dead-letter-routing-key: local.dlx.routing.key diff --git a/xinnengyuan/ruoyi-admin/src/main/resources/application-prod.yml b/xinnengyuan/ruoyi-admin/src/main/resources/application-prod.yml index 235b008a..5b6e6e05 100644 --- a/xinnengyuan/ruoyi-admin/src/main/resources/application-prod.yml +++ b/xinnengyuan/ruoyi-admin/src/main/resources/application-prod.yml @@ -358,3 +358,14 @@ drone: chat: server: port: 19099 +# rabbitmq 配置 +rabbitmq: + exchange-name: prod-normal-exchange + queue-name: prod-normal-queue + routing-key: prod.normal.routing.key + delay-exchange-name: prod-delay-queue + delay-queue-name: prod-delay-exchange + delay-routing-key: prod.delay.routing.key + dead-letter-exchange: prod-dlx-exchange + dead-letter-queue: prod-dlx-queue + dead-letter-routing-key: prod.dlx.routing.key diff --git a/xinnengyuan/ruoyi-admin/src/main/resources/application.yml b/xinnengyuan/ruoyi-admin/src/main/resources/application.yml index 84e30e7e..87e8a97c 100644 --- a/xinnengyuan/ruoyi-admin/src/main/resources/application.yml +++ b/xinnengyuan/ruoyi-admin/src/main/resources/application.yml @@ -95,7 +95,13 @@ spring: deserialization: # 允许对象忽略json中不存在的属性 fail_on_unknown_properties: false - + rabbitmq: + host: 192.168.110.2 + port: 5672 + username: admin + password: yuanjiexny + publisher-returns: true + publisher-confirm-type: correlated # Sa-Token配置 sa-token: # token名称 (同时也是cookie名称) diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml b/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml index 4bffab53..c9925bb7 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/pom.xml @@ -29,6 +29,10 @@ + + org.springframework.boot + spring-boot-starter-amqp + com.twelvemonkeys.imageio diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitConfig.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitConfig.java new file mode 100644 index 00000000..c5761106 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitConfig.java @@ -0,0 +1,57 @@ +package org.dromara.rabbitmq.config; + +import jakarta.annotation.Resource; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author xbhog + */ +@Configuration +public class RabbitConfig { + + @Resource + private RabbitProperties rabbitProperties; + + /** + * 创建交换机 + * ExchangeBuilder有四种交换机模式 + * Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。 + * Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。 + * Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。 + * Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。 + * durable 交换器是否持久化(false 不持久化,true 持久化) + **/ + @Bean + public TopicExchange exchange() { + return new TopicExchange(rabbitProperties.getExchangeName()); + } + + /** + * 创建队列 + * durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码 + * deliveryMode 消息是否持久化(1 不持久化,2 持久化) + **/ + @Bean + public Queue queue() { + return new Queue(rabbitProperties.getQueueName(), false); + } + + /** + * 绑定交换机和队列 + * bing 方法参数可以是队列和交换机 + * to 方法参数必须是交换机 + * with 方法参数是路由Key 这里是以rabbit.开头 + * noargs 就是不要参数的意思 + * 这个方法的意思是把rabbit开头的消息 和 上面的队列 和 上面的交换机绑定 + **/ + @Bean + public Binding binding(Queue queue, TopicExchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(rabbitProperties.getRoutingKey()); + } + +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitProperties.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitProperties.java new file mode 100644 index 00000000..868667df --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitProperties.java @@ -0,0 +1,60 @@ +package org.dromara.rabbitmq.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author lilemy + * @date 2025-12-05 16:25 + */ +@Data +@Configuration +@ConfigurationProperties(prefix = "rabbitmq") +public class RabbitProperties { + + /** + * 交换机名称 + */ + private String exchangeName; + + /** + * 队列名称 + */ + private String queueName; + + /** + * 路由键名称 + */ + private String routingKey; + + /** + * 延迟交换机名称 + */ + private String delayExchangeName; + + /** + * 延迟队列名称 + */ + private String delayQueueName; + + /** + * 延迟路由键名称 + */ + private String delayRoutingKey; + + /** + * 死信交换机名称 + */ + private String deadLetterExchange; + + /** + * 死信队列名称 + */ + private String deadLetterQueue; + + /** + * 死信路由键名称 + */ + private String deadLetterRoutingKey; +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitTtlQueueConfig.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitTtlQueueConfig.java new file mode 100644 index 00000000..94abf3cb --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/config/RabbitTtlQueueConfig.java @@ -0,0 +1,75 @@ +package org.dromara.rabbitmq.config; + +import jakarta.annotation.Resource; +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; + + +/** + * RabbitTTL队列 + * + * @author xbhog + */ +@Configuration +public class RabbitTtlQueueConfig { + + @Resource + private RabbitProperties rabbitProperties; + + /** + * 声明延迟队列 + */ + @Bean + public Queue delayQueue() { + return QueueBuilder.durable(rabbitProperties.getDelayQueueName()) + .deadLetterExchange(rabbitProperties.getDeadLetterExchange()) + .deadLetterRoutingKey(rabbitProperties.getDeadLetterRoutingKey()) + .build(); + } + + /** + * 声明延迟交换机 + */ + @Bean + public CustomExchange delayExchange() { + return new CustomExchange(rabbitProperties.getDelayExchangeName(), "x-delayed-message", + true, false, Map.of("x-delayed-type", "direct")); + } + + /** + * 将延迟队列绑定到延迟交换机 + */ + @Bean + public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { + return BindingBuilder.bind(delayQueue).to(delayExchange).with(rabbitProperties.getDelayRoutingKey()).noargs(); + } + + /** + * 声明死信队列 + */ + @Bean + public Queue deadLetterQueue() { + return new Queue(rabbitProperties.getDeadLetterQueue()); + } + + /** + * 声明死信交换机 + */ + @Bean + public DirectExchange deadLetterExchange() { + return new DirectExchange(rabbitProperties.getDeadLetterExchange()); + } + + /** + * 将死信队列绑定到死信交换机 + */ + @Bean + public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) { + return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(rabbitProperties.getDeadLetterRoutingKey()); + } + +} + diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/consumer/RabbitConsumer.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/consumer/RabbitConsumer.java new file mode 100644 index 00000000..1f1b5bd1 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/consumer/RabbitConsumer.java @@ -0,0 +1,65 @@ +package org.dromara.rabbitmq.consumer; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.StringUtils; +import org.dromara.rabbitmq.service.IMqDelayTaskService; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * @author lilemy + * @date 2025-12-05 14:09 + */ +@Slf4j +@Component +public class RabbitConsumer { + + @Resource + private IMqDelayTaskService mqDelayTaskService; + + /** + * 普通消息 + */ + @RabbitListener(queues = "${rabbitmq.queue-name}") + public void listenQueue(Message message) { + log.info("【消费者】Start consuming data:{}", new String(message.getBody())); + } + + /** + * 处理延迟队列消息 + */ + @RabbitListener(queues = "${rabbitmq.delay-queue-name}") + public void receiveDelayMessage(String message) { + log.info("【消费者】Received delayed message:{}", message); + if (StringUtils.isNotBlank(message) && isLong(message)) { + mqDelayTaskService.executeTask(Long.parseLong(message)); + } + } + + /** + * 处理死信队列消息 + * 当消息在延迟队列中未能被正确处理(例如因消费者逻辑错误、超时未ACK等原因) + * 它会被自动转发到死信队列中,以便后续的特殊处理或重新尝试。 + */ + @RabbitListener(queues = "${rabbitmq.dead-letter-queue}") + public void receiveDeadMessage(String message) { + log.info("【消费者】Received dead message:{}", message); + } + + /** + * 判断字符串是否为Long + * + * @param str 字符串 + * @return 是否为Long + */ + private static boolean isLong(String str) { + try { + Long.parseLong(str); + return true; + } catch (Exception e) { + return false; + } + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/domain/MqDelayTask.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/domain/MqDelayTask.java new file mode 100644 index 00000000..3a7499ba --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/domain/MqDelayTask.java @@ -0,0 +1,66 @@ +package org.dromara.rabbitmq.domain; + +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * 延迟任务对象 mq_delay_task + * + * @author lilemy + * @date 2025-12-05 + */ +@Data +@TableName("mq_delay_task") +public class MqDelayTask implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 主键 + */ + @TableId(value = "id") + private Long id; + + /** + * 业务类型 + */ + private String bizType; + + /** + * 业务ID + */ + private Long bizId; + + /** + * 执行的时间点 + */ + private LocalDateTime executeTime; + + /** + * 任务状态 0未执行 1执行中 2执行成功 3执行失败 + */ + private Integer status; + + /** + * 重试次数 + */ + private Integer retryCount; + + /** + * 最大重试次数 + */ + private Integer maxRetry; + + /** + * 失败原因 + */ + private String failReason; + + +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/domain/vo/MqDelayTaskVo.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/domain/vo/MqDelayTaskVo.java new file mode 100644 index 00000000..99287526 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/domain/vo/MqDelayTaskVo.java @@ -0,0 +1,76 @@ +package org.dromara.rabbitmq.domain.vo; + +import com.alibaba.excel.annotation.ExcelIgnoreUnannotated; +import com.alibaba.excel.annotation.ExcelProperty; +import io.github.linpeilie.annotations.AutoMapper; +import lombok.Data; +import org.dromara.rabbitmq.domain.MqDelayTask; + +import java.io.Serial; +import java.io.Serializable; +import java.time.LocalDateTime; + + +/** + * 延迟任务视图对象 mq_delay_task + * + * @author lilemy + * @date 2025-12-05 + */ +@Data +@ExcelIgnoreUnannotated +@AutoMapper(target = MqDelayTask.class) +public class MqDelayTaskVo implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 主键 + */ + @ExcelProperty(value = "主键") + private Long id; + + /** + * 业务类型 + */ + @ExcelProperty(value = "业务类型") + private String bizType; + + /** + * 业务ID + */ + @ExcelProperty(value = "业务ID") + private Long bizId; + + /** + * 执行的时间点 + */ + @ExcelProperty(value = "执行的时间点") + private LocalDateTime executeTime; + + /** + * 任务状态 0未执行 1执行中 2执行成功 3执行失败 + */ + @ExcelProperty(value = "任务状态 0未执行 1执行中 2执行成功 3执行失败") + private Integer status; + + /** + * 重试次数 + */ + @ExcelProperty(value = "重试次数") + private Integer retryCount; + + /** + * 最大重试次数 + */ + @ExcelProperty(value = "最大重试次数") + private Integer maxRetry; + + /** + * 失败原因 + */ + @ExcelProperty(value = "失败原因") + private String failReason; + +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/enums/MqDelayTaskTypeEnum.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/enums/MqDelayTaskTypeEnum.java new file mode 100644 index 00000000..c7e7fc67 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/enums/MqDelayTaskTypeEnum.java @@ -0,0 +1,35 @@ +package org.dromara.rabbitmq.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author lilemy + * @date 2025-12-05 18:26 + */ +@Getter +@AllArgsConstructor +public enum MqDelayTaskTypeEnum { + + /** + * 安全隐患 + */ + HIDDEN_DANGER("aqyh"); + + private final String type; + + /** + * 根据业务类型获取枚举 + * + * @param type 业务类型 + * @return 枚举 + */ + public static MqDelayTaskTypeEnum getByType(String type) { + for (MqDelayTaskTypeEnum value : values()) { + if (value.getType().equals(type)) { + return value; + } + } + return null; + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/mapper/MqDelayTaskMapper.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/mapper/MqDelayTaskMapper.java new file mode 100644 index 00000000..b817c06c --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/mapper/MqDelayTaskMapper.java @@ -0,0 +1,15 @@ +package org.dromara.rabbitmq.mapper; + +import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; +import org.dromara.rabbitmq.domain.MqDelayTask; +import org.dromara.rabbitmq.domain.vo.MqDelayTaskVo; + +/** + * 延迟任务Mapper接口 + * + * @author lilemy + * @date 2025-12-05 + */ +public interface MqDelayTaskMapper extends BaseMapperPlus { + +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/producer/RabbitProducer.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/producer/RabbitProducer.java new file mode 100644 index 00000000..13dcbe08 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/producer/RabbitProducer.java @@ -0,0 +1,51 @@ +package org.dromara.rabbitmq.producer; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.dromara.rabbitmq.config.RabbitProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @author lilemy + * @date 2025-12-05 11:40 + */ +@Slf4j +@Component +public class RabbitProducer { + + @Resource + private RabbitTemplate rabbitTemplate; + + @Resource + private RabbitProperties rabbitProperties; + + /** + * 发送消息 + * + * @param message 消息 + */ + public void send(String message) { + rabbitTemplate.convertAndSend(rabbitProperties.getExchangeName(), rabbitProperties.getRoutingKey(), message); + log.info("【生产者】Message send: {}", message); + } + + /** + * 发送延迟消息 + * + * @param message 消息 + * @param delayMs 延迟时间(毫秒) + */ + public void sendDelayMessage(String message, long delayMs) { + rabbitTemplate.convertAndSend( + rabbitProperties.getDelayExchangeName(), + rabbitProperties.getDelayRoutingKey(), + message, + message1 -> { + message1.getMessageProperties().setDelayLong(delayMs); + return message1; + } + ); + log.info("【生产者】Delay Message send: {} delay: {}ms", message, delayMs); + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/service/IMqDelayTaskService.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/service/IMqDelayTaskService.java new file mode 100644 index 00000000..5842da36 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/service/IMqDelayTaskService.java @@ -0,0 +1,27 @@ +package org.dromara.rabbitmq.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import org.dromara.rabbitmq.domain.MqDelayTask; + +/** + * 延迟任务Service接口 + * + * @author lilemy + * @date 2025-12-05 + */ +public interface IMqDelayTaskService extends IService { + + /** + * 添加延迟任务 + * + * @param task 延迟任务 + */ + void addDelayTask(MqDelayTask task); + + /** + * 执行任务 + * + * @param id 主键id + */ + void executeTask(Long id); +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/service/impl/MqDelayTaskServiceImpl.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/service/impl/MqDelayTaskServiceImpl.java new file mode 100644 index 00000000..54bdfe83 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/rabbitmq/service/impl/MqDelayTaskServiceImpl.java @@ -0,0 +1,114 @@ +package org.dromara.rabbitmq.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import jakarta.annotation.Resource; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.exception.ServiceException; +import org.dromara.common.core.utils.StringUtils; +import org.dromara.rabbitmq.domain.MqDelayTask; +import org.dromara.rabbitmq.enums.MqDelayTaskTypeEnum; +import org.dromara.rabbitmq.mapper.MqDelayTaskMapper; +import org.dromara.rabbitmq.producer.RabbitProducer; +import org.dromara.rabbitmq.service.IMqDelayTaskService; +import org.dromara.safety.service.IHazardHiddenDangerRectifyService; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; + +/** + * 延迟任务Service业务层处理 + * + * @author lilemy + * @date 2025-12-05 + */ +@Slf4j +@RequiredArgsConstructor +@Service +public class MqDelayTaskServiceImpl extends ServiceImpl + implements IMqDelayTaskService { + + @Resource + private RabbitProducer rabbitProducer; + + @Lazy + @Resource + private IHazardHiddenDangerRectifyService hazardHiddenDangerRectifyService; + + /** + * 添加延迟任务 + * + * @param task 延迟任务 + */ + @Override + public void addDelayTask(MqDelayTask task) { + String bizType = task.getBizType(); + Long bizId = task.getBizId(); + LocalDateTime executeTime = task.getExecuteTime(); + if (StringUtils.isBlank(bizType) || bizId == null || executeTime == null) { + throw new ServiceException("参数错误"); + } + // 保存延迟任务 + boolean save = this.save(task); + if (!save) { + throw new ServiceException("添加延迟任务失败"); + } + // 计算超时时间 + ZoneId zone = ZoneId.systemDefault(); + long diffMillis = Duration.between( + LocalDateTime.now().atZone(zone), + executeTime.atZone(zone) + ).toMillis(); + if (diffMillis <= 0) { + throw new ServiceException("延迟时间不能小于当前时间"); + } + // 添加延迟任务 + rabbitProducer.sendDelayMessage(String.valueOf(task.getId()), diffMillis); + } + + /** + * 执行任务 + * + * @param id 主键id + */ + @Override + public void executeTask(Long id) { + MqDelayTask task = this.getById(id); + if (task == null) { + return; + } + if (task.getStatus() == 1 || task.getStatus() == 2) { + return; + } + try { + // 执行中 + task.setStatus(1); + this.updateById(task); + // 业务逻辑:发送通知 + String bizType = task.getBizType(); + MqDelayTaskTypeEnum type = MqDelayTaskTypeEnum.getByType(bizType); + switch (type) { + case HIDDEN_DANGER -> hazardHiddenDangerRectifyService.sendTimeoutNotify(task.getBizId()); + case null, default -> { + } + } + // 成功 + task.setStatus(2); + this.updateById(task); + } catch (Exception e) { + // 更新失败状态 + task.setRetryCount(task.getRetryCount() + 1); + task.setFailReason(e.getMessage()); + task.setStatus(3); + this.updateById(task); + // 重试机制 + if (task.getRetryCount() < task.getMaxRetry()) { + // 1分钟后重试 + rabbitProducer.sendDelayMessage(String.valueOf(id), 60 * 1000); + } + } + } +} diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/IHazardHiddenDangerRectifyService.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/IHazardHiddenDangerRectifyService.java index 45a4eb06..96adfbe0 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/IHazardHiddenDangerRectifyService.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/IHazardHiddenDangerRectifyService.java @@ -87,4 +87,11 @@ public interface IHazardHiddenDangerRectifyService extends IService ids, Boolean isValid); + + /** + * 发送超时通知 + * + * @param bizId 业务id + */ + void sendTimeoutNotify(Long bizId); } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerRectifyServiceImpl.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerRectifyServiceImpl.java index c3e345a4..8bd887dd 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerRectifyServiceImpl.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerRectifyServiceImpl.java @@ -5,12 +5,15 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.exception.ServiceException; import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.mybatis.core.page.PageQuery; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.common.sse.dto.SseMessageDto; +import org.dromara.common.sse.utils.SseMessageUtils; import org.dromara.safety.domain.HazardHiddenDanger; import org.dromara.safety.domain.HazardHiddenDangerRectify; import org.dromara.safety.domain.bo.HazardHiddenDangerBo; @@ -24,6 +27,8 @@ import org.dromara.safety.domain.vo.RectifyTimesVo; import org.dromara.safety.mapper.HazardHiddenDangerRectifyMapper; import org.dromara.safety.service.IHazardHiddenDangerRectifyService; import org.dromara.safety.service.IHazardHiddenDangerService; +import org.dromara.safety.service.IHazardRuleNotifyObjectService; +import org.dromara.websocket.ChatServerHandler; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -32,6 +37,7 @@ import java.time.LocalDateTime; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; /** * 隐患整改情况Service业务层处理 @@ -39,6 +45,7 @@ import java.util.Map; * @author lilemy * @date 2025-12-04 */ +@Slf4j @RequiredArgsConstructor @Service public class HazardHiddenDangerRectifyServiceImpl extends ServiceImpl @@ -46,6 +53,10 @@ public class HazardHiddenDangerRectifyServiceImpl extends ServiceImpl 0; } + + /** + * 发送超时通知 + * + * @param bizId 业务id + */ + @Override + public void sendTimeoutNotify(Long bizId) { + HazardHiddenDanger hiddenDanger = hazardHiddenDangerService.getById(bizId); + if (hiddenDanger == null) { + throw new ServiceException("未找到该数据"); + } + if (!HazardHiddenDanger.RECTIFY.equals(hiddenDanger.getStatus())) { + return; + } + // 发送消息 + String dangerCode = hiddenDanger.getDangerCode(); + Long rectifyUserId = hiddenDanger.getRectifyUserId(); + Long projectId = hiddenDanger.getProjectId(); + String titleRectify = "您的安全隐患工单[" + dangerCode + "]已超时,请及时处理!"; + try { + chatServerHandler.sendSystemMessageToUser(rectifyUserId, titleRectify, "2"); + SseMessageUtils.sendMessage(rectifyUserId, titleRectify); + } catch (Exception e) { + log.error("异步发送系统消息失败,用户ID: {}, 消息: {}", rectifyUserId, titleRectify, e); + } + // 发送给需要通知的对象 + Set userIds = hazardRuleNotifyObjectService.queryNotifyObjectIds(hiddenDanger.getDangerLevelId(), projectId); + SseMessageDto sseDto = new SseMessageDto(); + String title = "安全隐患工单[" + dangerCode + "]未进行整改,请及时关注!"; + for (Long userId : userIds) { + try { + chatServerHandler.sendSystemMessageToUser(userId, title, "2"); + } catch (Exception e) { + log.error("异步发送系统消息失败,用户ID: {}, 消息: {}", userId, title, e); + } + } + sseDto.setUserIds(userIds.stream().toList()); + sseDto.setMessage(title); + sseDto.setRoute(""); + sseDto.setProjectId(projectId); + sseDto.setIsRecord(true); + SseMessageUtils.publishMessage(sseDto); + } } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerServiceImpl.java b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerServiceImpl.java index 0f4158e7..4152b2c0 100644 --- a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerServiceImpl.java +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/safety/service/impl/HazardHiddenDangerServiceImpl.java @@ -16,6 +16,9 @@ import org.dromara.common.mybatis.core.page.PageQuery; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.common.sse.dto.SseMessageDto; import org.dromara.common.sse.utils.SseMessageUtils; +import org.dromara.rabbitmq.domain.MqDelayTask; +import org.dromara.rabbitmq.enums.MqDelayTaskTypeEnum; +import org.dromara.rabbitmq.service.IMqDelayTaskService; import org.dromara.safety.domain.HazardHiddenDanger; import org.dromara.safety.domain.HazardHiddenDangerRectify; import org.dromara.safety.domain.HazardRule; @@ -62,6 +65,8 @@ public class HazardHiddenDangerServiceImpl extends ServiceImpl { // 发送给整改人员 - String titleRectify = "您有新的安全隐患工单需要整改,请及时处理!"; + String titleRectify = "您有新的安全隐患工单[" + dangerCode + "]需要整改,请及时处理!"; try { chatServerHandler.sendSystemMessageToUser(rectifyUserId, titleRectify, "2"); SseMessageUtils.sendMessage(rectifyUserId, titleRectify); @@ -270,7 +276,7 @@ public class HazardHiddenDangerServiceImpl extends ServiceImpl userIds = hazardRuleNotifyObjectService.queryNotifyObjectIds(hazardHiddenDanger.getDangerLevelId(), projectId); SseMessageDto sseDto = new SseMessageDto(); - String title = "您有新的安全隐患工单,请及时查看!"; + String title = "您有新的安全隐患工单[" + dangerCode + "],请及时查看!"; for (Long userId : userIds) { try { chatServerHandler.sendSystemMessageToUser(userId, title, "2"); @@ -285,6 +291,15 @@ public class HazardHiddenDangerServiceImpl extends ServiceImpl ids, Boolean isValid) { if (isValid) { //TODO 做一些业务上的校验,判断是否需要校验 + if (hazardHiddenDangerService.lambdaQuery() + .in(HazardHiddenDanger::getDangerLevelId, ids) + .count() > 0) { + throw new ServiceException("请先删除该等级下的隐患信息", HttpStatus.ERROR); + } } return baseMapper.deleteByIds(ids) > 0; } diff --git a/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/resources/mapper/rabbitmq/MqDelayTaskMapper.xml b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/resources/mapper/rabbitmq/MqDelayTaskMapper.xml new file mode 100644 index 00000000..8a474e61 --- /dev/null +++ b/xinnengyuan/ruoyi-modules/ruoyi-system/src/main/resources/mapper/rabbitmq/MqDelayTaskMapper.xml @@ -0,0 +1,7 @@ + + + + +