前言
死信交换机、延迟消息
一、死信交换机
- 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue参数设置为false- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
- 如果一个队列中的消息已经成为死信,并且这个队列通过
dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。- 死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
二、延迟消息
上面讲诉的两种作用可以用来实现消费者重试的处理,即将处理失败、溢出的消息放在特定队列由人工处理,与消费者重试时讲的RepublishMessageRecoverer作用类似(在RabbitMQ之消费者的可靠性里讲过RepublishMessageRecoverer):
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
收集因TTL(有效期)到期的消息这个作用可以用来实现
延迟消息
死信交换机实现延迟消息
- 声明一个Fanout交换机ttl.fanout、一个队列ttl.queue、一个Direct交换机dragon.direct、一个队列directt.queue。发送的消息设置有效期和RoutingKey是blue。
- ttl.queue通过dead-letter-exchange属性绑定dragon.direct交换机,使dragon.direct成为死信交换机,这样ttl.queue队列中过期的消息成为死信就会自动到达dragon.direct中。
- direct.queue队列通过RoutingKey与死信交换机dragon.direct绑定,且RoutingKey为blue,这样,过期的消息先到达死信交换机,因死信交换机与direct.queue通过RoutingKey绑定,过期的消息通过RoutingKey由死信交换机路由到direct.queue队列。
- 此时若有消费者消费direct.queue队列,就实现了延迟消费,具体的延时时间就是设置的有效期时间。
图解流程
注意:
- RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
- 当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。
DelayExchange插件实现延迟消息
安装插件
将下载的文件放到了/mnt目录下,然后输入sudo docker ps命令查看自己的rabbitmq是否正在运行,如果不在运行则输入
sudo docker start id
这里填你自己的容器id,如果不知道自己id的,输入sudo docker pa -a
查看。
当容器运行起来后,输入
sudo docker cp /mnt/rabbitmq_delayed_message_exchange-3.12.0.ez rabbit:/plugins
命令,将刚插件拷贝到容器内plugins目录下。
拷贝完成后,输入
sudo docker exec -it rabbit /bin/bash
命令,进入容器。进入plugins文件夹
在容器内plugins目录下,查看插件是否上传成功
ls -l|grep delay
然后启动插件,在当前目录下输入
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
命令
到这里插件安装就完成了,接下来我们需要重启RabbitMQ容器。执行
exit
命令退出RabbitMQ容器内部,然后执行docker restart 容器名
命令重启RabbitMQ容器
声明延迟交换机
两种方式,自行选择
基于注解:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延迟消息:{}", msg); }
基于bean:
package com.itheima.consumer.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class DelayExchangeConfig { @Bean public DirectExchange delayExchange(){ return ExchangeBuilder .directExchange("delay.direct") // 指定交换机类型和名称 .delayed() // 设置delay的属性为true .durable(true) // 持久化 .build(); } @Bean public Queue delayedQueue(){ return new Queue("delay.queue"); } @Bean public Binding delayQueueBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay"); } }
发送延迟消息
@Test void testPublisherDelayMessage() { // 1.创建消息 String message = "hello, delayed message"; // 2.发送消息,利用消息后置处理器添加消息头 rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 添加延迟消息属性 message.getMessageProperties().setDelay(5000); return message; } }); }
注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息。
总结
以上就是延迟消息的详细讲解了。