SpringBoot+RabbitMQ使用插件方式实现延迟队列

bridge
2023-04-26 / 0 评论 / 0 点赞 / 966 阅读 / 3,464 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-04-26,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

一、两种实现延迟队列方式对比

DLX(死信队列) + TTL(消息超时未消费)

实现方式

通过设置消息的TTL和死信交换机来实现延迟任务

对比

  1. 给队列设置 TTL,不能灵活动态配置
  2. 给消息设置 TTL,导致消息时序问题,已经过期了的消息被阻塞导致不能及时被消费

rabbitmq-delayed-message-exchange(插件)

实现方式

可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。消息发布在交换机 x-delayed-message 时,消息它不会立即进入对应队列,而是先将消息保存至 Mnesia (一个分布式数据库管理系统(DBMS)) ,然后插件会尝试确认是否过期,再投递到对应绑定的队列之中。

对比

  1. 可以动态灵活配置过期时间,已经过期了的消息不会被阻塞导致不能及时被消费

二、安装rabbitmq-delayed-message-exchange插件

  1. 下载插件

首先根据你所安装的RabbitMQ版本来选择对应的版本

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  1. 安装插件

将下载好的插件复制到RabbitMQ的插件目录中,使用以下命令:

cp rabbitmq_delayed_message_exchange.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.11.1/plugins/

其中,/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/plugins/是RabbitMQ插件目录的路径,具体路径根据实际情况而定。

然后使用以下命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

扩展:关闭该插件使用:

rabbitmq-plugins disable rabbitmq_delayed_message_exchange
  1. 验证插件

安装完成后,可以使用以下命令验证插件是否安装成功:

rabbitmq-plugins list

如果插件列表中包含rabbitmq_delayed_message_exchange,则说明插件安装成功。

重启RabbitMQ使安装新插件生效:

sudo systemctl restart rabbitmq-server

重启后,前往 RabbitMQ web 控制台中,在创建交换机的选项中我们能发现多了一项类型:x-delayed-message

三、SpringBoot使用

  1. 首先,在pom.xml文件中添加RabbitMQ依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在application.yml文件中添加RabbitMQ配置:
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  1. 创建一个延迟消息的交换机,并将其绑定到一个队列上:
@Configuration
public class RabbitMQConfig {
    // 定义延迟消息的交换机名称
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // 定义延迟消息的队列名称
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    // 定义延迟消息的路由键名称
    public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";

    // 创建延迟消息的交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // 创建延迟消息的队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    // 将延迟消息的队列绑定到延迟消息的交换机上
    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
    }
}
  1. 创建一个生产者,用于发送延迟消息:
@Component
public class DelayedMessageProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    // 发送延迟消息
    public void sendDelayedMessage(String message, int delayTime) {
        // 设置消息的延迟时间
        MessagePostProcessor messagePostProcessor = message1 -> {
            message1.getMessageProperties().setDelay(delayTime);
            return message1;
        };
        // 发送消息到延迟消息的交换机上
        amqpTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, RabbitMQConfig.DELAYED_ROUTING_KEY, message, messagePostProcessor);
    }
}
  1. 创建一个消费者,用于消费延迟消息:
@Component
public class DelayedMessageConsumer {
    @RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}
  1. 在业务代码中使用生产者发送延迟消息:
@RestController
public class DemoController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/sendDelayedMessage")
    public String sendDelayedMessage() {
        // 发送延迟消息,延迟时间为10秒
        delayedMessageProducer.sendDelayedMessage("Hello, delayed message!", 10000);
        return "Delayed message sent!";
    }
}

四、局限性

  1. 由于 “x-delayed-type” 参数,可以使用此交换机来代替其他交换机,因为 “x-delayed-message” 交换机只是充当代理,可能会对性能产生影响,比实际使用的基本交换机要慢。
  2. 使用 Erlang 的定时器,所以有延时时长:0<=n<=(2^32)-1 ,单位毫秒。
  3. 目前这个插件的设计并不适合大量延迟消息的情况(例如100条数千条或数百万条)。详见 #72](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72)
0

评论区