rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景

简介: rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景

创建项目引入依赖

1. 创建maven项目

2. 引入相应的依赖以及配置文件

3. rabbitmq依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>2.0.1.RELEASE</version>
</dependency>


4. 配置文件配置


server:
  port: 1111#端口号
spring:
  rabbitmq:
    host: 127.0.0.1 #rabbitmq主机
    port: 5672    #端口
    username: guest #用户名
    password: guest #密码
    virtual-host: /  #权限
    listener:
      type: simple  #容器类型.simple或direct
      simple:
        default-requeue-rejected: false  #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
        #手动接收消息
        acknowledge-mode: manual  #ack模式

声明交换机和队列

1. 声明常量作为交换机以及队列的名称


声明这里在类添加configuration注解 把声明的Bean添加到spring容器当中

Ctrl+shift+u 转换大小写

这里队列声明的后面带个数字就是随便写的,本人队列有点多所有后面添了一个标识。名称自己换一换就行

  //延迟交换机fanout
    public static final String DELAY_EXCHANGE_NAME_30="delay_exchange_name_30";
    //延时队列A
    public static final String DELAY_QUEUEA_NAME_30="delay_queuea_name_30";
    //延时队列B
    public static final String DELAY_QUEUEB_NAME_30="delay_queueb_name_30";
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE_NAME_30="dead_letter_exchange_name_30";
    //死信队列A
    public static final String DEAD_LETTER_QUEUEA_NAME_30="dead_letter_queuea_name_30";
    //死信队列B
    public static final String DEAD_LETTER_QUEUEB_NAME_30="dead_letter_queueb_name_30";

2. 声明延时交换机以及死信交换机(其实就是普通的交换机)


/**
     * 声明延时交换机 fanout
     * @return
     */
    @Bean(DELAY_EXCHANGE_NAME_30)
    public Exchange DELAY_EXCHANGE_NAME_30(){
        return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME_30).durable(true).build();
    }
    /**
     * 声明死信交换机 direct
     * @return
     */
    @Bean(DEAD_LETTER_EXCHANGE_NAME_30)
    public Exchange DEAD_LETTER_EXCHANGE_NAME_30(){
        return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME_30).durable(true).build();
    }

延时交换机我们设置为fanout,当然也可以是别的

而死信交换机不能设置为fanout,否则一个私信队列能接收到所有延时队列的消息

还不明白的话可以自己测试一下


3. 声明延时队列并指定对应的死信队列以及交换机(等设置延时时间过期之后发送到对应绑定的死信队列)


  /**
     * 声明延时队列A
     * 并绑定死信交换机 和 死信队列A  根据路由key
     * @return
     */
    @Bean(DELAY_QUEUEA_NAME_30)
    public Queue DELAY_QUEUEA_NAME_30(){
        Map map=new HashMap();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_NAME_30);
        //设置此队列延时时间 6秒
        map.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME_30).withArguments(map).build();
    }
    /**
     * 声明延时队列B
     * 并绑定死信交换机 和 死信队列B  根据路由key
     * @return
     */
    @Bean(DELAY_QUEUEB_NAME_30)
    public Queue DELAY_QUEUEB_NAME_30(){
        Map map=new HashMap();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_NAME_30);
        //设置此队列延时时间 12秒
        map.put("x-message-ttl",12000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME_30).withArguments(map).build();
    }

map参数解释

x-dead-letter-exchange 绑定的死信交换机

x-dead-letter-routing-key 死信队列的路由key

x-message-ttl 延时时间,也就是过期时间 毫秒单位

声明队列进行绑定,druable参数是声明队列的名称


4. 声明死信队列 (就是正常队列)


  /**
     * 声明死信队列A
     * @return
     */
    @Bean(DEAD_LETTER_QUEUEA_NAME_30)
    public Queue DEAD_LETTER_QUEUEA_NAME_30(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME_30);
    }
    /**
     * 声明死信队列B
     * @return
     */
    @Bean(DEAD_LETTER_QUEUEB_NAME_30)
    public Queue DEAD_LETTER_QUEUEB_NAME_30(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME_30);
    }

5. 延时队列绑定交换机


  /**
     * 延时队列A绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayQueueABinding(
            @Qualifier(DELAY_QUEUEA_NAME_30)Queue queue,
            @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
    /**
     * 延时队列B绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayQueueBBinding(
            @Qualifier(DELAY_QUEUEB_NAME_30)Queue queue,
            @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

. 因为延时交换机是fanout类型的所以路由key可以不用设置,设置了也不会用到


6. 死信队列绑定交换机

  /**
     * 死信队列A绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterQueueABinding(
            @Qualifier(DEAD_LETTER_QUEUEA_NAME_30)Queue queue,
            @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_NAME_30).noargs();
    }
    /**
     * 死信队列B绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterQueueBBinding(
            @Qualifier(DEAD_LETTER_QUEUEB_NAME_30)Queue queue,
            @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_NAME_30).noargs();
    }

.死信交换机类型是direct,我们要指定路由key,在我们声明延时队列已经指定相应的路由key了,所以这里要和声明延时队列指定的路由key要一致哦

7. 绑定参数详解


方法两个参数,队列以及交换机

@Qualifier(name)里就是我们声明@Bean(name)的队列,实现注入,交换机也一样

返回值里的 bind参数就是我们要绑定的队列

to就是绑定到哪个交换机

with就是路由key

生产者发送消息

1. 生产者发送消息(我们在测试类写)


具体看自己业务的需求写在哪里

  @Resource
  private RabbitTemplate rabbitTemplate;
  @Test
  public void test1(){
      Map map=new HashMap();
      map.put("name","张三");
      map.put("age","18");
      //第一个参数发送给哪个交换机   第二个路由key  我们延时交换机是fanout所以路由key为空   第三个发送对象
      rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME_30,"",map);
  }

注入对应模板对象,依赖里面的

conVerAndSend发送消息

消费者监听死信队列

1. 监听死信队列


@Component
public class RabbitmqDeadLetterListener {
    /**
     * 监听死信队列A
     * @param map
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEA_NAME_30})
    public void receiverA(Map map, Message message, Channel channel) throws IOException {
        System.out.println("死信队列A接收到了消息--------");
        System.out.println(map);
        //接收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    /**
     * 监听死信队列B
     * @param map
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEB_NAME_30})
    public void receiverB(Map map, Message message, Channel channel) throws IOException {
        System.out.println("死信队列B接收到了消息--------");
        System.out.println(map);
        //接收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

@RabbitListener注解里面的参数就是监听的队列,这里我们是直接拿配置类里面的使用的

receiverA里面的第一个参数就是我们发送消息的类型,可以直接输出打印

第二个参数也包含发送的对象,获得的是一个字节数组,需要转型,也包含了发送消息的id标识等

第三个就是通道




-----------------↓↓↓↓↓↓↓-----------------

写完是个延时队列的功能,当然也可以变成死信队列…




使用场景

1. 死信队列使用场景


一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息(没错,以前就是这么干的= =)。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。


2. 延时队列使用场景


1.订单在十分钟之内未支付则自动取消。

2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3.账单在一周内未支付,则自动结算。

4.用户注册成功后,如果三天内没有登陆则进行短信提醒。

5.用户发起退款,如果三天内没有得到处理则通知相关运营人员。

6.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。






结束OVER




欢迎各路神人看完的感受或者有差的地方多多评论



20201201103751521.gif



相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
安全 中间件 数据安全/隐私保护
中间件的定义,包括它的功能、应用场景以及优势。
中间件是位于操作系统和应用软件间的系统软件,提供数据交换、应用集成、流程管理和安全保障等服务。常用于分布式系统、微服务架构和企业级应用,实现高效、低耦合的系统运行。其优势在于降低开发成本、提升系统性能、简化扩展和维护。中间件推动了软件技术的发展和创新。
242 1
|
2月前
|
消息中间件 Java
SpringBoot RabbitMQ死信队列
SpringBoot RabbitMQ死信队列
77 0
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
136 0
|
20天前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
15 1
|
23天前
|
缓存 监控 中间件
探究Django中间件的神奇:功能、应用场景和核心方法
在Django中,中间件是一个强大的概念,它们提供了一种灵活的方式来处理请求和响应。本文将探讨Django中间件的基本概念、常见应用场景以及中间件类中的父类和核心方法。
|
2月前
|
存储 缓存 中间件
中间件Cache-Aside策略特别适合“读多”的应用场景
【5月更文挑战第8天】中间件Cache-Aside策略特别适合“读多”的应用场景
18 2
|
2月前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
55 1
|
2月前
|
消息中间件
RabbitMQ 死信队列
RabbitMQ 死信队列
33 0
RabbitMQ 死信队列
|
2月前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
85 0
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
68 1