RocketMQ-延迟消息

本文涉及的产品
云原生网关 MSE Higress,422元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 这一讲,我们主要来结合业务讲讲延迟消息。

这一讲,我们主要来讲延迟消息。


这一次我们结合业务来讲。


业务背景


在电商中,下单后,有一些用户付款后,会主动退款。也有一些用户下单后,未付款。但是这部分未付款的订单,会占用着商品库存。


我们目前的电商App,下单后,会在订单表创建对应的订单数据。这些订单的状态,有一些是未付款的,但是未付款的订单占用着商品库存。为了让商品库存能正常恢复,我们现在的处理方案是:


  • 启动一个定时任务,每30分钟,定时扫描一遍订单表
  • 如果订单是已付款,则跳过,不处理
  • 如果订单是未付款,但未超过30分钟,不处理
  • 如果订单是未付款,且超过30分钟,就取消订单 (补充:取消订单,其实就是下单的逆向流程)


1.png


方案缺点


这个方案有什么缺点?


  • 第一,每次定时任务去扫描全部订单,但是订单未付款且超时30分钟的只有一小部分。就是做很多无用功。
  • 第二,如果订单表的数量超级超级大,这个时候,扫描的时间巨长,浪费cpu资源。
  • 第三,这样子频繁查询数据库,给数据库造成很多不必要的压力。


解决方案


那针对上述的缺点,我们有没有好的解决方案:


  • 第一,避免扫描全表
  • 第二,谁没付款,就去取消谁,不要做多余的动作
  • 第三,要保证近实时取消订单。(近实时:1s左右)


说了这么多,我摊牌了,不装了,就是为了引入RocketMQ的延迟消息


2.png


简单总结一下:创建订单的时候,发送一条延时30分钟的消息。到30分钟后,消费者拿到信息,再去判断订单是否已付款,如果付款就跳过不处理,没付款,那就取消订单。


这种方案:没有多余的扫描数据库操作;谁没付款,就去取消谁。多好呀!在生产上,赶紧用起来。


生产者


上面,介绍的都是方法论,下面就是具体的实操环节了。


下面,简单用一个demo介绍一下生产者


