RabbitMQ延时队列应用场景

简介: RabbitMQ延时队列应用场景

应用场景

我们系统未付款的订单,超过一定时间后,需要系统自动取消订单并释放占有物品


常用的方案

就是利用Spring schedule定时任务,轮询检查数据库

但是会消耗系统内存,增加了数据库的压力、还存在较大的时间误差


解决:rabbitmq的消息TTL和死信Exchange结合

介绍

1.何为消息TTL、死信

死信:对消息设置的过期时间到了,这个消息还没有被消费就认为这个消息死了,死了的消息会进入死信交换机(Dead Letter Exchanges)

成为死信的三种条件:

  • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
  • 上面的消息的TTL到了,消息过期了。
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上

消息TTL:消息的TTL就是消息的存活时间

RabbitMQ可以对队列和消息都设置过期时间,但代表的都是一个意思,只要消息在设置时间内没有消费,消息就死了,就被称为死信

如果队列和消息都设置了过期时间,那么就取时间最小的,单个消息的过期时间才是延时队列的关键

2.如何运作

设置队列过期时间


消费者P会通过一个路由键deal.message发送消息给X交换机,然后继续发送给delay queau队列,这个队列比较特殊,设置了过期时间5分钟过期,还设置了x-dead-letter-exchange用于指定下一个接收的交换机,消息过期之后会成为死信直接进入delay.exchange交换机,利用x-dead-letter-routing-key绑定的路由键找到下一个队列,这时候只需要有人监听这个队列。

设置消息过期时间


消费者发送一个消息,设置了5分钟过期时间,最后交给了延时队列,延时队列说消息死了不要乱放,指定了一个死信路由,用于找到下一个队列的路由键,等到五分钟后服务器会自动检查是否过期,过期的话会交给delay.exchange路由,最后再交给delay.message

代码模拟


下订单成功先发动给order-event-exchangeorder-event-exchange绑定了两个路由键order.create.orderorder.release.order,根据order.create.order路由键找到order.delay.queue队列,这是一个特殊的队列,上图所诉,消息的存活时间为一分钟,消息在order.delay.queue队列中没人使用变成死信了,交给order-event-exchange交换机,最后通过order.release.order绑定关系找到了order.release.order.queue队列

@Configuration
public class MyMQConfig {
    //监听最后一个队列,获取那些过期的订单消息
    @RabbitListener(queues = "order.release.order.queue")
    public  void  listerner(OrderEntity orderEntity,Channel channel,Message message) throws IOException {
        System.out.println("收到过期订单信息"+orderEntity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    //特殊队列
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000);
        Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        return orderDelayQueue;
    }
    //最后接收死信消息的队列
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }
   //事件交换机
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange("order-event-exchange", true, false);
    }
    //绑定order.delay.queue队列和的order-event-exchange交换机的路由键
    @Bean
    public Binding orderCreateBingding() {
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }
    //绑定order.release.order.queue队列和的order-event-exchange交换机的路由键
    @Bean
    public Binding orderReleaseBingding() {
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
    }
}

测试

@Autowired
    RabbitTemplate rabbitTemplate;
    @ResponseBody
    @GetMapping("/test/createOrder")
    public String createOrderTest(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
        return "ok";
    }


库存解锁实际场景


在库存服务有个stock-event-exchange交换机,如果我们想要解锁库存,

1、首先订单下成功、库存锁定成功

2、锁定成功就要通过stock.locked路由键发送一个消息给交换机stock-event-exchange,消息内容包括哪个订单、哪些商品、多少库存等等

3、交换机通过绑定关系再发送给延时队列stock.delay.queue

4、订单可能需要30分钟才会自动关闭,50分钟之后来检查库存,就会知道订单支付没有

5、50分钟消息没有被消息,就变为死信,通过stock.release路由键绑定关系交给stock-event-exchange交换机

6、stock-event-exchange交换机通过stock.release路由键绑定关系找到strock.relelase.stock.queue队列

7、所有的解锁库存服务就监听这个队列里的消息,只要这个队列里消息能够到达的都是超时没有支付订单的

下单远程锁定库存,然后将仓库锁定库存的数据发给订单,当在订单下单失败时,由于不是分布式事务,订单回滚,但仓库不回滚,所以订单一失败,就需要通过订单拿到mq中仓库传来的数据通知仓库解锁库存

库存解锁场景:

1、下订单成功,订单过期没有支付被系统自动取消或者用户手动取消,都要解锁库存

2、下订单成功、库存锁定成功,但是业务调用失败导致订单回滚,之前锁定的库存就自动解锁,Seata分布式事务太慢,就要用一段时间后自动解决库存。

3、订单失败,因为锁库存失败有一个商品没有锁成功,导致整个锁库存服务都回滚,

消息队列收到库存消息场景

消息队列收到消息之后

  • 如果没有查到数据库有锁定成功的数据,说明库存锁失败了,锁库存自动回滚,数据库查不到记录无需解锁
  • 如果查到有数据,就说明库存锁定成功了
  • 没有这个订单必须解锁库存
  • 有订单,订单没人支付失效了才能解锁库存

定时关闭订单实际场景


同上原理类似也是利用死信路由,订单创建后,默认放入延时队列,也就是订单的有效时间,超过这个时间没有支付或者用户主动取消都会导致订单信息进入order.release.order.queue队列,最后被释放

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
344 3
|
7月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
76 1
|
7月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
239 2
|
25天前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
47 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
7月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
441 2
|
6月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
4月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
6月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
176 1
|
6月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制