rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景

简介: rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景

创建项目引入依赖

1. 创建maven项目

2. 引入相应的依赖以及配置文件

3. rabbitmq依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>2.0.1.RELEASE</version>
</dependency>


4. 配置文件配置


server:
  port: 1111#端口号
spring:
  rabbitmq:
    host: 127.0.0.1 #rabbitmq主机
    port: 5672    #端口
    username: guest #用户名
    password: guest #密码
    virtual-host: /  #权限
    listener:
      type: simple  #容器类型.simple或direct
      simple:
        default-requeue-rejected: false  #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
        #手动接收消息
        acknowledge-mode: manual  #ack模式

声明交换机和队列

1. 声明常量作为交换机以及队列的名称


声明这里在类添加configuration注解 把声明的Bean添加到spring容器当中

Ctrl+shift+u 转换大小写

这里队列声明的后面带个数字就是随便写的,本人队列有点多所有后面添了一个标识。名称自己换一换就行

  //延迟交换机fanout
    public static final String DELAY_EXCHANGE_NAME_30="delay_exchange_name_30";
    //延时队列A
    public static final String DELAY_QUEUEA_NAME_30="delay_queuea_name_30";
    //延时队列B
    public static final String DELAY_QUEUEB_NAME_30="delay_queueb_name_30";
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE_NAME_30="dead_letter_exchange_name_30";
    //死信队列A
    public static final String DEAD_LETTER_QUEUEA_NAME_30="dead_letter_queuea_name_30";
    //死信队列B
    public static final String DEAD_LETTER_QUEUEB_NAME_30="dead_letter_queueb_name_30";

2. 声明延时交换机以及死信交换机(其实就是普通的交换机)


/**
     * 声明延时交换机 fanout
     * @return
     */
    @Bean(DELAY_EXCHANGE_NAME_30)
    public Exchange DELAY_EXCHANGE_NAME_30(){
        return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME_30).durable(true).build();
    }
    /**
     * 声明死信交换机 direct
     * @return
     */
    @Bean(DEAD_LETTER_EXCHANGE_NAME_30)
    public Exchange DEAD_LETTER_EXCHANGE_NAME_30(){
        return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME_30).durable(true).build();
    }

延时交换机我们设置为fanout,当然也可以是别的

而死信交换机不能设置为fanout,否则一个私信队列能接收到所有延时队列的消息

还不明白的话可以自己测试一下


3. 声明延时队列并指定对应的死信队列以及交换机(等设置延时时间过期之后发送到对应绑定的死信队列)


  /**
     * 声明延时队列A
     * 并绑定死信交换机 和 死信队列A  根据路由key
     * @return
     */
    @Bean(DELAY_QUEUEA_NAME_30)
    public Queue DELAY_QUEUEA_NAME_30(){
        Map map=new HashMap();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_NAME_30);
        //设置此队列延时时间 6秒
        map.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME_30).withArguments(map).build();
    }
    /**
     * 声明延时队列B
     * 并绑定死信交换机 和 死信队列B  根据路由key
     * @return
     */
    @Bean(DELAY_QUEUEB_NAME_30)
    public Queue DELAY_QUEUEB_NAME_30(){
        Map map=new HashMap();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_NAME_30);
        //设置此队列延时时间 12秒
        map.put("x-message-ttl",12000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME_30).withArguments(map).build();
    }

map参数解释

x-dead-letter-exchange 绑定的死信交换机

x-dead-letter-routing-key 死信队列的路由key

x-message-ttl 延时时间,也就是过期时间 毫秒单位

声明队列进行绑定,druable参数是声明队列的名称


4. 声明死信队列 (就是正常队列)


  /**
     * 声明死信队列A
     * @return
     */
    @Bean(DEAD_LETTER_QUEUEA_NAME_30)
    public Queue DEAD_LETTER_QUEUEA_NAME_30(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME_30);
    }
    /**
     * 声明死信队列B
     * @return
     */
    @Bean(DEAD_LETTER_QUEUEB_NAME_30)
    public Queue DEAD_LETTER_QUEUEB_NAME_30(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME_30);
    }

