RabbitMQ实现延时队列

简介: RabbitMQ实现延时队列

在RabbitMQ中本身是不存在延时队列,如果需要使用RabbitMQ来实现延时队列,有两种方式:


  • 第一种:DLX+TTL(Time To Live);
    设置TTL分为两种:在队列属性中设置TTL,在消息属性中设置TTL
  • 第二种:使用延时消息插件;


1. DLX+TTL模拟延时队列


通过过期消息和死信队列来模拟出延时队列,消费者监听死信交换器绑定的队列,而不要监听消息发送的队列,这样就可以模拟出延时队列了。


网络异常,图片无法展示
|


通过过期消息和死信队列来模拟出延时队列:


前面讲了过期消息有两种实现方法,第一种是通过对队列进行设置,第二种是通过对消息本身进行设置。


第一种方法通过队列设置,需要在队列声明的时候设置才有效。而如果使用这种方法,那么每增加一个新的时间需求,就要增加一个队列,显然这种方法不够灵活。

既然第一种方法不够灵活,那么咱通过第二种方法就可以实现灵活性。然而事情并没有那么简单,因为前面已经讲了,如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“。


因为RabbitMQ只会检查队列头部的消息是否过期,如果过期则丢到死信队列,所以如果队列中第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。


所以不管是在消息维度或是队列维度设置过期时间绑定死信队列模拟延时队列,归根结底都是在队列上实现消息的延迟,这样方式存在不灵活或者不及时的时序问题。


而使用延时消息插件,是自定义交换机,让交换机拥有了延迟发送消息的能力,从而实现消息的精准延时。下面就简单介绍一下。


2. 使用延时消息插件


使用延时消息插件需要安装延时消息插件(rabbitmq-delayed-message-exchange),我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。


实现原理:


使用DLX+TTL的模式,消息首先会路由到一个正常的队列,然后根据设置的TTL进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,而先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)

这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。


网络异常,图片无法展示
|


总结一下,可以看得出来,通过过期消息和死信队列虽然可以模拟延时队列,但还需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),更何况无法达到一个灵活通用的延迟队列。而使用rabbitmq的延时消息插件方式,只需要创建一个交换机和一个队列,就可以做到延时灵活,明显这种方式使用起来更简单、更灵活通用。


局限性:


延时消息插件实现RabbitMQ延时队列这种方式也不完全是一个银弹,它将延迟消息存在于Mnesia表中,并且在当前节点上具有单个磁盘副本,它们将在节点重启之后幸存。

目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,详情参见 #/issues/72 另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。


插件的禁用要慎重,以下方式可以实现将插件禁用,但是注意如果此时还有延迟消息未消费,那么禁掉此插件后所有的未消费的延迟消息将丢失。

rabbitmq-plugins disable rabbitmq_delayed_message_exchange


如果你采用了 Delayed Message 插件这种方式来实现,对于消息可用性要求较高的,在发现消息之前可以先落入 DB 打标记,消费之后将消息标记为已消费,中间可以加入定时任务做检测,这可以进一步保证你的消息的可靠性。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
58 1
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
202 6
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
100 0
|
4月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
122 2
|
5月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
81 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
155 1
|
6月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。