RabbitMQ之死信队列解读

简介: RabbitMQ之死信队列解读

基本介绍

什么是死信交换机

在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。

什么是死信队列

死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已

RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。

要注意的是,DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

消息进入到死信队列的情况

消息过期

MessageProperties messageProperties=new MessageProperties();
//设置此条消息的过期时间为10秒
messageProperties.setExpiration("10000");

队列过期

 Map<String, Object> arguments =new HashMap<>();
 //指定死信交换机,通过x-dead-letter-exchange 来设置
 arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
 //设置死信路由key,value 为死信交换机和死信队列绑定的key
 arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
 //队列的过期时间
 arguments.put("x-message-ttl",10000);
return  new Queue(QUEUE_NORMAL,true,false,false,arguments);

TTL: Time to Live的简称,过期时间

队列达到最大长度(先入队的消息会被发送到DLX)

Map<String, Object> arguments = new HashMap<String, Object>();
//设置队列的最大长度 ,对头的消息会被挤出变成死信
arguments.put("x-max-length", 5);

消费者拒绝消息不进行重新投递

从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。

application.yml 启动手动确认

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
 /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     *
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE})
    public void process(Message message, Channel channel) {
        System.out.println("接收到的消息:" + message);
        //对消息不确认, ack单词是 确认 的意思
        try {
            System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:消息的一个数字标签
  • multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
  • requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列

消费者拒绝消息

开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列

  /**
     * 监听正常的那个队列的名字,不是监听那个死信队列
     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
     *
     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE})
    public void process(Message message, Channel channel) {
        System.out.println("接收到的消息:" + message);
        try {
            System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

  • basicReject是否定的交付,一般在消费消息时出现异常等的时候执行。可以将该消息丢弃或重排序去重新处理消息
  • 参数1: 消费消息的index
  • 参数2: 对异常消息的处理,true表示重排序,false表示丢弃
  • Reject 在拒绝消息时,可以使用 requeue 标识,告诉 RabbitMQ 是否需要重新发送给别的消费者。如果是 false 则不重新发送,一般这个消息就会被RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递。
  • Nack 跟 Reject 类似,只是它可以一次性拒绝多个消息。也可以使用 requeue 标识,这是 RabbitMQ 对 AMQP 规范的一个扩展。
  • 通过 RejectRequeuConsumer 可以看到无论是使用 Reject 方式还是 Nack 方式,当 requeue
  • 参数设置为 true 时,消息发生了重新投递。当 requeue 参数设置为 false 时,消息丢失了。

springboot代码实战

实战架构

如上图,消息到达正常的交换机exchange.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息变为死消息以后则会转发到与正常队列绑定的死信交换机中,死信交换机会转发到与其绑定的死信队列queue.deal.a。

工程概述

工程采用springboot架构,主要用到的依赖为:

<!--        rabbit的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

application.yml配置文件如下:

server:
  port: 8080
spring:
  rabbitmq:
    host: 123.249.70.148
    port: 5673
    username: admin
    password: 123456
    virtual-host: /

RabbitConfigDeal 配置类:创建队列及交换机并进行绑定

@Configuration
public class RabbitConfigDeal {
}

创建正常交换机

   @Bean
    public DirectExchange normalExchange(){
        return ExchangeBuilder.directExchange("exchange.normal.a").build();
    }

创建死信交换机

    @Bean
    public DirectExchange deadExchange(){
        return ExchangeBuilder.directExchange("exchange.dead.a").build();
    }

创建死信队列

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable("queue.dead.a").build();
    }

创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order

    @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments =new HashMap<>();
        arguments.put("x-message-ttl",20000);
        arguments.put("x-dead-letter-exchange","exchange.dead.a");
        arguments.put("x-dead-letter-routing-key","order");
        return QueueBuilder.durable("queue.normal.a")
                .withArguments(arguments).build();
    }

绑定正常交换机和正常队列

    @Bean
    public Binding bindingNormal(DirectExchange normalExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
    }

绑定死信交换机和死信队列

    @Bean
    public Binding bindingNormal(DirectExchange normalExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
    }

MessageService业务类:发送消息及接收消息

@Component
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;
}

发送消息方法

    public void sendMsg(){
        //添加消息属性
        Message message = MessageBuilder.withBody("hello word!".getBytes(StandardCharsets.UTF_8))
               .build();
        rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
        log.info("发送消息时间:{}",new Date());
    }

这里用的路由key为info

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

 接受消息

    @RabbitListener(queues = {"queue.dead.a"})
    public void receiveMsg(Message message){
        byte[] body = message.getBody();
        String queue = message.getMessageProperties().getConsumerQueue();
        String msg=new String(body);
        log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
    }

Message

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性
  2. byte[] body // 消息内容

@RabbitListener

使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
  • application/octet-stream:二进制字节数组存储,使用 byte[]
  • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
  • text/plain:文本数据类型存储,使用 String
  • application/json:JSON 格式,使用 Object、相应类型

主启动类RabbitMq01Application:实现ApplicationRunner接口

/**
 * @author 风轻云淡
 */
@SpringBootApplication
public class RabbitMq01Application implements ApplicationRunner {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMq01Application.class, args);
    }
    @Resource
    private MessageService messageService;
    /**
     * 程序一启动就会调用该方法
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。

启动主启动类后查看控制台:

2023-09-28 10:46:17.772  INFO 71700 --- [           main]
 c.e.rabbitmq01.service.MessageService    :
 发送消息时间:Thu Sep 28 10:46:17 CST 2023
2023-09-28 10:46:37.824  INFO 71700 --- [ntContainer#0-1] 
c.e.rabbitmq01.service.MessageService    : 
queue.dead.a接收到消息时间:Thu Sep 28 10:46:37 CST 2023,消息为hello word!

我们在这里可以看见17s的时候发送了消息,在经过了20s,即到37s的时候我们在死信队列queue.dead.a接受到了消息。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
151 6
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
93 0
|
4月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
109 2
|
3月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
67 0
|
5月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
72 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
140 1
|
5月前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack