RabbitMQ之延迟队列(手把手教你学习延迟队列)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。

文章目录

一、延迟队列概念

二、延迟队列使用场景

三、RabbitMQ 中的 TTL

1、队列设置 TTL

2、消息设置 TTL

3、两者的区别

四、整合 springboot

1、添加依赖

2、修改配置文件

3、添加 Swagger 配置类

五、队列 TTL

1、代码架构图

2、配置文件类代码

3、消息生产者代码

4、消息消费者代码

六、延时队列优化

1、代码架构图

2、配置文件类代码

3、消息生产者代码

七、Rabbitmq 插件实现延迟队列

1、安装延时队列插件

2、代码架构图

3、配置文件类代码

4、消息生产者代码

5、消息消费者代码

总结


一、延迟队列概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。

二、延迟队列使用场景

  • 1.订单在十分钟之内未支付则自动取消
  • 2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

三、RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有

消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

1、队列设置 TTL

第一种是在创建队列的时候设置队列的“x-message-ttl”属性

2、消息设置 TTL

第二种是针对每条消息设置 TTL

3、两者的区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

四、整合 springboot

1、添加依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>

2、修改配置文件

spring.rabbitmq.host=192.168.10.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3、添加 Swagger 配置类

importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importspringfox.documentation.builders.ApiInfoBuilder;
importspringfox.documentation.service.ApiInfo;
importspringfox.documentation.service.Contact;
importspringfox.documentation.spi.DocumentationType;
importspringfox.documentation.spring.web.plugins.Docket;
importspringfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration@EnableSwagger2publicclassSwaggerConfig {
@BeanpublicDocketwebApiConfig(){
returnnewDocket(DocumentationType.SWAGGER_2)
            .groupName("webApi")
            .apiInfo(webApiInfo())
            .select()
            .build();
    }
privateApiInfowebApiInfo(){
returnnewApiInfoBuilder()
            .title("rabbitmq 接口文档")
            .description("本文档描述了 rabbitmq 微服务接口定义")
            .version("1.0")
            .contact(newContact("enjoy6288", "http://atguigu.com", "1551388580@qq.com"))
            .build();
    }
}

五、队列 TTL

1、代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

2、配置文件类代码

