RabbitMQ之延迟消息

简介: RabbitMQ之延迟消息



前言

死信交换机、延迟消息


一、死信交换机

  • 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
  • 如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
  • 死信交换机有什么作用呢?
  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息
  • 收集因TTL(有效期)到期的消息

二、延迟消息

上面讲诉的两种作用可以用来实现消费者重试的处理,即将处理失败、溢出的消息放在特定队列由人工处理,与消费者重试时讲的RepublishMessageRecoverer作用类似(在RabbitMQ之消费者的可靠性里讲过RepublishMessageRecoverer):

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息

收集因TTL(有效期)到期的消息这个作用可以用来实现延迟消息

死信交换机实现延迟消息

  • 声明一个Fanout交换机ttl.fanout、一个队列ttl.queue、一个Direct交换机dragon.direct、一个队列directt.queue。发送的消息设置有效期RoutingKey是blue。
  • ttl.queue通过dead-letter-exchange属性绑定dragon.direct交换机,使dragon.direct成为死信交换机,这样ttl.queue队列中过期的消息成为死信就会自动到达dragon.direct中。
  • direct.queue队列通过RoutingKey与死信交换机dragon.direct绑定,且RoutingKey为blue,这样,过期的消息先到达死信交换机,因死信交换机与direct.queue通过RoutingKey绑定,过期的消息通过RoutingKey由死信交换机路由到direct.queue队列。
  • 此时若有消费者消费direct.queue队列,就实现了延迟消费,具体的延时时间就是设置的有效期时间。

图解流程

注意:

  • RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
  • 当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

DelayExchange插件实现延迟消息

安装插件

插件下载地址

将下载的文件放到了/mnt目录下,然后输入sudo docker ps命令查看自己的rabbitmq是否正在运行,如果不在运行则输入sudo docker start id这里填你自己的容器id,如果不知道自己id的,输入sudo docker pa -a查看。

当容器运行起来后,输入sudo docker cp /mnt/rabbitmq_delayed_message_exchange-3.12.0.ez rabbit:/plugins命令,将刚插件拷贝到容器内plugins目录下。

拷贝完成后,输入sudo docker exec -it rabbit /bin/bash命令,进入容器。

进入plugins文件夹

在容器内plugins目录下,查看插件是否上传成功ls -l|grep delay

然后启动插件,在当前目录下输入rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令

到这里插件安装就完成了,接下来我们需要重启RabbitMQ容器。执行exit命令退出RabbitMQ容器内部,然后执行docker restart 容器名命令重启RabbitMQ容器

声明延迟交换机

两种方式,自行选择

基于注解:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于bean:

package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }
    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送延迟消息

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

注意:

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息。


总结

以上就是延迟消息的详细讲解了。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
510 0
|
3月前
|
消息中间件 JSON Java
|
4月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
4月前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
145 0
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
92 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
101 1
|
5月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ产品使用合集之是否支持任意时间延迟的消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
190 0
|
6月前
|
消息中间件 数据库
03.RabbitMQ延迟队列
03.RabbitMQ延迟队列
55 0