八、RabbitMQ延迟队列
8.1 概念
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。
但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
8.2 死信队列实现延迟队列
1.创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
2.编写配置文件
spring: rabbitmq: host: 192.168.66.100 port: 5672 username: MQzhang password: MQzhang virtual-host: / #日志格式 logging: pattern: console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
3.创建队列和交换机
package com.zj.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { // 订单交换机和队列 private final String ORDER_EXCHANGE = "order_exchange"; private final String ORDER_QUEUE = "order_queue"; // 过期订单交换机和队列(死信交换机和死信队列) private final String EXPIRE_EXCHANGE = "expire_exchange"; private final String EXPIRE_QUEUE = "expire_queue"; // 过期订单交换机 @Bean(EXPIRE_EXCHANGE) public Exchange deadExchange(){ return ExchangeBuilder .topicExchange(EXPIRE_EXCHANGE) .durable(false) .build(); } // 过期订单队列 @Bean(EXPIRE_QUEUE) public Queue deadQueue(){ return QueueBuilder .durable(EXPIRE_QUEUE) .build(); } // 将过期订单队列绑定到交换机 @Bean public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("expire_routing") .noargs(); } // 订单交换机 @Bean(ORDER_EXCHANGE) public Exchange normalExchange(){ return ExchangeBuilder .topicExchange(ORDER_EXCHANGE) .durable(false) .build(); } // 订单队列 @Bean(ORDER_QUEUE) public Queue normalQueue(){ return QueueBuilder .durable(ORDER_QUEUE) .ttl(10000) // 存活时间为10s,模拟30min .deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机 .deadLetterRoutingKey("expire_routing") // 死信交换机的路由关键字 .build(); } // 将订单队列绑定到交换机 @Bean public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange, @Qualifier(ORDER_QUEUE) Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("order_routing") .noargs(); } }
4.编写下单的控制器方法,下单后向订单交换机发送消息
@RestController public class OrderController { @Resource private RabbitTemplate rabbitTemplate; //下单 @GetMapping("/place/{id}") public String placeOrder(@PathVariable("id") String id){ System.out.println("处理订单数据"); //将订单id发送到订单队列 rabbitTemplate.convertAndSend("order_exchange","order_routing",id); return "下单成功,修改库存。"; } }
5.编写监听死信队列的消费者
@Component public class Consumer { // 监听过期队列 @RabbitListener(queues = "expire_queue") public void listen_message(String id) { System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 "); } }
8.3 插件实现延迟队列
在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。
1、使用xftpj将延迟插件上传到虚拟机
2.安装插件
# 将插件放入RabbitMQ插件目录中 mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/ # 启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.重启RabbitMQ服务
#停止rabbitmq rabbitmqctl stop #启动rabbitmq rabbitmq-server restart -detached
此时登录管控台可以看到交换机类型多了延迟消息:
4、创建延迟交换机和延迟队列
package com.zj.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class RabbitConfig { // 创建延迟交换机和延迟队列 private final String DELAYED_EXCHANGE = "delayed_exchange"; private final String DELAYED_QUEUE = "delayed_queue"; // 延迟交换机,ExchangeBuilder只能创建普通的交换机例如:topic、direct、fanout交换机。要创建延迟交换机只能创建自定义交换机。 @Bean(DELAYED_EXCHANGE) public Exchange deadExchange(){ HashMap<String, Object> args = new HashMap<>(); args.put("x-delayed-type","topic"); //topic:延迟交换机的实际类型。 return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,true,args); } // 延迟队列 @Bean(DELAYED_QUEUE) public Queue deadQueue(){ return QueueBuilder .durable(DELAYED_QUEUE) .build(); } // 将延迟队列绑定到延迟交换机 @Bean public Binding bindExchangeQueue(@Qualifier(DELAYED_EXCHANGE) Exchange exchange, @Qualifier(DELAYED_QUEUE) Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("delayed-routing") .noargs(); } }
5.编写下单的控制器方法
package com.zj.controller; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class OrderController { @Resource private RabbitTemplate rabbitTemplate; //下单 @GetMapping("/place/{id}") public String placeOrder(@PathVariable("id") String id){ System.out.println("处理订单数据"); //设置消息的延迟时间为10s MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(10000); return message; } }; rabbitTemplate.convertAndSend("delayed_exchange","delayed-routing",id,messagePostProcessor); return "下单成功,修改库存。"; } }
6.编写延迟队列的消费者
package com.zj.consumer; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { // 监听延迟队列 @RabbitListener(queues = "delayed_queue") public void listen_message(String id) { System.out.println("查询订单号为:"+id+"的订单,如果已支付无需处理,未支付回退库存。 "); } }
7、下单测试
延迟队列中没有消息是因为消费者将消息消费了。