一、背景
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景:
订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。
短信通知: 下单成功后 60s 之后给用户发送短信通知。
失败重试: 业务操作失败后,间隔一定的时间进行失败重试。
传统订单处理:
采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。
当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作。
但是通过定时任务来执行,实时性始终不是很好。
二、实现方案
方案一:通过死信队列实现
参考:https://www.cnblogs.com/xmf3628/p/12097101.html
方案二:通过延迟路由插件来实现rabbitmq-delayed-message-exchange
这里演示通过插件来实现
三、插件安装
下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:
//查看已安装的插件 rabbitmq-plugins list //启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange //重启服务 service rabbitmq-server restart //再次查看,插件是否生效 rabbitmq-plugins list
说明插件安装成功,并成功启用。
四、机制解释
安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。
五、代码实战
1、依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.44</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2、队列连接信息
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/
3、生产者代码
声明延迟队列和延迟队列的交换器,并建立绑定关系
@Configuration public class TestDelayQueueConfig { @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); return new CustomExchange(ExchangeEnum.DELAY_EXCHANGE.getValue(), "x-delayed-message", true, false, args); } /** * 延迟消息队列 * @return */ @Bean public Queue delayQueue() { return new Queue(QueueEnum.TEST_DELAY.getName(), true); } @Bean public Binding deplyBinding() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(QueueEnum.TEST_DELAY.getRoutingKey()).noargs(); } }
@Getter public enum ExchangeEnum { DELAY_EXCHANGE("test.deply.exchange"); private String value; ExchangeEnum(String value) { this.value = value; } }
@Getter public enum QueueEnum { /** * delay */ TEST_DELAY("test.delay.queue", "delay"); /** * 队列名称 */ private String name; /** * 队列路由键 */ private String routingKey; QueueEnum(String name, String routingKey) { this.name = name; this.routingKey = routingKey; } }
延迟队列生产者服务:
@Component @Slf4j public class DeplyProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String msg, int delayTime) { log.info("msg= " + msg + ".delayTime" + delayTime); MessageProperties messageProperties = new MessageProperties(); messageProperties.setDelay(delayTime); Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.send(ExchangeEnum.DELAY_EXCHANGE.getValue(), QueueEnum.TEST_DELAY.getRoutingKey(), message); } }
单元测试:
@Test public void sendDeplyMsgTest() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentTime = sdf.format(new Date()); log.info("发送测试消息的时间:" +currentTime ); deplyProducer.send(currentTime + "发送一个测试消息,延迟10秒", 10000);//10秒 deplyProducer.send(currentTime + "发送一个测试消息,延迟20秒", 20000);//2秒 deplyProducer.send(currentTime + "发送一个测试消息,延迟30秒", 30000);//1秒 }
消费者:
@Component @RabbitListener(queues = "test.delay.queue") @Slf4j public class DeplyConsumer { @RabbitHandler public void onMessage(byte[] message, @Headers Map<String, Object> headers, Channel channel) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent."); log.info(sdf.format(new Date())+"接收到延时消息:" + new String(message)); } }
执行测试结果:
执行消息发送代码的时候,不会立刻把消息推送到对应的队列中。
只要到了对应的时候,才会将消息推送到队列里面。
可以在执行单元测试的时候,通过管理界面,观察队列中消息的增长:
每隔10s,增加1条。