SpringBoot整合RabbitMQ实现死信队列

简介: 前面一文通过[Java整合RabbitMQ实现生产消费(7种通讯方式)](https://wangbinguang.blog.csdn.net/article/details/128284902),本文基于SpringBoot实现RabbitMQ中的死信队列和延迟队列。

前面一文通过Java整合RabbitMQ实现生产消费(7种通讯方式),本文基于SpringBoot实现RabbitMQ中的死信队列和延迟队列。

概念介绍

什么是死信

死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:

  1. 消费者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。
  2. 消息到达生存时间还未被消费。
  3. 队列超过长度限制,消息被丢弃。

这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图
在这里插入图片描述

死信队列应用

  • 作为消息可靠性的一个扩展。比如,在队列已满的情况下也不会丢失消息。
  • 可以实现延迟消费功能。比如,订单15分钟内未支付。

注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:

  1. 维护多个队列,每个队列维护一个TTL时间。
  2. 使用延迟交换机。这种方式需要下载插件支持,参考链接:RabbitMQ插件

    工程搭建

    环境说明

  • RabbitMQ环境,参考RabbitMQ环境搭建
  • Java版本:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

    搭建步骤

  1. 创建SpringBoot项目。
  2. pom.xml文件导入RabbitMQ依赖。
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. application.yml文件添加RabbitMQ配置。
spring:
  # rabbitmq配置信息 RabbitProperties类
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启confirm机制
    publisher-confirm-type: correlated
    # 开启return机制
    publisher-returns: true
    #全局配置,局部配置存在就以局部为准
    listener:
      simple:
        acknowledge-mode: manual # 手动ACK

实现死信

准备Exchange&Queue

@Configuration
public class RabbitMQConfig {
   
   

    /**
     * 正常队列
     */
    public static final String EXCHANGE = "boot-exchange";

    public static final String QUEUE = "boot-queue";

    public static final String ROUTING_KEY = "boot-rout";

    /**
     * 死信队列
     */
    public static final String DEAD_EXCHANGE = "dead-exchange";

    public static final String DEAD_QUEUE = "dead-queue";

    public static final String DEAD_ROUTING_KEY = "dead-rout";

    /**
     * 声明死信交换机
     *
     * @return
     */
    @Bean
    public Exchange deadExchange() {
   
   
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
    }

    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean
    public Queue deadQueue() {
   
   
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }


    /**
     * 绑定死信的队列和交换机
     *
     * @param deadExchange
     * @param deadQueue
     * @return
     */
    @Bean
    public Binding deadBind(Exchange deadExchange, Queue deadQueue) {
   
   
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

    /**
     * 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
     *
     * @return
     */
    @Bean
    public Exchange bootExchange() {
   
   
        return ExchangeBuilder.directExchange(EXCHANGE).build();
    }

    /**
     * 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null);
     * 绑定死信交换机及路由key
     *
     * @return
     */
    @Bean
    public Queue bootQueue() {
   
   
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                //给队列设置消息时长
                //.ttl(10000)
                //队列最大长度
                .maxLength(1)
                .build();
    }

    /**
     * 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
     *
     * @param bootExchange
     * @param bootQueue
     * @return
     */
    @Bean
    public Binding bootBind(Exchange bootExchange, Queue bootQueue) {
   
   
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }

}

监听死信队列

    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
    public void listener_dead(String msg, Channel channel, Message message) throws IOException {
   
   
        System.out.println("死信接收到消息" + msg);
        System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
        System.out.println("messageID:" + message.getMessageProperties().getMessageId());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

方式一——消费者拒绝&否认

  • 拒绝消息
      @RabbitListener(queues = RabbitMQConfig.QUEUE)
      public void listener(String msg, Channel channel, Message message) throws IOException {
         
         
          System.out.println("接收到消息" + msg);
          channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
      }
    
  • 否认消息
      @RabbitListener(queues = RabbitMQConfig.QUEUE)
      public void listener(String msg, Channel channel, Message message) throws IOException {
         
         
          System.out.println("接收到消息" + msg);
           channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
      }
    

    方式二——超过消息TTL

  • 发送消息时设置TTL

    @SpringBootTest
    public class Publisher {
         
         
    
      @Autowired
      private RabbitTemplate template;
          /**
       * 5秒未被消费会路由到死信队列
       */
      @Test
      public void publish_expir() {
         
         
          template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> {
         
         
              message.getMessageProperties().setExpiration("5000");
              return message;
          });
      }
    }
    
  • 设置队列所有消息的TTL
    更新RabbitMQConfig类中bootQueue() ,更新后需要删除队列,因为队列属性有更改
      @Bean
      public Queue bootQueue() {
         
         
          return QueueBuilder.durable(QUEUE)
                  .deadLetterExchange(DEAD_EXCHANGE)
                  .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                  //声明队列属性有更改时需要删除队列
                  //给队列设置消息时长
                  .ttl(10000)
                  .build();
      }
    

    方式三——超过队列长度限制

    设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。
      @Bean
      public Queue bootQueue() {
         
         
          return QueueBuilder.durable(QUEUE)
                  .deadLetterExchange(DEAD_EXCHANGE)
                  .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                  //声明队列属性有更改时需要删除队列
                  .maxLength(1)
                  .build();
      }
    

    代码仓库

    点我
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
21天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
212 1
|
28天前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
28天前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
5月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
195 32
|
4月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
591 0
|
10月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
152 6
|
12月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
696 6
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列