RocketMQ-延迟消息

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 这一讲,我们主要来结合业务讲讲延迟消息。

这一讲,我们主要来讲延迟消息。


这一次我们结合业务来讲。


业务背景


在电商中,下单后,有一些用户付款后,会主动退款。也有一些用户下单后,未付款。但是这部分未付款的订单,会占用着商品库存。


我们目前的电商App,下单后,会在订单表创建对应的订单数据。这些订单的状态,有一些是未付款的,但是未付款的订单占用着商品库存。为了让商品库存能正常恢复,我们现在的处理方案是:


  • 启动一个定时任务,每30分钟,定时扫描一遍订单表
  • 如果订单是已付款,则跳过,不处理
  • 如果订单是未付款,但未超过30分钟,不处理
  • 如果订单是未付款,且超过30分钟,就取消订单 (补充:取消订单,其实就是下单的逆向流程)


1.png


方案缺点


这个方案有什么缺点?


  • 第一,每次定时任务去扫描全部订单,但是订单未付款且超时30分钟的只有一小部分。就是做很多无用功。
  • 第二,如果订单表的数量超级超级大,这个时候,扫描的时间巨长,浪费cpu资源。
  • 第三,这样子频繁查询数据库,给数据库造成很多不必要的压力。


解决方案


那针对上述的缺点,我们有没有好的解决方案:


  • 第一,避免扫描全表
  • 第二,谁没付款,就去取消谁,不要做多余的动作
  • 第三,要保证近实时取消订单。(近实时:1s左右)


说了这么多,我摊牌了,不装了,就是为了引入RocketMQ的延迟消息


2.png


简单总结一下:创建订单的时候,发送一条延时30分钟的消息。到30分钟后,消费者拿到信息,再去判断订单是否已付款,如果付款就跳过不处理,没付款,那就取消订单。


这种方案:没有多余的扫描数据库操作;谁没付款,就去取消谁。多好呀!在生产上,赶紧用起来。


生产者


上面,介绍的都是方法论,下面就是具体的实操环节了。


下面,简单用一个demo介绍一下生产者


