接入消息队列,处理延迟消息
This commit is contained in:
@ -368,3 +368,14 @@ drone:
|
||||
chat:
|
||||
server:
|
||||
port: 19099
|
||||
# rabbitmq 配置
|
||||
rabbitmq:
|
||||
exchange-name: dev-normal-exchange
|
||||
queue-name: dev-normal-queue
|
||||
routing-key: dev.normal.routing.key
|
||||
delay-exchange-name: dev-delay-queue
|
||||
delay-queue-name: dev-delay-exchange
|
||||
delay-routing-key: dev.delay.routing.key
|
||||
dead-letter-exchange: dev-dlx-exchange
|
||||
dead-letter-queue: dev-dlx-queue
|
||||
dead-letter-routing-key: dev.dlx.routing.key
|
||||
|
||||
@ -343,3 +343,14 @@ drone:
|
||||
chat:
|
||||
server:
|
||||
port: 18088
|
||||
# rabbitmq 配置
|
||||
rabbitmq:
|
||||
exchange-name: local-normal-exchange
|
||||
queue-name: local-normal-queue
|
||||
routing-key: local.normal.routing.key
|
||||
delay-exchange-name: local-delay-queue
|
||||
delay-queue-name: local-delay-exchange
|
||||
delay-routing-key: local.delay.routing.key
|
||||
dead-letter-exchange: local-dlx-exchange
|
||||
dead-letter-queue: local-dlx-queue
|
||||
dead-letter-routing-key: local.dlx.routing.key
|
||||
|
||||
@ -358,3 +358,14 @@ drone:
|
||||
chat:
|
||||
server:
|
||||
port: 19099
|
||||
# rabbitmq 配置
|
||||
rabbitmq:
|
||||
exchange-name: prod-normal-exchange
|
||||
queue-name: prod-normal-queue
|
||||
routing-key: prod.normal.routing.key
|
||||
delay-exchange-name: prod-delay-queue
|
||||
delay-queue-name: prod-delay-exchange
|
||||
delay-routing-key: prod.delay.routing.key
|
||||
dead-letter-exchange: prod-dlx-exchange
|
||||
dead-letter-queue: prod-dlx-queue
|
||||
dead-letter-routing-key: prod.dlx.routing.key
|
||||
|
||||
@ -95,7 +95,13 @@ spring:
|
||||
deserialization:
|
||||
# 允许对象忽略json中不存在的属性
|
||||
fail_on_unknown_properties: false
|
||||
|
||||
rabbitmq:
|
||||
host: 192.168.110.2
|
||||
port: 5672
|
||||
username: admin
|
||||
password: yuanjiexny
|
||||
publisher-returns: true
|
||||
publisher-confirm-type: correlated
|
||||
# Sa-Token配置
|
||||
sa-token:
|
||||
# token名称 (同时也是cookie名称)
|
||||
|
||||
@ -29,6 +29,10 @@
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
<!-- TwelveMonkeys ImageIO 扩展 -->
|
||||
<dependency>
|
||||
<groupId>com.twelvemonkeys.imageio</groupId>
|
||||
|
||||
@ -0,0 +1,57 @@
|
||||
package org.dromara.rabbitmq.config;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.TopicExchange;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitConfig {
|
||||
|
||||
@Resource
|
||||
private RabbitProperties rabbitProperties;
|
||||
|
||||
/**
|
||||
* 创建交换机
|
||||
* ExchangeBuilder有四种交换机模式
|
||||
* Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。
|
||||
* Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。
|
||||
* Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。
|
||||
* Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
|
||||
* durable 交换器是否持久化(false 不持久化,true 持久化)
|
||||
**/
|
||||
@Bean
|
||||
public TopicExchange exchange() {
|
||||
return new TopicExchange(rabbitProperties.getExchangeName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建队列
|
||||
* durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码
|
||||
* deliveryMode 消息是否持久化(1 不持久化,2 持久化)
|
||||
**/
|
||||
@Bean
|
||||
public Queue queue() {
|
||||
return new Queue(rabbitProperties.getQueueName(), false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定交换机和队列
|
||||
* bing 方法参数可以是队列和交换机
|
||||
* to 方法参数必须是交换机
|
||||
* with 方法参数是路由Key 这里是以rabbit.开头
|
||||
* noargs 就是不要参数的意思
|
||||
* 这个方法的意思是把rabbit开头的消息 和 上面的队列 和 上面的交换机绑定
|
||||
**/
|
||||
@Bean
|
||||
public Binding binding(Queue queue, TopicExchange exchange) {
|
||||
return BindingBuilder.bind(queue).to(exchange).with(rabbitProperties.getRoutingKey());
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,60 @@
|
||||
package org.dromara.rabbitmq.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author lilemy
|
||||
* @date 2025-12-05 16:25
|
||||
*/
|
||||
@Data
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "rabbitmq")
|
||||
public class RabbitProperties {
|
||||
|
||||
/**
|
||||
* 交换机名称
|
||||
*/
|
||||
private String exchangeName;
|
||||
|
||||
/**
|
||||
* 队列名称
|
||||
*/
|
||||
private String queueName;
|
||||
|
||||
/**
|
||||
* 路由键名称
|
||||
*/
|
||||
private String routingKey;
|
||||
|
||||
/**
|
||||
* 延迟交换机名称
|
||||
*/
|
||||
private String delayExchangeName;
|
||||
|
||||
/**
|
||||
* 延迟队列名称
|
||||
*/
|
||||
private String delayQueueName;
|
||||
|
||||
/**
|
||||
* 延迟路由键名称
|
||||
*/
|
||||
private String delayRoutingKey;
|
||||
|
||||
/**
|
||||
* 死信交换机名称
|
||||
*/
|
||||
private String deadLetterExchange;
|
||||
|
||||
/**
|
||||
* 死信队列名称
|
||||
*/
|
||||
private String deadLetterQueue;
|
||||
|
||||
/**
|
||||
* 死信路由键名称
|
||||
*/
|
||||
private String deadLetterRoutingKey;
|
||||
}
|
||||
@ -0,0 +1,75 @@
|
||||
package org.dromara.rabbitmq.config;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* RabbitTTL队列
|
||||
*
|
||||
* @author xbhog
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitTtlQueueConfig {
|
||||
|
||||
@Resource
|
||||
private RabbitProperties rabbitProperties;
|
||||
|
||||
/**
|
||||
* 声明延迟队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue delayQueue() {
|
||||
return QueueBuilder.durable(rabbitProperties.getDelayQueueName())
|
||||
.deadLetterExchange(rabbitProperties.getDeadLetterExchange())
|
||||
.deadLetterRoutingKey(rabbitProperties.getDeadLetterRoutingKey())
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明延迟交换机
|
||||
*/
|
||||
@Bean
|
||||
public CustomExchange delayExchange() {
|
||||
return new CustomExchange(rabbitProperties.getDelayExchangeName(), "x-delayed-message",
|
||||
true, false, Map.of("x-delayed-type", "direct"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将延迟队列绑定到延迟交换机
|
||||
*/
|
||||
@Bean
|
||||
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
|
||||
return BindingBuilder.bind(delayQueue).to(delayExchange).with(rabbitProperties.getDelayRoutingKey()).noargs();
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明死信队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue deadLetterQueue() {
|
||||
return new Queue(rabbitProperties.getDeadLetterQueue());
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明死信交换机
|
||||
*/
|
||||
@Bean
|
||||
public DirectExchange deadLetterExchange() {
|
||||
return new DirectExchange(rabbitProperties.getDeadLetterExchange());
|
||||
}
|
||||
|
||||
/**
|
||||
* 将死信队列绑定到死信交换机
|
||||
*/
|
||||
@Bean
|
||||
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
|
||||
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(rabbitProperties.getDeadLetterRoutingKey());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,65 @@
|
||||
package org.dromara.rabbitmq.consumer;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.core.utils.StringUtils;
|
||||
import org.dromara.rabbitmq.service.IMqDelayTaskService;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author lilemy
|
||||
* @date 2025-12-05 14:09
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RabbitConsumer {
|
||||
|
||||
@Resource
|
||||
private IMqDelayTaskService mqDelayTaskService;
|
||||
|
||||
/**
|
||||
* 普通消息
|
||||
*/
|
||||
@RabbitListener(queues = "${rabbitmq.queue-name}")
|
||||
public void listenQueue(Message message) {
|
||||
log.info("【消费者】Start consuming data:{}", new String(message.getBody()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理延迟队列消息
|
||||
*/
|
||||
@RabbitListener(queues = "${rabbitmq.delay-queue-name}")
|
||||
public void receiveDelayMessage(String message) {
|
||||
log.info("【消费者】Received delayed message:{}", message);
|
||||
if (StringUtils.isNotBlank(message) && isLong(message)) {
|
||||
mqDelayTaskService.executeTask(Long.parseLong(message));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理死信队列消息
|
||||
* 当消息在延迟队列中未能被正确处理(例如因消费者逻辑错误、超时未ACK等原因)
|
||||
* 它会被自动转发到死信队列中,以便后续的特殊处理或重新尝试。
|
||||
*/
|
||||
@RabbitListener(queues = "${rabbitmq.dead-letter-queue}")
|
||||
public void receiveDeadMessage(String message) {
|
||||
log.info("【消费者】Received dead message:{}", message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断字符串是否为Long
|
||||
*
|
||||
* @param str 字符串
|
||||
* @return 是否为Long
|
||||
*/
|
||||
private static boolean isLong(String str) {
|
||||
try {
|
||||
Long.parseLong(str);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,66 @@
|
||||
package org.dromara.rabbitmq.domain;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 延迟任务对象 mq_delay_task
|
||||
*
|
||||
* @author lilemy
|
||||
* @date 2025-12-05
|
||||
*/
|
||||
@Data
|
||||
@TableName("mq_delay_task")
|
||||
public class MqDelayTask implements Serializable {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
@TableId(value = "id")
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 业务类型
|
||||
*/
|
||||
private String bizType;
|
||||
|
||||
/**
|
||||
* 业务ID
|
||||
*/
|
||||
private Long bizId;
|
||||
|
||||
/**
|
||||
* 执行的时间点
|
||||
*/
|
||||
private LocalDateTime executeTime;
|
||||
|
||||
/**
|
||||
* 任务状态 0未执行 1执行中 2执行成功 3执行失败
|
||||
*/
|
||||
private Integer status;
|
||||
|
||||
/**
|
||||
* 重试次数
|
||||
*/
|
||||
private Integer retryCount;
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
private Integer maxRetry;
|
||||
|
||||
/**
|
||||
* 失败原因
|
||||
*/
|
||||
private String failReason;
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,76 @@
|
||||
package org.dromara.rabbitmq.domain.vo;
|
||||
|
||||
import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
|
||||
import com.alibaba.excel.annotation.ExcelProperty;
|
||||
import io.github.linpeilie.annotations.AutoMapper;
|
||||
import lombok.Data;
|
||||
import org.dromara.rabbitmq.domain.MqDelayTask;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
|
||||
/**
|
||||
* 延迟任务视图对象 mq_delay_task
|
||||
*
|
||||
* @author lilemy
|
||||
* @date 2025-12-05
|
||||
*/
|
||||
@Data
|
||||
@ExcelIgnoreUnannotated
|
||||
@AutoMapper(target = MqDelayTask.class)
|
||||
public class MqDelayTaskVo implements Serializable {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
@ExcelProperty(value = "主键")
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 业务类型
|
||||
*/
|
||||
@ExcelProperty(value = "业务类型")
|
||||
private String bizType;
|
||||
|
||||
/**
|
||||
* 业务ID
|
||||
*/
|
||||
@ExcelProperty(value = "业务ID")
|
||||
private Long bizId;
|
||||
|
||||
/**
|
||||
* 执行的时间点
|
||||
*/
|
||||
@ExcelProperty(value = "执行的时间点")
|
||||
private LocalDateTime executeTime;
|
||||
|
||||
/**
|
||||
* 任务状态 0未执行 1执行中 2执行成功 3执行失败
|
||||
*/
|
||||
@ExcelProperty(value = "任务状态 0未执行 1执行中 2执行成功 3执行失败")
|
||||
private Integer status;
|
||||
|
||||
/**
|
||||
* 重试次数
|
||||
*/
|
||||
@ExcelProperty(value = "重试次数")
|
||||
private Integer retryCount;
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
@ExcelProperty(value = "最大重试次数")
|
||||
private Integer maxRetry;
|
||||
|
||||
/**
|
||||
* 失败原因
|
||||
*/
|
||||
@ExcelProperty(value = "失败原因")
|
||||
private String failReason;
|
||||
|
||||
}
|
||||
@ -0,0 +1,35 @@
|
||||
package org.dromara.rabbitmq.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @author lilemy
|
||||
* @date 2025-12-05 18:26
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum MqDelayTaskTypeEnum {
|
||||
|
||||
/**
|
||||
* 安全隐患
|
||||
*/
|
||||
HIDDEN_DANGER("aqyh");
|
||||
|
||||
private final String type;
|
||||
|
||||
/**
|
||||
* 根据业务类型获取枚举
|
||||
*
|
||||
* @param type 业务类型
|
||||
* @return 枚举
|
||||
*/
|
||||
public static MqDelayTaskTypeEnum getByType(String type) {
|
||||
for (MqDelayTaskTypeEnum value : values()) {
|
||||
if (value.getType().equals(type)) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
package org.dromara.rabbitmq.mapper;
|
||||
|
||||
import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
|
||||
import org.dromara.rabbitmq.domain.MqDelayTask;
|
||||
import org.dromara.rabbitmq.domain.vo.MqDelayTaskVo;
|
||||
|
||||
/**
|
||||
* 延迟任务Mapper接口
|
||||
*
|
||||
* @author lilemy
|
||||
* @date 2025-12-05
|
||||
*/
|
||||
public interface MqDelayTaskMapper extends BaseMapperPlus<MqDelayTask, MqDelayTaskVo> {
|
||||
|
||||
}
|
||||
@ -0,0 +1,51 @@
|
||||
package org.dromara.rabbitmq.producer;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.rabbitmq.config.RabbitProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author lilemy
|
||||
* @date 2025-12-05 11:40
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RabbitProducer {
|
||||
|
||||
@Resource
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Resource
|
||||
private RabbitProperties rabbitProperties;
|
||||
|
||||
/**
|
||||
* 发送消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public void send(String message) {
|
||||
rabbitTemplate.convertAndSend(rabbitProperties.getExchangeName(), rabbitProperties.getRoutingKey(), message);
|
||||
log.info("【生产者】Message send: {}", message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送延迟消息
|
||||
*
|
||||
* @param message 消息
|
||||
* @param delayMs 延迟时间(毫秒)
|
||||
*/
|
||||
public void sendDelayMessage(String message, long delayMs) {
|
||||
rabbitTemplate.convertAndSend(
|
||||
rabbitProperties.getDelayExchangeName(),
|
||||
rabbitProperties.getDelayRoutingKey(),
|
||||
message,
|
||||
message1 -> {
|
||||
message1.getMessageProperties().setDelayLong(delayMs);
|
||||
return message1;
|
||||
}
|
||||
);
|
||||
log.info("【生产者】Delay Message send: {} delay: {}ms", message, delayMs);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,27 @@
|
||||
package org.dromara.rabbitmq.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import org.dromara.rabbitmq.domain.MqDelayTask;
|
||||
|
||||
/**
|
||||
* 延迟任务Service接口
|
||||
*
|
||||
* @author lilemy
|
||||
* @date 2025-12-05
|
||||
*/
|
||||
public interface IMqDelayTaskService extends IService<MqDelayTask> {
|
||||
|
||||
/**
|
||||
* 添加延迟任务
|
||||
*
|
||||
* @param task 延迟任务
|
||||
*/
|
||||
void addDelayTask(MqDelayTask task);
|
||||
|
||||
/**
|
||||
* 执行任务
|
||||
*
|
||||
* @param id 主键id
|
||||
*/
|
||||
void executeTask(Long id);
|
||||
}
|
||||
@ -0,0 +1,114 @@
|
||||
package org.dromara.rabbitmq.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.core.exception.ServiceException;
|
||||
import org.dromara.common.core.utils.StringUtils;
|
||||
import org.dromara.rabbitmq.domain.MqDelayTask;
|
||||
import org.dromara.rabbitmq.enums.MqDelayTaskTypeEnum;
|
||||
import org.dromara.rabbitmq.mapper.MqDelayTaskMapper;
|
||||
import org.dromara.rabbitmq.producer.RabbitProducer;
|
||||
import org.dromara.rabbitmq.service.IMqDelayTaskService;
|
||||
import org.dromara.safety.service.IHazardHiddenDangerRectifyService;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
|
||||
/**
|
||||
* 延迟任务Service业务层处理
|
||||
*
|
||||
* @author lilemy
|
||||
* @date 2025-12-05
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
public class MqDelayTaskServiceImpl extends ServiceImpl<MqDelayTaskMapper, MqDelayTask>
|
||||
implements IMqDelayTaskService {
|
||||
|
||||
@Resource
|
||||
private RabbitProducer rabbitProducer;
|
||||
|
||||
@Lazy
|
||||
@Resource
|
||||
private IHazardHiddenDangerRectifyService hazardHiddenDangerRectifyService;
|
||||
|
||||
/**
|
||||
* 添加延迟任务
|
||||
*
|
||||
* @param task 延迟任务
|
||||
*/
|
||||
@Override
|
||||
public void addDelayTask(MqDelayTask task) {
|
||||
String bizType = task.getBizType();
|
||||
Long bizId = task.getBizId();
|
||||
LocalDateTime executeTime = task.getExecuteTime();
|
||||
if (StringUtils.isBlank(bizType) || bizId == null || executeTime == null) {
|
||||
throw new ServiceException("参数错误");
|
||||
}
|
||||
// 保存延迟任务
|
||||
boolean save = this.save(task);
|
||||
if (!save) {
|
||||
throw new ServiceException("添加延迟任务失败");
|
||||
}
|
||||
// 计算超时时间
|
||||
ZoneId zone = ZoneId.systemDefault();
|
||||
long diffMillis = Duration.between(
|
||||
LocalDateTime.now().atZone(zone),
|
||||
executeTime.atZone(zone)
|
||||
).toMillis();
|
||||
if (diffMillis <= 0) {
|
||||
throw new ServiceException("延迟时间不能小于当前时间");
|
||||
}
|
||||
// 添加延迟任务
|
||||
rabbitProducer.sendDelayMessage(String.valueOf(task.getId()), diffMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行任务
|
||||
*
|
||||
* @param id 主键id
|
||||
*/
|
||||
@Override
|
||||
public void executeTask(Long id) {
|
||||
MqDelayTask task = this.getById(id);
|
||||
if (task == null) {
|
||||
return;
|
||||
}
|
||||
if (task.getStatus() == 1 || task.getStatus() == 2) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
// 执行中
|
||||
task.setStatus(1);
|
||||
this.updateById(task);
|
||||
// 业务逻辑:发送通知
|
||||
String bizType = task.getBizType();
|
||||
MqDelayTaskTypeEnum type = MqDelayTaskTypeEnum.getByType(bizType);
|
||||
switch (type) {
|
||||
case HIDDEN_DANGER -> hazardHiddenDangerRectifyService.sendTimeoutNotify(task.getBizId());
|
||||
case null, default -> {
|
||||
}
|
||||
}
|
||||
// 成功
|
||||
task.setStatus(2);
|
||||
this.updateById(task);
|
||||
} catch (Exception e) {
|
||||
// 更新失败状态
|
||||
task.setRetryCount(task.getRetryCount() + 1);
|
||||
task.setFailReason(e.getMessage());
|
||||
task.setStatus(3);
|
||||
this.updateById(task);
|
||||
// 重试机制
|
||||
if (task.getRetryCount() < task.getMaxRetry()) {
|
||||
// 1分钟后重试
|
||||
rabbitProducer.sendDelayMessage(String.valueOf(id), 60 * 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -87,4 +87,11 @@ public interface IHazardHiddenDangerRectifyService extends IService<HazardHidden
|
||||
* @return 是否删除成功
|
||||
*/
|
||||
Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid);
|
||||
|
||||
/**
|
||||
* 发送超时通知
|
||||
*
|
||||
* @param bizId 业务id
|
||||
*/
|
||||
void sendTimeoutNotify(Long bizId);
|
||||
}
|
||||
|
||||
@ -5,12 +5,15 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.core.exception.ServiceException;
|
||||
import org.dromara.common.core.utils.MapstructUtils;
|
||||
import org.dromara.common.core.utils.StringUtils;
|
||||
import org.dromara.common.mybatis.core.page.PageQuery;
|
||||
import org.dromara.common.mybatis.core.page.TableDataInfo;
|
||||
import org.dromara.common.satoken.utils.LoginHelper;
|
||||
import org.dromara.common.sse.dto.SseMessageDto;
|
||||
import org.dromara.common.sse.utils.SseMessageUtils;
|
||||
import org.dromara.safety.domain.HazardHiddenDanger;
|
||||
import org.dromara.safety.domain.HazardHiddenDangerRectify;
|
||||
import org.dromara.safety.domain.bo.HazardHiddenDangerBo;
|
||||
@ -24,6 +27,8 @@ import org.dromara.safety.domain.vo.RectifyTimesVo;
|
||||
import org.dromara.safety.mapper.HazardHiddenDangerRectifyMapper;
|
||||
import org.dromara.safety.service.IHazardHiddenDangerRectifyService;
|
||||
import org.dromara.safety.service.IHazardHiddenDangerService;
|
||||
import org.dromara.safety.service.IHazardRuleNotifyObjectService;
|
||||
import org.dromara.websocket.ChatServerHandler;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
@ -32,6 +37,7 @@ import java.time.LocalDateTime;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 隐患整改情况Service业务层处理
|
||||
@ -39,6 +45,7 @@ import java.util.Map;
|
||||
* @author lilemy
|
||||
* @date 2025-12-04
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
public class HazardHiddenDangerRectifyServiceImpl extends ServiceImpl<HazardHiddenDangerRectifyMapper, HazardHiddenDangerRectify>
|
||||
@ -46,6 +53,10 @@ public class HazardHiddenDangerRectifyServiceImpl extends ServiceImpl<HazardHidd
|
||||
|
||||
private final IHazardHiddenDangerService hazardHiddenDangerService;
|
||||
|
||||
private final ChatServerHandler chatServerHandler;
|
||||
|
||||
private final IHazardRuleNotifyObjectService hazardRuleNotifyObjectService;
|
||||
|
||||
/**
|
||||
* 查询隐患整改情况
|
||||
*
|
||||
@ -297,4 +308,48 @@ public class HazardHiddenDangerRectifyServiceImpl extends ServiceImpl<HazardHidd
|
||||
}
|
||||
return baseMapper.deleteByIds(ids) > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送超时通知
|
||||
*
|
||||
* @param bizId 业务id
|
||||
*/
|
||||
@Override
|
||||
public void sendTimeoutNotify(Long bizId) {
|
||||
HazardHiddenDanger hiddenDanger = hazardHiddenDangerService.getById(bizId);
|
||||
if (hiddenDanger == null) {
|
||||
throw new ServiceException("未找到该数据");
|
||||
}
|
||||
if (!HazardHiddenDanger.RECTIFY.equals(hiddenDanger.getStatus())) {
|
||||
return;
|
||||
}
|
||||
// 发送消息
|
||||
String dangerCode = hiddenDanger.getDangerCode();
|
||||
Long rectifyUserId = hiddenDanger.getRectifyUserId();
|
||||
Long projectId = hiddenDanger.getProjectId();
|
||||
String titleRectify = "您的安全隐患工单[" + dangerCode + "]已超时,请及时处理!";
|
||||
try {
|
||||
chatServerHandler.sendSystemMessageToUser(rectifyUserId, titleRectify, "2");
|
||||
SseMessageUtils.sendMessage(rectifyUserId, titleRectify);
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送系统消息失败,用户ID: {}, 消息: {}", rectifyUserId, titleRectify, e);
|
||||
}
|
||||
// 发送给需要通知的对象
|
||||
Set<Long> userIds = hazardRuleNotifyObjectService.queryNotifyObjectIds(hiddenDanger.getDangerLevelId(), projectId);
|
||||
SseMessageDto sseDto = new SseMessageDto();
|
||||
String title = "安全隐患工单[" + dangerCode + "]未进行整改,请及时关注!";
|
||||
for (Long userId : userIds) {
|
||||
try {
|
||||
chatServerHandler.sendSystemMessageToUser(userId, title, "2");
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送系统消息失败,用户ID: {}, 消息: {}", userId, title, e);
|
||||
}
|
||||
}
|
||||
sseDto.setUserIds(userIds.stream().toList());
|
||||
sseDto.setMessage(title);
|
||||
sseDto.setRoute("");
|
||||
sseDto.setProjectId(projectId);
|
||||
sseDto.setIsRecord(true);
|
||||
SseMessageUtils.publishMessage(sseDto);
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,6 +16,9 @@ import org.dromara.common.mybatis.core.page.PageQuery;
|
||||
import org.dromara.common.mybatis.core.page.TableDataInfo;
|
||||
import org.dromara.common.sse.dto.SseMessageDto;
|
||||
import org.dromara.common.sse.utils.SseMessageUtils;
|
||||
import org.dromara.rabbitmq.domain.MqDelayTask;
|
||||
import org.dromara.rabbitmq.enums.MqDelayTaskTypeEnum;
|
||||
import org.dromara.rabbitmq.service.IMqDelayTaskService;
|
||||
import org.dromara.safety.domain.HazardHiddenDanger;
|
||||
import org.dromara.safety.domain.HazardHiddenDangerRectify;
|
||||
import org.dromara.safety.domain.HazardRule;
|
||||
@ -62,6 +65,8 @@ public class HazardHiddenDangerServiceImpl extends ServiceImpl<HazardHiddenDange
|
||||
|
||||
private final ChatServerHandler chatServerHandler;
|
||||
|
||||
private final IMqDelayTaskService mqDelayTaskService;
|
||||
|
||||
/**
|
||||
* 查询隐患信息
|
||||
*
|
||||
@ -257,10 +262,11 @@ public class HazardHiddenDangerServiceImpl extends ServiceImpl<HazardHiddenDange
|
||||
throw new ServiceException("数据保存失败");
|
||||
}
|
||||
Long projectId = hazardHiddenDanger.getProjectId();
|
||||
String dangerCode = hazardHiddenDanger.getDangerCode();
|
||||
// 通知对应人员
|
||||
CompletableFuture.runAsync(() -> {
|
||||
// 发送给整改人员
|
||||
String titleRectify = "您有新的安全隐患工单需要整改,请及时处理!";
|
||||
String titleRectify = "您有新的安全隐患工单[" + dangerCode + "]需要整改,请及时处理!";
|
||||
try {
|
||||
chatServerHandler.sendSystemMessageToUser(rectifyUserId, titleRectify, "2");
|
||||
SseMessageUtils.sendMessage(rectifyUserId, titleRectify);
|
||||
@ -270,7 +276,7 @@ public class HazardHiddenDangerServiceImpl extends ServiceImpl<HazardHiddenDange
|
||||
// 发送给需要通知的对象
|
||||
Set<Long> userIds = hazardRuleNotifyObjectService.queryNotifyObjectIds(hazardHiddenDanger.getDangerLevelId(), projectId);
|
||||
SseMessageDto sseDto = new SseMessageDto();
|
||||
String title = "您有新的安全隐患工单,请及时查看!";
|
||||
String title = "您有新的安全隐患工单[" + dangerCode + "],请及时查看!";
|
||||
for (Long userId : userIds) {
|
||||
try {
|
||||
chatServerHandler.sendSystemMessageToUser(userId, title, "2");
|
||||
@ -285,6 +291,15 @@ public class HazardHiddenDangerServiceImpl extends ServiceImpl<HazardHiddenDange
|
||||
sseDto.setIsRecord(true);
|
||||
SseMessageUtils.publishMessage(sseDto);
|
||||
// 发送整改期限数据到消息队列
|
||||
MqDelayTask task = new MqDelayTask();
|
||||
task.setBizType(MqDelayTaskTypeEnum.HIDDEN_DANGER.getType());
|
||||
task.setBizId(dto.getId());
|
||||
task.setExecuteTime(rectifyTime);
|
||||
try {
|
||||
mqDelayTaskService.addDelayTask(task);
|
||||
} catch (Exception e) {
|
||||
log.error("添加延迟任务失败,延迟任务: {}", task, e);
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.dromara.common.core.constant.HttpStatus;
|
||||
import org.dromara.common.core.exception.ServiceException;
|
||||
@ -11,6 +12,7 @@ import org.dromara.common.core.utils.MapstructUtils;
|
||||
import org.dromara.common.core.utils.StringUtils;
|
||||
import org.dromara.common.mybatis.core.page.PageQuery;
|
||||
import org.dromara.common.mybatis.core.page.TableDataInfo;
|
||||
import org.dromara.safety.domain.HazardHiddenDanger;
|
||||
import org.dromara.safety.domain.HazardRule;
|
||||
import org.dromara.safety.domain.HazardRuleNotifyObject;
|
||||
import org.dromara.safety.domain.bo.HazardRuleBo;
|
||||
@ -18,8 +20,10 @@ import org.dromara.safety.domain.bo.HazardRuleNotifyObjectBo;
|
||||
import org.dromara.safety.domain.vo.HazardRuleNotifyObjectVo;
|
||||
import org.dromara.safety.domain.vo.HazardRuleVo;
|
||||
import org.dromara.safety.mapper.HazardRuleMapper;
|
||||
import org.dromara.safety.service.IHazardHiddenDangerService;
|
||||
import org.dromara.safety.service.IHazardRuleNotifyObjectService;
|
||||
import org.dromara.safety.service.IHazardRuleService;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@ -39,6 +43,10 @@ public class HazardRuleServiceImpl extends ServiceImpl<HazardRuleMapper, HazardR
|
||||
|
||||
private final IHazardRuleNotifyObjectService hazardRuleNotifyObjectService;
|
||||
|
||||
@Lazy
|
||||
@Resource
|
||||
private IHazardHiddenDangerService hazardHiddenDangerService;
|
||||
|
||||
/**
|
||||
* 查询隐患分级通知规则
|
||||
*
|
||||
@ -202,6 +210,11 @@ public class HazardRuleServiceImpl extends ServiceImpl<HazardRuleMapper, HazardR
|
||||
public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
|
||||
if (isValid) {
|
||||
//TODO 做一些业务上的校验,判断是否需要校验
|
||||
if (hazardHiddenDangerService.lambdaQuery()
|
||||
.in(HazardHiddenDanger::getDangerLevelId, ids)
|
||||
.count() > 0) {
|
||||
throw new ServiceException("请先删除该等级下的隐患信息", HttpStatus.ERROR);
|
||||
}
|
||||
}
|
||||
return baseMapper.deleteByIds(ids) > 0;
|
||||
}
|
||||
|
||||
@ -0,0 +1,7 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!DOCTYPE mapper
|
||||
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="org.dromara.rabbitmq.mapper.MqDelayTaskMapper">
|
||||
|
||||
</mapper>
|
||||
Reference in New Issue
Block a user