RocketMQ延迟消息的代码实战及原理分析

简介: 在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。

RocketMQ简介

RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠、万亿级容量、灵活可伸缩的消息发布与订阅服务。

它前身是MetaQ,是阿里基于Kafka的设计使用Java进行自主研发的。在2012年,阿里将其开源, 在2016年,阿里将其捐献给Apache软件基金会(Apache Software Foundation,简称为ASF),正式成为孵化项目。2017 年,Apache软件基金会宣布RocketMQ已孵化成为 Apache顶级项目(Top Level Project,简称为TLP ),是国内首个互联网中间件在 Apache上的顶级项目。

延迟消息

生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息

在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。

消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。在发送消息时,设置消息延迟级别即可,设置消息延迟级别时有以下3种情况:

  1. 设置消息延迟级别等于0时,则该消息为非延迟消息。
  2. 设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
  3. 设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。

延迟消息示例

首先,写一个消费者,用于消费延迟消息:

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("OneMoreTopic", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s %s Receive New Messages:%n"
                    , sdf.format(new Date())
                    , Thread.currentThread().getName());
            for (MessageExt msg : msgs) {
                System.out.printf("\tMsg Id: %s%n", msg.getMsgId());
                System.out.printf("\tBody: %s%n", new String(msg.getBody()));
            }
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 启动消费者实例
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

再写一个延迟消息的生产者,用于发送延迟消息:

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        Message msg = new Message("OneMoreTopic"
                , "DelayMessage", "This is a delay message.".getBytes());

        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        //设置消息延迟级别为3,也就是延迟10s。
        msg.setDelayTimeLevel(3);

        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 通过sendResult返回消息是否成功送达
        System.out.printf("%s Send Status: %s, Msg Id: %s %n"
                , sdf.format(new Date())
                , sendResult.getSendStatus()
                , sendResult.getMsgId());

        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

运行生产者以后,就会发送一条延迟消息:

10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000

10秒钟后,消费者收到的这条延迟消息:

10:37:25.026 ConsumeMessageThread_1 Receive New Messages:
    Msg Id: C0A8006D5AB018B4AAC216E0DB690000
    Body: This is a delay message.

延迟消息的原理分析

以下分析的RocketMQ源码的版本号是4.7.1,版本不同源码略有差别。

CommitLog

org.apache.rocketmq.store.CommitLog中,针对延迟消息做了一些处理:

// 延迟级别大于0,就是延时消息
if (msg.getDelayTimeLevel() > 0) {
    // 判断当前延迟级别,如果大于最大延迟级别,
    // 就设置当前延迟级别为最大延迟级别。
    if (msg.getDelayTimeLevel() > this.defaultMessageStore
            .getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore
                .getScheduleMessageService().getMaxDelayLevel());
    }

    // 获取延迟消息的主题,
    // 其中RMQ_SYS_SCHEDULE_TOPIC的值为SCHEDULE_TOPIC_XXXX
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    // 根据延迟级别获取延迟消息的队列Id,
    // 队列Id其实就是延迟级别减1
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // 备份真正的主题和队列Id
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    // 设置延时消息的主题和队列Id
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

可以看到,每一个延迟消息的主题都被暂时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列Id。接下来,处理延迟消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService

ScheduleMessageService

ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore进行初始化的,初始化包括构造对象和调用load方法。最后,再执行ScheduleMessageService的start方法:

public void start() {
    // 使用AtomicBoolean确保start方法仅有效执行一次
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        // 遍历所有延迟级别
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            // key为延迟级别
            Integer level = entry.getKey();
            // value为延迟级别对应的毫秒数
            Long timeDelay = entry.getValue();
            // 根据延迟级别获得对应队列的偏移量
            Long offset = this.offsetTable.get(level);
            // 如果偏移量为null,则设置为0
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 为每个延迟级别创建定时任务,
                // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒
                this.timer.schedule(
                        new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        // 延迟10秒后每隔flushDelayOffsetInterval执行一次任务,
        // 其中,flushDelayOffsetInterval默认配置也为10秒
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    // 持久化每个队列消费的偏移量
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore
            .getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为1秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。

然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由flushDelayOffsetInterval属性进行配置,默认为10秒。

定时任务

ScheduleMessageService的start方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在DeliverDelayedMessageTimerTask类之中,它核心代码是executeOnTimeup方法之中,我们来看一下主要部分:

// 根据主题和队列Id获取消息队列
ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
                TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
                , delayLevel2QueueId(delayLevel));

如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:

// 根据消费偏移量从消息队列中获取所有有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

如果没有获取到有效消息,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:

// 遍历所有消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    // 获取消息的物理偏移量
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    // 获取消息的物理长度
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();
    
    // 省略部分代码...

    long now = System.currentTimeMillis();
    // 计算消息应该被消费的时间
    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
    // 计算下一条消息的偏移量
    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)

    long countdown = deliverTimestamp - now;
    // 省略部分代码...
}

如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。如果到消费的时间,就继续执行下面操作:

// 根据消息的物理偏移量和大小获取消息
MessageExt msgExt =
    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
            offsetPy, sizePy);

如果获取到消息,则继续执行下面操作:

// 重新构建新的消息,包括:
// 1.清除消息的延迟级别
// 2.恢复真正的消息主题和队列Id
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
    log.error("[BUG] the real topic of schedule msg is {},"
                    + " discard the msg. msg={}",
            msgInner.getTopic(), msgInner);
    continue;
}
// 重新把消息发送到真正的消息队列上
PutMessageResult putMessageResult =
        ScheduleMessageService.this.writeMessageStore
                .putMessage(msgInner);

清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。

总结

经过以上对源码的分析,可以总结出延迟消息的实现步骤:

  1. 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
  2. 消息进入SCHEDULE_TOPIC_XXXX的队列中。
  3. 定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
  4. 根据消息的物理偏移量和大小再次获取消息。
  5. 根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
  6. 重新发送消息到原主题的队列中,供消费者进行消费。
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
631 1
|
6月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
6月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
6月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
3270 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
5月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
6月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
6月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
6月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
6月前
|
消息中间件 存储 Kafka
RocketMQ实战—4.消息零丢失的方案
本文分析了用户支付完成后未收到红包的问题,深入探讨了RocketMQ事务消息机制的实现原理及其在确保消息零丢失中的作用。首先,通过全链路分析发现消息可能在推送、存储或消费环节丢失。接着,介绍了RocketMQ事务消息机制如何通过half消息、本地事务执行及回调确认来保证消息发送成功,并详细解析了其底层原理,如half消息对消费者不可见、rollback与commit操作等。同时,对比了同步重试方案,指出其在复杂场景下的局限性。
RocketMQ实战—4.消息零丢失的方案