public class Producer {
    public static void main(String[] args) throws Exception{
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
        //设置nameserver
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();
        //构建消息
        Message message = new Message("delayTopic","TagA","delayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 重点:设置延迟级别
        message.setDelayTimeLevel(3);
        // 发送消息
        SendResult sendResult = producer.send(message);
        // 打印发送结果
        System.out.println("发送结果:"+sendResult);
        // 关闭生产者
        producer.shutdown();
    }
}


这里强调一下,不是延迟发送哈,是延迟消费。发送是立马就发送的,只是消费的时候,延迟30分钟。


补充知识点


延迟级别是从1开始的,不是从0开始。然后你可能会发现,最多延迟2小时。如果你想延迟3小时,对不起,RocketMQ不支持。告辞!!!


3.png


消费者


public class Consumer {
    public static void main(String[] args) throws Exception {
        // 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("delay_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题
        consumer.subscribe("delayTopic","TagA");
        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    //这里主要打印延迟时间≈当前时间-消息的生产时间
                    System.out.println(msg+" 时间="+(System.currentTimeMillis()-messageExt.getBornTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}


总结:延迟消费者和普通的消费者相同,一毛一样。延迟消息的核心点:生产者多了一个延迟级别。


知其然知其所以然


上面,你已经知道怎么使用了。


如果面试官问你:RocketMQ的延迟消息底层原理是什么?


那你接着看下去。


4.png


看图说话。


  • 第一,生产者发送的消息,因为带了延迟级别,因此会被分发到叫SCHEDULE_TOPIC_XXXX的Topic中。里面有18的队列,一个队列对应着一个延迟级别。比如queueId=delayLevel-1。


  • 第二,定时器,每100毫秒,扫描所有延迟级别里面的延迟消息message,如果消费时间已经大于当前时间,那定时器就会把延迟消息message,发送到真正的topic(就是代码写的topic,比如上面代码的:delayTopic),根据负载均衡策略,把message发送到具体某个队列。


  • 第三,有消息后,消费者进行消息和后续处理。


上面这里,是一个总体流程图。


然后,我们对照代码,来进一步深刻认识一下。其实,就是加深理解。


第一步:生产者发送的消息到SCHEDULE_TOPIC_XXXX的topic


org.apache.rocketmq.store.CommitLog#putMessage


        //真正的topic
        String topic = msg.getTopic();
        //真正的队列Id
        int queueId = msg.getQueueId();
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 延迟级别大于0
            if (msg.getDelayTimeLevel() > 0) {
                // 如果延迟级别大于最大延迟级别,那就把延迟级别设置为最大延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 延迟topicSCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 根据延迟级别,获取队列id
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 消息的topic设置为延迟topic,不是设置真正的topic
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        省略部分封装msg的代码..
            //最后把msg追加到mappedFile上,mappedFile这个后续再讲,在这里你把它当做一个文件即可
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);


第二步:定时器扫描信息


  • 1,org.apache.rocketmq.store.schedule.ScheduleMessageService#start


public void start() {
        //通过AtomicBoolean 来确保 有且仅有一次执行start方法
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            // 遍历所有 延迟级别
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                // key为延迟级别
                Integer level = entry.getKey();
                // value 为 毫秒数
                Long timeDelay = entry.getValue();
                // 根据延迟级别 ,获取对应的offset
                Long offset = this.offsetTable.get(level);
                //
                if (null == offset) {
                    offset = 0L;
                }
                // 为每个延迟级别创建定时任务,开始执行定时任务,1S后开始执行
                if (timeDelay != null) {
                    // 第二步:具体核心执行逻辑在DeliverDelayedMessageTimerTask-->executeOnTimeup()
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
            // 延迟10秒后执行定时任务,flushDelayOffsetInterval=10s,周期也是10秒执行一次
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        //持久化每个队列的消费offset
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }


2,org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup


public void executeOnTimeup() {
            //根据延迟级别和topic:RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";来找到对应的ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 消费偏移量
            long failScheduleOffset = offset;
            if (cq != null) {
                // 根据消费偏移量从消息队列中获取所有有效消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        // 遍历所有消息
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            // 获取消息的物理偏移量
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            // 获取消息的物理长度
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();
                            //当前时间
                            long now = System.currentTimeMillis();
                            //消费时间
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                            //下一个偏移量
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            //如果消费时间<当前时间,说明应该被消费了
                            long countdown = deliverTimestamp - now;
                            if (countdown <= 0) {
                                //根据物理偏移量和长度,获取消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);
                                if (msgExt != null) {
                                    try {
                                        //构建真正 的消息
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        // 重新把消息发送到真正的消息队列上
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);
                                  ...省略一堆不太重要的代码
                                       }
            //这里又重新添加一个新的任务,这次是100毫秒
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }


第三步: 消费者后续处理(略)后用一张图来总结


最后用一张图来总结


5.png


好了,写完了,下期见,拜拜。


有问题的话,欢迎留言交流。


每日一问


RocketMQ不支持自定义延迟时间,那Kafka支持延迟消息吗?如果支持,支持自定义延迟时间吗?如要你实现自定义延迟时间,你会怎么实现?说说你的思路


欢迎留言


后续文章


  • RocketMQ-入门(已更新)
  • RocketMQ-架构和角色(已更新)
  • RocketMQ-消息发送(已更新)
  • RocketMQ-消费信息
  • RocketMQ-消费者的广播模式和集群模式(已更新)
  • RocketMQ-顺序消息(已更新)
  • RocketMQ-延迟消息(已更新)
  • RocketMQ-批量消息
  • RocketMQ-过滤消息
  • RocketMQ-事务消息
  • RocketMQ-消息存储
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-刷盘机制
  • RocketMQ-幂等性
  • RocketMQ-消息重试
  • RocketMQ-死信队列...


欢迎各位入(guan)股(zhu),后续文章干货多多。


—本文转载自「大头菜技术」公众号

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列解读
RabbitMQ之延迟队列解读
|
3月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
92 0
|
4月前
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
126 0
|
28天前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
45 0
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
45 1
|
3月前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
46 0
|
9月前
|
消息中间件 Java
RabbitMQ实现延迟消息居然如此简单,整个插件就完事了
RabbitMQ实现延迟消息的方式有两种,一种是使用死信队列实现,另一种是使用延迟插件实现。死信队列实现我们以前曾经讲过这次我们讲个更简单的,使用延迟插件实现。
|
4月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
385 1
|
4月前
|
消息中间件 Shell 数据库
RabbitMQ之延迟消息
RabbitMQ之延迟消息
|
5月前
|
消息中间件 存储 RocketMQ
大白话-设计RocketMQ延迟消息
RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间的延迟,这一点对于有强迫症的朋友来说就比较难受,但是搞明白为什么这么设计后,就自然释怀了。

相关产品

  • 云消息队列 MQ