@ConfigurationpublicclassTtlQueueConfig {
publicstaticfinalStringX_EXCHANGE="X";
publicstaticfinalStringQUEUE_A="QA";
publicstaticfinalStringQUEUE_B="QB";
publicstaticfinalStringY_DEAD_LETTER_EXCHANGE="Y";
publicstaticfinalStringDEAD_LETTER_QUEUE="QD";
// 声明 xExchange@Bean("xExchange")
publicDirectExchangexExchange(){
returnnewDirectExchange(X_EXCHANGE);
    }
// 声明 xExchange@Bean("yExchange")
publicDirectExchangeyExchange(){
returnnewDirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
//声明队列 A ttl 为 10s 并绑定到对应的死信交换机@Bean("queueA")
publicQueuequeueA(){
Map<String, Object>args=newHashMap<>(3);
//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTLargs.put("x-message-ttl", 10000);
returnQueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }
// 声明队列 A 绑定 X 交换机@BeanpublicBindingqueueaBindingX(@Qualifier("queueA") QueuequeueA,@Qualifier("xExchange") DirectExchangexExchange){
returnBindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")
publicQueuequeueB(){
Map<String, Object>args=newHashMap<>(3);
//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTLargs.put("x-message-ttl", 40000);
returnQueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }
//声明队列 B 绑定 X 交换机@BeanpublicBindingqueuebBindingX(@Qualifier("queueB") Queuequeue1B,@Qualifier("xExchange") DirectExchangexExchange){
returnBindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }
//声明死信队列 QD@Bean("queueD")
publicQueuequeueD(){
returnnewQueue(DEAD_LETTER_QUEUE);
    }
//声明死信队列 QD 绑定关系@BeanpublicBindingdeadLetterBindingQAD(@Qualifier("queueD") QueuequeueD,@Qualifier("yExchange") DirectExchangeyExchange){
returnBindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

3、消息生产者代码

@Slf4j@RequestMapping("ttl")
@RestControllerpublicclassSendMsgController {
@AutowiredprivateRabbitTemplaterabbitTemplate;
@GetMapping("sendMsg/{message}")
publicvoidsendMsg(@PathVariableStringmessage){
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", newDate(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
    } 
}

4、消息消费者代码

@Slf4j@ComponentpublicclassDeadLetterQueueConsumer {
@RabbitListener(queues="QD")
publicvoidreceiveD(Messagemessage, Channelchannel) throwsIOException {
Stringmsg=newString(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", newDate().toString(), msg);
    }
}

发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

六、延时队列优化

1、代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

2、配置文件类代码

@ComponentpublicclassMsgTtlQueueConfig {
publicstaticfinalStringY_DEAD_LETTER_EXCHANGE="Y";
publicstaticfinalStringQUEUE_C="QC";
//声明队列 C 死信交换机@Bean("queueC")
publicQueuequeueB(){
Map<String, Object>args=newHashMap<>(3);
//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");
//没有声明 TTL 属性returnQueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }
//声明队列 B 绑定 X 交换机@BeanpublicBindingqueuecBindingX(@Qualifier("queueC") QueuequeueC,@Qualifier("xExchange") DirectExchangexExchange){
returnBindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

3、消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
publicvoidsendMsg(@PathVariableStringmessage,@PathVariableStringttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData->{
correlationData.getMessageProperties().setExpiration(ttlTime);
returncorrelationData;
    });
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", newDate(),ttlTime, message);
}

发起请求

  • http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
  • http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

七、Rabbitmq 插件实现延迟队列

1、安装延时队列插件

因为博主是用Docker安装的RabbitMQ,所以安装延时队列插件也是在Docker中进行。

首先在官网下载rabbitmq_delayed_message_exchange 插件。

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

点击下载.ez文件。

然后通过自己的文件传输工具将.ez文件上传到虚拟机中,博主这里将.ez文件放到了/mnt目录下。

然后输入sudo docker ps命令查看自己的rabbitmq是否正在运行,如果不在运行则输入sudo docker start idid这里填你自己的容器id,如果不知道自己id的,输入sudo docker pa -a查看。

当容器运行起来后,输入sudo docker cp /mnt/rabbitmq_delayed_message_exchange-3.12.0.ez rabbit:/plugins命令,将刚插件拷贝到容器内plugins目录下。

拷贝完成后,输入sudo docker exec -it rabbit /bin/bash命令,进入容器。

在容器内plugins目录下,查看插件是否上传成功ls -l|grep delay

然后启动插件,在当前目录下输入rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令

到这里插件安装就完成了,接下来我们需要重启RabbitMQ容器。执行exit命令退出RabbitMQ容器内部,然后执行docker restart rabbit命令重启RabbitMQ容器

在容器重启完成后,我们可以登录RabbitMQ的Web端管理界面,在Exchanges选项卡下,点击Add a new exchange,在Type里面看是否出现了x-delayed-message选项,如下图到这里,整个安装过程就完毕了。

2、代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

3、配置文件类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@ConfigurationpublicclassDelayedQueueConfig {
publicstaticfinalStringDELAYED_QUEUE_NAME="delayed.queue";
publicstaticfinalStringDELAYED_EXCHANGE_NAME="delayed.exchange";
publicstaticfinalStringDELAYED_ROUTING_KEY="delayed.routingkey";
@BeanpublicQueuedelayedQueue() {
returnnewQueue(DELAYED_QUEUE_NAME);
    }
//自定义交换机 我们在这里定义的是一个延迟交换机@BeanpublicCustomExchangedelayedExchange() {
Map<String, Object>args=newHashMap<>();
//自定义交换机的类型args.put("x-delayed-type", "direct");
returnnewCustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
@BeanpublicBindingbindingDelayedQueue(@Qualifier("delayedQueue") Queuequeue,@Qualifier("delayedExchange") CustomExchangedelayedExchange) {
returnBindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4、消息生产者代码

publicstaticfinalStringDELAYED_EXCHANGE_NAME="delayed.exchange";
publicstaticfinalStringDELAYED_ROUTING_KEY="delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
publicvoidsendMsg(@PathVariableStringmessage,@PathVariableIntegerdelayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData->{
correlationData.getMessageProperties().setDelay(delayTime);
returncorrelationData;
    });
log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", newDate(),delayTime, message);
}

5、消息消费者代码

publicstaticfinalStringDELAYED_QUEUE_NAME="delayed.queue";
@RabbitListener(queues=DELAYED_QUEUE_NAME)
publicvoidreceiveDelayedQueue(Messagemessage){
Stringmsg=newString(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", newDate().toString(), msg);
}

发起请求:

  • http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
  • http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

第二个消息被先消费掉了,符合预期


总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
【1月更文挑战第12天】用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
230 1
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
71 0
|
21天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
17 0
|
1月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
42 1
|
2月前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
48 0
|
2月前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
38 0
|
2月前
|
消息中间件
rabbitmq动态创建队列
rabbitmq动态创建队列
36 0
|
2月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
53 0
|
3月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
30 0

相关产品

  • 云消息队列 MQ