使用RabbitMQ实现未支付订单在30分钟后自动过期

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 使用RabbitMQ实现未支付订单在30分钟后自动过期

延迟队列可以实现消息在投递到Exchange之后,经过一定的时间之后再投递到相应的Queue。再被消费者监听消费。


即:生产者投递的消息经过一段时间之后再被消费者消费。


  • 业务场景:订单在30分钟内还未支付则自动取消。


该业务的其他实现方案:


  • 使用Redis,设置过期时间,监听过期事件。
  • 使用RabbitMQ的过期队列与死信队列,设置消息的存活时间,在设置的时间内未被消费,即会投递到死信队列,我们监听死信队列即可。可参考上一篇文章RabbitMQ死信队列在SpringBoot中的使用


使用RabbitMQ延迟队列实现:


# 安装延迟队列插件:



image.png

  • 重启RabbitMQ


# 业务相关代码编写


  • 订单实体(仅保留相关字段)


image.png

image.png

image.png

订单状态枚举(仅保留相关状态)


image.png

image.png

  • OrderMapper

/**
 * @author futao
 * @date 2020/4/14.
 */
public interface OrderMapper extends BaseMapper<Order> {
}


  • 模拟下定的接口OrderController
    为了简单起见,省略了Service层.


image.png


image.png


# RabbitMQ相关代码编写


  • 配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: delay-vh
    connection-timeout: 15000
    # 发送确认
    publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 手动签收ACK
        acknowledge-mode: manual
app:
  rabbitmq:
    # 延迟时长设置
    delay:
      order: 10S
    # 队列定义
    queue:
      order:
        delay: order-delay-queue
    # 交换机定义
    exchange:
      order:
        delay: order-delay-exchange
  • 延迟交换机,队列定义与绑定

/**
 * 队列,交换机定义与绑定
 * 延迟队列插件`rabbitmq-delayed-message-exchange`下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
 *
 * @author futao
 * @date 2020/4/10.
 */
@Configuration
public class Declare {
    /**
     * 订单队列 - 接收延迟投递的订单
     *
     * @param orderQueue 订单队列名称
     * @return
     */
    @Bean
    public Queue orderDelayQueue(@Value("${app.rabbitmq.queue.order.delay}") String orderQueue) {
        return QueueBuilder
                .durable(orderQueue)
                .build();
    }
    /**
     * 订单交换机-延迟交换机 - 消息延迟一定时间之后再投递到绑定的队列
     *
     * @param orderExchange 订单延迟交换机
     * @return
     */
    @Bean
    public Exchange orderDelayExchange(@Value("${app.rabbitmq.exchange.order.delay}") String orderExchange) {
        Map<String, Object> args = new HashMap<>(1);
        args.put("x-delayed-type", "topic");
        return new CustomExchange(orderExchange, "x-delayed-message", true, false, args);
    }
    /**
     * 订单队列-交换机 绑定
     *
     * @param orderQueue         订单队列
     * @param orderDelayExchange 订单交换机
     * @return
     */
    @Bean
    public Binding orderBinding(Queue orderDelayQueue, Exchange orderDelayExchange) {
        return BindingBuilder
                .bind(orderDelayQueue)
                .to(orderDelayExchange)
                .with("order.delay.*")
                .noargs();
    }
}


可以看出队列就是普通的队列。重点在交换机的设定上。声明延迟交换机需要设置参数x-delayed-type,值为交换机类型,可以是fanout,topic,direct。并且设置交换机的type为x-delayed-message


  • 定义完成后可以启动SpringBoot应用程序,在RabbitMQ管理后台查看Exchange和Queue。


image.png

可以看到,除了默认的交换机,SpringBoot已经帮我们创建好了延迟交换机order-delay-exchange,并且此时messages delayed为0,因为我们还未向交换机投递消息。

  • 可以继续查看交换机的路由类型与绑定的队列


image.png

队列为普通的队列


image.png

回到代码中,定义消息生产者


image.png



在消息投递之前为每条消息都设置了延迟时长setDelay()

调用消费者的代码在上面OrderController中,下定之后,订单数据落库,并且向MQ中投递延迟消息。可以回头看看。


  • 消费者-监听过期的订单信息,并且将DB中相应的订单设置为已过期。


image.png

为了方便查看到延迟投递的效果,我在消息投递和接收处都打印了日志,测试时可以看到消息投递和消息的时间间隔。


# 测试


  • 把订单过期时长设置为10S

app:
  rabbitmq:
    delay:
      order: 10S


  • 下定


image.png


可以看到,打印出了投递日志,订单主键为666ae86aabe2a1b3120b34bb5f447bbe的订单在2020-04-14 22:22:04.307进行了投递,此时数据库中该订单的状态为100,待支付。


  • 此时查看Exchange详情可以发现,messages delayed:1,即目前有一条消息处于延迟状态。


image.png


等待10S后。


image.png

可以看到OrderConsumer在10S后2020-04-14 22:22:14.320接收到了主键为666ae86aabe2a1b3120b34bb5f447bbe的订单消息。距离投递时间2020-04-14 22:22:04.307为10S。此时查看DB中订单状态:


image.png

订单状态为200已过期,且过期时间为2020-04-14 22:22:14


  • 达到了订单在我们指定的时间后过期。
  • 再测试几条一分钟的场景

app:
  rabbitmq:
    delay:
      order: 1M


image.png


image.png