5. 延时队列绑定交换机


  /**
     * 延时队列A绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayQueueABinding(
            @Qualifier(DELAY_QUEUEA_NAME_30)Queue queue,
            @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
    /**
     * 延时队列B绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayQueueBBinding(
            @Qualifier(DELAY_QUEUEB_NAME_30)Queue queue,
            @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

. 因为延时交换机是fanout类型的所以路由key可以不用设置,设置了也不会用到


6. 死信队列绑定交换机

  /**
     * 死信队列A绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterQueueABinding(
            @Qualifier(DEAD_LETTER_QUEUEA_NAME_30)Queue queue,
            @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_NAME_30).noargs();
    }
    /**
     * 死信队列B绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterQueueBBinding(
            @Qualifier(DEAD_LETTER_QUEUEB_NAME_30)Queue queue,
            @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_NAME_30).noargs();
    }

.死信交换机类型是direct,我们要指定路由key,在我们声明延时队列已经指定相应的路由key了,所以这里要和声明延时队列指定的路由key要一致哦

7. 绑定参数详解


方法两个参数,队列以及交换机

@Qualifier(name)里就是我们声明@Bean(name)的队列,实现注入,交换机也一样

返回值里的 bind参数就是我们要绑定的队列

to就是绑定到哪个交换机

with就是路由key

生产者发送消息

1. 生产者发送消息(我们在测试类写)


具体看自己业务的需求写在哪里

  @Resource
  private RabbitTemplate rabbitTemplate;
  @Test
  public void test1(){
      Map map=new HashMap();
      map.put("name","张三");
      map.put("age","18");
      //第一个参数发送给哪个交换机   第二个路由key  我们延时交换机是fanout所以路由key为空   第三个发送对象
      rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME_30,"",map);
  }

注入对应模板对象,依赖里面的

conVerAndSend发送消息

消费者监听死信队列

1. 监听死信队列


@Component
public class RabbitmqDeadLetterListener {
    /**
     * 监听死信队列A
     * @param map
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEA_NAME_30})
    public void receiverA(Map map, Message message, Channel channel) throws IOException {
        System.out.println("死信队列A接收到了消息--------");
        System.out.println(map);
        //接收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    /**
     * 监听死信队列B
     * @param map
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEB_NAME_30})
    public void receiverB(Map map, Message message, Channel channel) throws IOException {
        System.out.println("死信队列B接收到了消息--------");
        System.out.println(map);
        //接收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

@RabbitListener注解里面的参数就是监听的队列,这里我们是直接拿配置类里面的使用的

receiverA里面的第一个参数就是我们发送消息的类型,可以直接输出打印

第二个参数也包含发送的对象,获得的是一个字节数组,需要转型,也包含了发送消息的id标识等

第三个就是通道




-----------------↓↓↓↓↓↓↓-----------------

写完是个延时队列的功能,当然也可以变成死信队列…




使用场景

1. 死信队列使用场景


一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息(没错,以前就是这么干的= =)。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。


2. 延时队列使用场景


1.订单在十分钟之内未支付则自动取消。

2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3.账单在一周内未支付,则自动结算。

4.用户注册成功后,如果三天内没有登陆则进行短信提醒。

5.用户发起退款,如果三天内没有得到处理则通知相关运营人员。

6.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。






结束OVER




欢迎各路神人看完的感受或者有差的地方多多评论



20201201103751521.gif



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
222 3
|
23天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
23天前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
23天前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
5天前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
10 0
|
11天前
|
中间件 数据库连接 UED
Django中间件秘籍:如何用几行代码让你的应用变得超级强大?
【8月更文挑战第31天】中间件是Django框架的核心特性,位于视图与HTTP服务器之间,允许全局处理请求和响应,增强Web应用功能。通过实现`MiddlewareMixin`类的方法,如`process_request`和`process_response`,可以轻松实现请求预处理或响应后处理。中间件应用场景广泛,包括用户认证、CSRF防护和数据库连接管理等。创建并配置中间件需将其加入`settings.py`的`MIDDLEWARE`列表,顺序决定执行优先级。合理利用中间件能提高代码重用性和应用性能,带来更好的用户体验。
17 0
|
2月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
2月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
131 11
|
2月前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
|
3月前
|
设计模式 监控 中间件
PHP中的中间件模式及其应用
【6月更文挑战第24天】在PHP开发中,中间件是一种设计模式,它允许开发者在请求处理流程的不同阶段插入自定义的处理逻辑。本文将介绍PHP中间件的概念、实现方式以及如何利用中间件提高代码的可维护性和扩展性。通过实际案例,我们将探索中间件在身份验证、日志记录和性能监控等方面的应用,并讨论如何在Laravel框架中有效使用中间件。
下一篇
DDNS