rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景

简介: rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景

创建项目引入依赖

1. 创建maven项目

2. 引入相应的依赖以及配置文件

3. rabbitmq依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>2.0.1.RELEASE</version>
</dependency>


4. 配置文件配置


server:
  port: 1111#端口号
spring:
  rabbitmq:
    host: 127.0.0.1 #rabbitmq主机
    port: 5672    #端口
    username: guest #用户名
    password: guest #密码
    virtual-host: /  #权限
    listener:
      type: simple  #容器类型.simple或direct
      simple:
        default-requeue-rejected: false  #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
        #手动接收消息
        acknowledge-mode: manual  #ack模式

声明交换机和队列

1. 声明常量作为交换机以及队列的名称


声明这里在类添加configuration注解 把声明的Bean添加到spring容器当中

Ctrl+shift+u 转换大小写

这里队列声明的后面带个数字就是随便写的,本人队列有点多所有后面添了一个标识。名称自己换一换就行

  //延迟交换机fanout
    public static final String DELAY_EXCHANGE_NAME_30="delay_exchange_name_30";
    //延时队列A
    public static final String DELAY_QUEUEA_NAME_30="delay_queuea_name_30";
    //延时队列B
    public static final String DELAY_QUEUEB_NAME_30="delay_queueb_name_30";
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE_NAME_30="dead_letter_exchange_name_30";
    //死信队列A
    public static final String DEAD_LETTER_QUEUEA_NAME_30="dead_letter_queuea_name_30";
    //死信队列B
    public static final String DEAD_LETTER_QUEUEB_NAME_30="dead_letter_queueb_name_30";

2. 声明延时交换机以及死信交换机(其实就是普通的交换机)


/**
     * 声明延时交换机 fanout
     * @return
     */
    @Bean(DELAY_EXCHANGE_NAME_30)
    public Exchange DELAY_EXCHANGE_NAME_30(){
        return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME_30).durable(true).build();
    }
    /**
     * 声明死信交换机 direct
     * @return
     */
    @Bean(DEAD_LETTER_EXCHANGE_NAME_30)
    public Exchange DEAD_LETTER_EXCHANGE_NAME_30(){
        return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME_30).durable(true).build();
    }

延时交换机我们设置为fanout,当然也可以是别的

而死信交换机不能设置为fanout,否则一个私信队列能接收到所有延时队列的消息

还不明白的话可以自己测试一下


3. 声明延时队列并指定对应的死信队列以及交换机(等设置延时时间过期之后发送到对应绑定的死信队列)


  /**
     * 声明延时队列A
     * 并绑定死信交换机 和 死信队列A  根据路由key
     * @return
     */
    @Bean(DELAY_QUEUEA_NAME_30)
    public Queue DELAY_QUEUEA_NAME_30(){
        Map map=new HashMap();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_NAME_30);
        //设置此队列延时时间 6秒
        map.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME_30).withArguments(map).build();
    }
    /**
     * 声明延时队列B
     * 并绑定死信交换机 和 死信队列B  根据路由key
     * @return
     */
    @Bean(DELAY_QUEUEB_NAME_30)
    public Queue DELAY_QUEUEB_NAME_30(){
        Map map=new HashMap();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_NAME_30);
        //设置此队列延时时间 12秒
        map.put("x-message-ttl",12000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME_30).withArguments(map).build();
    }

map参数解释

x-dead-letter-exchange 绑定的死信交换机

x-dead-letter-routing-key 死信队列的路由key

x-message-ttl 延时时间,也就是过期时间 毫秒单位

声明队列进行绑定,druable参数是声明队列的名称


4. 声明死信队列 (就是正常队列)


  /**
     * 声明死信队列A
     * @return
     */
    @Bean(DEAD_LETTER_QUEUEA_NAME_30)
    public Queue DEAD_LETTER_QUEUEA_NAME_30(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME_30);
    }
    /**
     * 声明死信队列B
     * @return
     */
    @Bean(DEAD_LETTER_QUEUEB_NAME_30)
    public Queue DEAD_LETTER_QUEUEB_NAME_30(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME_30);
    }

5. 延时队列绑定交换机


  /**
     * 延时队列A绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayQueueABinding(
            @Qualifier(DELAY_QUEUEA_NAME_30)Queue queue,
            @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
    /**
     * 延时队列B绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayQueueBBinding(
            @Qualifier(DELAY_QUEUEB_NAME_30)Queue queue,
            @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

. 因为延时交换机是fanout类型的所以路由key可以不用设置,设置了也不会用到


6. 死信队列绑定交换机

  /**
     * 死信队列A绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterQueueABinding(
            @Qualifier(DEAD_LETTER_QUEUEA_NAME_30)Queue queue,
            @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_NAME_30).noargs();
    }
    /**
     * 死信队列B绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterQueueBBinding(
            @Qualifier(DEAD_LETTER_QUEUEB_NAME_30)Queue queue,
            @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_NAME_30).noargs();
    }

.死信交换机类型是direct,我们要指定路由key,在我们声明延时队列已经指定相应的路由key了,所以这里要和声明延时队列指定的路由key要一致哦

7. 绑定参数详解


方法两个参数,队列以及交换机

@Qualifier(name)里就是我们声明@Bean(name)的队列,实现注入,交换机也一样

返回值里的 bind参数就是我们要绑定的队列

to就是绑定到哪个交换机

with就是路由key

生产者发送消息

1. 生产者发送消息(我们在测试类写)


具体看自己业务的需求写在哪里

  @Resource
  private RabbitTemplate rabbitTemplate;
  @Test
  public void test1(){
      Map map=new HashMap();
      map.put("name","张三");
      map.put("age","18");
      //第一个参数发送给哪个交换机   第二个路由key  我们延时交换机是fanout所以路由key为空   第三个发送对象
      rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME_30,"",map);
  }

注入对应模板对象,依赖里面的

conVerAndSend发送消息

消费者监听死信队列

1. 监听死信队列


@Component
public class RabbitmqDeadLetterListener {
    /**
     * 监听死信队列A
     * @param map
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEA_NAME_30})
    public void receiverA(Map map, Message message, Channel channel) throws IOException {
        System.out.println("死信队列A接收到了消息--------");
        System.out.println(map);
        //接收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    /**
     * 监听死信队列B
     * @param map
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEB_NAME_30})
    public void receiverB(Map map, Message message, Channel channel) throws IOException {
        System.out.println("死信队列B接收到了消息--------");
        System.out.println(map);
        //接收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

@RabbitListener注解里面的参数就是监听的队列,这里我们是直接拿配置类里面的使用的

receiverA里面的第一个参数就是我们发送消息的类型,可以直接输出打印

第二个参数也包含发送的对象,获得的是一个字节数组,需要转型,也包含了发送消息的id标识等

第三个就是通道




-----------------↓↓↓↓↓↓↓-----------------

写完是个延时队列的功能,当然也可以变成死信队列…




使用场景

1. 死信队列使用场景


一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息(没错,以前就是这么干的= =)。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。


2. 延时队列使用场景


1.订单在十分钟之内未支付则自动取消。

2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3.账单在一周内未支付,则自动结算。

4.用户注册成功后,如果三天内没有登陆则进行短信提醒。

5.用户发起退款,如果三天内没有得到处理则通知相关运营人员。

6.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。






结束OVER




欢迎各路神人看完的感受或者有差的地方多多评论



20201201103751521.gif



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
4月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
4月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
123 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
3月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
143 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
2月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
3月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
250 6
|
4月前
|
消息中间件 JSON Java
|
4月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
166 2