7. 延迟队列
7.1. 延迟队列概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
7.2. 延迟队列使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
7.3. RabbitMQ 中的 TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
7.3.1 两者的区别
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。
7.4. 整合 springboot
7.4.1. 创建项目
7.4.2. 添加依赖
<dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--swagger--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
7.4.3. 修改配置文件
spring.rabbitmq.host=182.92.234.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
wo 本地的
spring.rabbitmq.host=39.107.43.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
7.4.4. 添加 Swagger 配置类
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig(){ return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(){ return new ApiInfoBuilder() .title("rabbitmq 接口文档") .description("本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .contact(new Contact("enjoy6288", "http://atguigu.com", "1551388580@qq.com")) .build(); } }
7.5. 队列 TTL
7.5.1. 代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:
7.5.2. 配置文件类代码
@Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; // 声明 xExchange @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } // 声明 xExchange @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明队列 A ttl 为 10s 并绑定到对应的死信交换机 @Bean("queueA") public Queue queueA(){ Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // 声明队列 A 绑定 X 交换机 @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //声明队列 B ttl 为 40s 并绑定到对应的死信交换机 @Bean("queueB") public Queue queueB(){ Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } //声明队列 B 绑定 X 交换机 @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } //声明死信队列 QD @Bean("queueD") public Queue queueD(){ return new Queue(DEAD_LETTER_QUEUE); } //声明死信队列 QD 绑定关系 @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
package com.atguigu.springbootrabbitmq.config; import org.mapstruct.Qualifier; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration //接口 public class TtlQueueConfig { //普通交换机的名称 public static final String X_EXCHANGE = "X"; //死信交换机的名称 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列的名称 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信队列的名称 public static final String DEAD_LETTER_QUEUE = "QD"; //普通 交换机 //声明X_EXCHANGE 别名注入的时候用 @Bean("xEXCHANGE") public DirectExchange x_EXCHANGE() { return new DirectExchange(X_EXCHANGE); } //声明y_EXCHANGE 别名注入的时候用 @Bean("yEXCHANGE") public DirectExchange y_EXCHANGE() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //队列一 //声明y_EXCHANGE 别名注入的时候用 ttl为10s @Bean("queueA") public Queue queueA() { //3 代表map的长度 因为现在是3个 就写一个3 可不写 Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信的RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //设置ttl 单位ms arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A). withArguments(arguments).build(); } //队列二 //声明y_EXCHANGE 别名注入的时候用 ttl为10s @Bean("queueB") public Queue queueB() { //3 代表map的长度 因为现在是3个 就写一个3 可不写 Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信的RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //设置ttl 单位ms arguments.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B). withArguments(arguments).build(); } //队列三 死信队列 //声明y_EXCHANGE 别名注入的时候用 ttl为10s @Bean("queueD") public Queue queueQD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE). build(); } //绑定 交换机和队列 进行绑定 @Bean public Binding queueABindxingX(@org.springframework.beans.factory.annotation.Qualifier("queueA") Queue queueA, @org.springframework.beans.factory.annotation.Qualifier("xEXCHANGE") DirectExchange xEXCHANGE){ //x的交换机与qa队列绑定 他们的routing的值是XA return BindingBuilder.bind(queueA).to(xEXCHANGE).with("XA"); } //绑定 交换机和队列 进行绑定 @Bean public Binding queueBBindxingX(@org.springframework.beans.factory.annotation.Qualifier("queueB") Queue queueB, @org.springframework.beans.factory.annotation.Qualifier("xEXCHANGE") DirectExchange xEXCHANGE){ //x的交换机与qb队列绑定 他们的routing的值是XB return BindingBuilder.bind(queueB).to(xEXCHANGE).with("XB"); } //绑定 死信交换机和队列 进行绑定 @Bean public Binding queueDBindxingX(@org.springframework.beans.factory.annotation.Qualifier("queueD") Queue queueD, @org.springframework.beans.factory.annotation.Qualifier("yEXCHANGE") DirectExchange yEXCHANGE){ //x的交换机与qb队列绑定 他们的routing的值是XB return BindingBuilder.bind(queueD).to(yEXCHANGE).with("YD"); } }