03.RabbitMQ延迟队列
1.使用场景
“订单下单成功后,15分钟未支付自动取消”
1.1.传统处理超时订单
采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作
1.2.rabbitMQ延时队列方案
一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的, 并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失
2.TTL和DLX
rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换机(DLX)和设置过期时间(TTL)结合起来实现延迟队列
2.1.TTL
TTL是Time To Live的缩写, 也就是生存时间。
RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。 设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。 消息:生产者 -> 交换机 消息在生产者制造消息的时候就开始计算了TTL TTL=5 队列:生产者 -> 交换机 -> 路由键 -> 队列 当消息送达到队列的时候才开始计算TTL TTL=10
2.2.DLX和死信队列
DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。
死信队列是指队列(正常)上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上,这个队列,就是死信队列
成为死信一般有以下几种情况:
消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
消息的TTL-存活时间已经过期
队列长度限制被超越(队列满)
注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去
注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明 x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键
3.延迟队列
通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费
注1:延迟队列(即死信队列)产生流程
4. 开发步骤
4.1.config中添加DlxConfig
package com.zjzaki.rabbitmqprovider.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.logging.Handler; /** * @Author zjzaki * @Package com.zjzaki.rabbitmqprovider.config * @Date 2023-09-06 20:53:31 */ @Configuration public class DlxConfig { public static String NORMAL_EXCHANGE = "normal_exchange"; public static String NORMAL_QUEUE = "normal_queue"; public static String NORMAL_ROUTING_KEY = "normal_routing_key"; public static String DLX_EXCHANGE = "dlx_exchange"; public static String DLX_QUEUE = "dlx_queue"; public static String DLX_ROUTING_KEY = "normal_routing_key"; @Bean public Queue normalQueue() { // ttl时间,当前队列绑定哪个交换机,绑定的交换机对应的routing_key HashMap<String, Object> arguments = new HashMap<>(16); //message在该队列queue的存活时间最大为10秒 arguments.put("x-message-ttl", 10000); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX) arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //x-dead-letter-routing-key参数是给这个DLX指定路由键 arguments.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // return new Queue(NORMAL_QUEUE); return new Queue(NORMAL_QUEUE, true, false, false, arguments); } @Bean public DirectExchange normalEXCHANGE() { return new DirectExchange(NORMAL_EXCHANGE); } @Bean public Binding normalBinding() { return BindingBuilder.bind(normalQueue()).to(normalEXCHANGE()).with(NORMAL_ROUTING_KEY); } @Bean public Queue dlxQueue() { return new Queue(DLX_QUEUE); } @Bean public DirectExchange dlxEXCHANGE() { return new DirectExchange(DLX_EXCHANGE); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxEXCHANGE()).with(DLX_ROUTING_KEY); } }
controller中添加代码
@RequestMapping("/dlx") public String dlxMsg(String rk) { Map<Object, Object> map = new HashMap<>(); map.put("msg", "此消息为[验证死信队列]路由过来的"); map.put("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); rabbitTemplate.convertAndSend(DlxConfig.NORMAL_EXCHANGE, rk, map); return "dlx"; }
4.2.访问 http://localhost:8081/dlx?rk=normal_routing_key
4.3.rabbitmq-consumer添加DlxReceiver
package com.zjzaki.rabbitmqconsumer.config; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; @Component @RabbitListener(queues = {"dlx_queue"}) public class DlxReceiver { // @RabbitListener(queues = {"direct-queue"}) @RabbitHandler public void handler(Map msg) { System.out.println("当前的时间为: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); System.out.println(msg); } }
4.4.启动,访问 http://localhost:8081/dlx?rk=normal_routing_key
可以注意到间隔10秒后收到消息