在RabbitMQ中本身是不存在延时队列,如果需要使用RabbitMQ来实现延时队列,有两种方式:
- 第一种:DLX+TTL(Time To Live);
设置TTL分为两种:在队列属性中设置TTL,在消息属性中设置TTL - 第二种:使用延时消息插件;
1. DLX+TTL模拟延时队列
通过过期消息和死信队列来模拟出延时队列,消费者监听死信交换器绑定的队列,而不要监听消息发送的队列,这样就可以模拟出延时队列了。
通过过期消息和死信队列来模拟出延时队列:
前面讲了过期消息有两种实现方法,第一种是通过对队列进行设置,第二种是通过对消息本身进行设置。
第一种方法通过队列设置,需要在队列声明的时候设置才有效。而如果使用这种方法,那么每增加一个新的时间需求,就要增加一个队列,显然这种方法不够灵活。
既然第一种方法不够灵活,那么咱通过第二种方法就可以实现灵活性。然而事情并没有那么简单,因为前面已经讲了,如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“。
因为RabbitMQ只会检查队列头部的消息是否过期,如果过期则丢到死信队列,所以如果队列中第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。
所以不管是在消息维度或是队列维度设置过期时间绑定死信队列模拟延时队列,归根结底都是在队列上实现消息的延迟,这样方式存在不灵活或者不及时的时序问题。
而使用延时消息插件,是自定义交换机,让交换机拥有了延迟发送消息的能力,从而实现消息的精准延时。下面就简单介绍一下。
2. 使用延时消息插件
使用延时消息插件需要安装延时消息插件(rabbitmq-delayed-message-exchange),我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。
实现原理:
使用DLX+TTL的模式,消息首先会路由到一个正常的队列,然后根据设置的TTL进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,而先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)
这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。
总结一下,可以看得出来,通过过期消息和死信队列虽然可以模拟延时队列,但还需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),更何况无法达到一个灵活通用的延迟队列。而使用rabbitmq的延时消息插件方式,只需要创建一个交换机和一个队列,就可以做到延时灵活,明显这种方式使用起来更简单、更灵活通用。
局限性:
延时消息插件实现RabbitMQ延时队列这种方式也不完全是一个银弹,它将延迟消息存在于Mnesia表中,并且在当前节点上具有单个磁盘副本,它们将在节点重启之后幸存。
目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,详情参见 #/issues/72 另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。
插件的禁用要慎重,以下方式可以实现将插件禁用,但是注意如果此时还有延迟消息未消费,那么禁掉此插件后所有的未消费的延迟消息将丢失。
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
如果你采用了 Delayed Message 插件这种方式来实现,对于消息可用性要求较高的,在发现消息之前可以先落入 DB 打标记,消费之后将消息标记为已消费,中间可以加入定时任务做检测,这可以进一步保证你的消息的可靠性。