一、两种实现延迟队列方式对比
DLX(死信队列) + TTL(消息超时未消费)
实现方式
通过设置消息的TTL和死信交换机来实现延迟任务
对比
- 给队列设置 TTL,不能灵活动态配置
- 给消息设置 TTL,导致消息时序问题,已经过期了的消息被阻塞导致不能及时被消费
rabbitmq-delayed-message-exchange(插件)
实现方式
可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。消息发布在交换机 x-delayed-message 时,消息它不会立即进入对应队列,而是先将消息保存至 Mnesia (一个分布式数据库管理系统(DBMS)) ,然后插件会尝试确认是否过期,再投递到对应绑定的队列之中。
对比
- 可以动态灵活配置过期时间,已经过期了的消息不会被阻塞导致不能及时被消费
二、安装rabbitmq-delayed-message-exchange插件
- 下载插件
首先根据你所安装的RabbitMQ版本来选择对应的版本
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 安装插件
将下载好的插件复制到RabbitMQ的插件目录中,使用以下命令:
cp rabbitmq_delayed_message_exchange.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.11.1/plugins/
其中,/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/plugins/是RabbitMQ插件目录的路径,具体路径根据实际情况而定。
然后使用以下命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
扩展:关闭该插件使用:
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
- 验证插件
安装完成后,可以使用以下命令验证插件是否安装成功:
rabbitmq-plugins list
如果插件列表中包含rabbitmq_delayed_message_exchange,则说明插件安装成功。
重启RabbitMQ使安装新插件生效:
sudo systemctl restart rabbitmq-server
重启后,前往 RabbitMQ web 控制台中,在创建交换机的选项中我们能发现多了一项类型:x-delayed-message
三、SpringBoot使用
- 首先,在pom.xml文件中添加RabbitMQ依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在application.yml文件中添加RabbitMQ配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
- 创建一个延迟消息的交换机,并将其绑定到一个队列上:
@Configuration
public class RabbitMQConfig {
// 定义延迟消息的交换机名称
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// 定义延迟消息的队列名称
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
// 定义延迟消息的路由键名称
public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
// 创建延迟消息的交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 创建延迟消息的队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 将延迟消息的队列绑定到延迟消息的交换机上
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
}
}
- 创建一个生产者,用于发送延迟消息:
@Component
public class DelayedMessageProducer {
@Autowired
private AmqpTemplate amqpTemplate;
// 发送延迟消息
public void sendDelayedMessage(String message, int delayTime) {
// 设置消息的延迟时间
MessagePostProcessor messagePostProcessor = message1 -> {
message1.getMessageProperties().setDelay(delayTime);
return message1;
};
// 发送消息到延迟消息的交换机上
amqpTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, RabbitMQConfig.DELAYED_ROUTING_KEY, message, messagePostProcessor);
}
}
- 创建一个消费者,用于消费延迟消息:
@Component
public class DelayedMessageConsumer {
@RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedMessage(String message) {
System.out.println("Received delayed message: " + message);
}
}
- 在业务代码中使用生产者发送延迟消息:
@RestController
public class DemoController {
@Autowired
private DelayedMessageProducer delayedMessageProducer;
@GetMapping("/sendDelayedMessage")
public String sendDelayedMessage() {
// 发送延迟消息,延迟时间为10秒
delayedMessageProducer.sendDelayedMessage("Hello, delayed message!", 10000);
return "Delayed message sent!";
}
}
四、局限性
- 由于 “x-delayed-type” 参数,可以使用此交换机来代替其他交换机,因为 “x-delayed-message” 交换机只是充当代理,可能会对性能产生影响,比实际使用的基本交换机要慢。
- 使用 Erlang 的定时器,所以有延时时长:0<=n<=(2^32)-1 ,单位毫秒。
- 目前这个插件的设计并不适合大量延迟消息的情况(例如100条数千条或数百万条)。详见 #72](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72)
评论区