消息都在延迟1分钟后投递到了队列-消费者。

建议收藏,当然我只是建议。


# 严重风险提示:


在实际业务使用中,如果消费者的消费能力比较低下,会存在已经过期的消息阻塞积压在队列,无法在指定的时间内过期,导致业务出现异常。
实际上,按照我们业务意图,队里Queue里是不应该有大量消息存在的,因为投递到过期队列的消息已经是过期了的,应该立即被消费掉。


  • 进行测试:为了降低消费者的消费能力,进行如下处理:
  1. 设置消费者的最大并发数为1,并进行手动签收。

listener:
      simple:
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        acknowledge-mode: manual
app:
  rabbitmq:
    delay:
      # 订单过期时间为1分钟
      order: 1M


  1. 消费者在处理消息时休眠5S


image.png

  1. 向MQ投递两条消息,预期两条消息都在1分钟后正常过期。
  2. 结果(去除了无关信息):


image.png


2020-04-15 20:18:05.269  OrderSender       : 订单[d6fd965b11f8db0fafb762d305db83b0]投递到MQ
2020-04-15 20:18:05.765  OrderSender       : 订单[77ceb7f1bfbbcaf627224ac75e96b0e5]投递到MQ
2020-04-15 20:19:05.279  OrderConsumer     : 消费者接收到延迟订单[d6fd965b11f8db0fafb762d305db83b0]
2020-04-15 20:19:15.316  OrderConsumer     : 订单业务处理结束.....进行消息ack签收
2020-04-15 20:19:15.318  OrderConsumer     : 消费者接收到延迟订单[77ceb7f1bfbbcaf627224ac75e96b0e5]
2020-04-15 20:19:25.330  OrderConsumer     : 订单业务处理结束.....进行消息ack签收


第一个订单d6fd965b11f8db0fafb762d305db83b0投递时间为2020-04-15 20:18:05.269。1分钟后2020-04-15 20:19:05.279接收到了通知,并且处理了10S后进行了签收ack。


第二个订单77ceb7f1bfbbcaf627224ac75e96b0e5投递时间为2020-04-15 20:18:05.765。1分钟过后并没有收到通知,而是在第一个订单处理完毕之后,2020-04-15 20:19:15.318才收到了通知,比预期的时间长了10秒,实际延迟时间为1分钟+10秒。出现了业务异常。


  • 导致这个问题的原因就是消费者无法及时消费消息并更新订单状态。所以我们在进行开发时,需要考虑实际的数据量大小,消费者消费能力。及时关注队列消息积压情况,灵活调整消费者并发数量,优化消费者代码,提高消费者消费能力。


# 系列文章



任何技术的使用都不可生搬硬套,需要结合自己实际的业务场景进行相应的调整优化。在平时的工作中应该多关注程序在实际的运行过程中的结果是否符合我们的预期


本文涉及的源代码:https://github.com/FutaoSmile/springboot-learn-integration/releases/tag/v_rabbitmq_delay_queue

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 负载均衡 Kafka
MQ消息路由大揭秘!从菜鸟到高手,一文带你玩转消息传递的‘高速公路’,轻松实现订单秒级响应!
【8月更文挑战第24天】在现代分布式系统中,消息队列(MQ)作为系统间解耦的核心工具,支持异步处理、负载均衡及高可用性。消息路由是MQ中的关键环节,决定消息从生产者到消费者的路径。主流MQ产品如RabbitMQ、Kafka等采用相似的路由机制,涉及交换器、队列、路由键等概念。常见的路由模式包括直接交换、主题交换及发布/订阅模式。以RabbitMQ为例,通过直接交换模式,可以根据订单类型(如“普通订单”、“紧急订单”)将消息路由至相应的处理队列。这一过程展示了MQ系统如何基于路由键和队列绑定关系实现消息的有效传递。
77 2
|
3月前
|
机器人 C# 人工智能
智能升级:WPF与人工智能的跨界合作——手把手教你集成聊天机器人,打造互动新体验与个性化服务
【8月更文挑战第31天】聊天机器人已成为现代应用的重要组成部分,提供即时响应、个性化服务及全天候支持。随着AI技术的发展,聊天机器人的功能日益强大,不仅能进行简单问答,还能实现复杂对话管理和情感分析。本文通过具体案例分析,展示了如何在WPF应用中集成聊天机器人,并通过示例代码详细说明其实现过程。使用Microsoft的Bot Framework可以轻松创建并配置聊天机器人,增强应用互动性和用户体验。首先,需在Bot Framework门户中创建机器人项目并编写逻辑。然后,在WPF应用中添加聊天界面,实现与机器人的交互。
98 0
|
6月前
|
消息中间件 Java Unix
MQ产品使用合集之消费订单状态,订单消费待支付消息失败,是否会导致其他订单也没法消费
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件
RabbitMQ如何设置消息过期
RabbitMQ 是一个功能强大的消息中间件,用于在分布式系统中处理和传递消息。为了提高消息传递的灵活性和效率,RabbitMQ 提供了一种消息过期的机制,可以设置消息的过期时间,这样当消息在指定时间内未被消费者消费时,会自动地从队列中删除。
223 0
|
6月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
92 0
|
6月前
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
253 0
|
6月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
59 0
|
6月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
205 0
|
消息中间件 Java
RabbitMQ之ttl(过期消息)解读
RabbitMQ之ttl(过期消息)解读
|
消息中间件 Java