public class Producer {
    public static void main(String[] args) throws Exception{
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
        //设置nameserver
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();
        //构建消息
        Message message = new Message("delayTopic","TagA","delayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 重点:设置延迟级别
        message.setDelayTimeLevel(3);
        // 发送消息
        SendResult sendResult = producer.send(message);
        // 打印发送结果
        System.out.println("发送结果:"+sendResult);
        // 关闭生产者
        producer.shutdown();
    }
}


这里强调一下,不是延迟发送哈,是延迟消费。发送是立马就发送的,只是消费的时候,延迟30分钟。


补充知识点


延迟级别是从1开始的,不是从0开始。然后你可能会发现,最多延迟2小时。如果你想延迟3小时,对不起,RocketMQ不支持。告辞!!!


3.png


消费者


public class Consumer {
    public static void main(String[] args) throws Exception {
        // 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("delay_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题
        consumer.subscribe("delayTopic","TagA");
        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    //这里主要打印延迟时间≈当前时间-消息的生产时间
                    System.out.println(msg+" 时间="+(System.currentTimeMillis()-messageExt.getBornTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}


总结:延迟消费者和普通的消费者相同,一毛一样。延迟消息的核心点:生产者多了一个延迟级别。


知其然知其所以然


上面,你已经知道怎么使用了。


如果面试官问你:RocketMQ的延迟消息底层原理是什么?


那你接着看下去。


4.png


看图说话。


  • 第一,生产者发送的消息,因为带了延迟级别,因此会被分发到叫SCHEDULE_TOPIC_XXXX的Topic中。里面有18的队列,一个队列对应着一个延迟级别。比如queueId=delayLevel-1。


  • 第二,定时器,每100毫秒,扫描所有延迟级别里面的延迟消息message,如果消费时间已经大于当前时间,那定时器就会把延迟消息message,发送到真正的topic(就是代码写的topic,比如上面代码的:delayTopic),根据负载均衡策略,把message发送到具体某个队列。


  • 第三,有消息后,消费者进行消息和后续处理。


上面这里,是一个总体流程图。


然后,我们对照代码,来进一步深刻认识一下。其实,就是加深理解。


第一步:生产者发送的消息到SCHEDULE_TOPIC_XXXX的topic


org.apache.rocketmq.store.CommitLog#putMessage


        //真正的topic
        String topic = msg.getTopic();
        //真正的队列Id
        int queueId = msg.getQueueId();
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 延迟级别大于0
            if (msg.getDelayTimeLevel() > 0) {
                // 如果延迟级别大于最大延迟级别,那就把延迟级别设置为最大延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 延迟topicSCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 根据延迟级别,获取队列id
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 消息的topic设置为延迟topic,不是设置真正的topic
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        省略部分封装msg的代码..
            //最后把msg追加到mappedFile上,mappedFile这个后续再讲,在这里你把它当做一个文件即可
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);


第二步:定时器扫描信息


  • 1,org.apache.rocketmq.store.schedule.ScheduleMessageService#start


public void start() {
        //通过AtomicBoolean 来确保 有且仅有一次执行start方法
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            // 遍历所有 延迟级别
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                // key为延迟级别
                Integer level = entry.getKey();
                // value 为 毫秒数
                Long timeDelay = entry.getValue();
                // 根据延迟级别 ,获取对应的offset
                Long offset = this.offsetTable.get(level);
                //
                if (null == offset) {
                    offset = 0L;
                }
                // 为每个延迟级别创建定时任务,开始执行定时任务,1S后开始执行
                if (timeDelay != null) {
                    // 第二步:具体核心执行逻辑在DeliverDelayedMessageTimerTask-->executeOnTimeup()
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
            // 延迟10秒后执行定时任务,flushDelayOffsetInterval=10s,周期也是10秒执行一次
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        //持久化每个队列的消费offset
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }


2,org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup


public void executeOnTimeup() {
            //根据延迟级别和topic:RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";来找到对应的ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 消费偏移量
            long failScheduleOffset = offset;
            if (cq != null) {
                // 根据消费偏移量从消息队列中获取所有有效消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        // 遍历所有消息
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            // 获取消息的物理偏移量
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            // 获取消息的物理长度
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();
                            //当前时间
                            long now = System.currentTimeMillis();
                            //消费时间
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                            //下一个偏移量
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            //如果消费时间<当前时间,说明应该被消费了
                            long countdown = deliverTimestamp - now;
                            if (countdown <= 0) {
                                //根据物理偏移量和长度,获取消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);
                                if (msgExt != null) {
                                    try {
                                        //构建真正 的消息
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        // 重新把消息发送到真正的消息队列上
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);
                                  ...省略一堆不太重要的代码
                                       }
            //这里又重新添加一个新的任务,这次是100毫秒
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }


第三步: 消费者后续处理(略)后用一张图来总结


最后用一张图来总结


5.png


好了,写完了,下期见,拜拜。


有问题的话,欢迎留言交流。


每日一问


RocketMQ不支持自定义延迟时间,那Kafka支持延迟消息吗?如果支持,支持自定义延迟时间吗?如要你实现自定义延迟时间,你会怎么实现?说说你的思路


欢迎留言


后续文章


  • RocketMQ-入门(已更新)
  • RocketMQ-架构和角色(已更新)
  • RocketMQ-消息发送(已更新)
  • RocketMQ-消费信息
  • RocketMQ-消费者的广播模式和集群模式(已更新)
  • RocketMQ-顺序消息(已更新)
  • RocketMQ-延迟消息(已更新)
  • RocketMQ-批量消息
  • RocketMQ-过滤消息
  • RocketMQ-事务消息
  • RocketMQ-消息存储
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-刷盘机制
  • RocketMQ-幂等性
  • RocketMQ-消息重试
  • RocketMQ-死信队列...


欢迎各位入(guan)股(zhu),后续文章干货多多。


—本文转载自「大头菜技术」公众号

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
808 0
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
523 0
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
4532 104
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
350 0
说说RabbitMQ延迟队列实现原理?
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
607 0
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
635 1
|
消息中间件 JavaScript RocketMQ
消息队列 MQ产品使用合集之是否支持任意时间延迟的消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