RocketMQ延时消息的原理与实现

简介: 本文分享了RocketMQ的延时消息的原理和实现,手把手带你从源码角度了解到内部实现机制。

前面给大家分享了Rocket大体架构设计和Spring快速集成RocketMQ。看了前面的文章的小伙伴把RocketMQ集成进项目以及发送消息和消费消息问题应该不大,有问题的可以私信我一起学习解决问题。

今天给大家梳理下RocketMQ的延时消息如何使用以及如何实现的,包括我的一些改进想法,是不是有点飘了居然想修改RocketMQ的源码。

延时消息基本概念

延时消息:顾名思义就是消息不是实时处理的,可以在延时设置时候后消息才能被消费者消费。

以下使用场景不一定使用延时消息是最好的方案,但延时消息是适用于以下场景的:

1、30分钟取消订单,商城用户下单后如果在设定时候后还没支付就将订单状态置为取消,并恢复商品库存

2、超时自动审批,有的系统审批流程可以设置为超过设定时间后自动执行通过或者拒绝流程

3、短信或提醒,比如注册账号后三天不登陆就发短信提醒

RocketMq如何使用延时消息

Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );
// 设置延时消息的级别
msg.setDelayTimeLevel(2);

是不是发现了RocketMQ发送延时消息非常简单,只需要在消息上设置delayTimeLevel属性即可。但是看属性的字面意思可以发现这个属性值好像不是具体多少秒,感觉像是一个等级的意思,那这个延时时间等级是怎么样个效果呢?请接着往下看

RocketMQ延时消息等级

RocketMQ的延时消息其实并非是精确的一个时间,而是采用延时等级来定义的。在MessageStoreConfig类上有对这个延时消息等级的定义,定义了18个等级最低1s,最长2h

// RocketMQ延时消息的定义
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

我们看看RocketMQ是如何使用这18个等级来实现延时消息的。

Broker初始化流程-延时消息部分

在Broker初始化流程中有这样的逻辑,流程如下

解析18个时间等级

这里解析了18个等级为秒然后放入到delayLevelTable中存储起来。

Broker启动流程-延时消息读部分

接着来看看延时任务在Broker启动的时候怎么创建的,可以看到每个等级都创建了一个定时任务,初始设定都是秒调度执行一次

每个等级创建一个定时任务

那关键执行逻辑就是DeliverDelayedMessageTimerTask的run方法了

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

//有这样的一个代码,可以看到RocketMQ把延时消息都放到了SCHEDULE_TOPIC_XXXX队列按照level分到了不同的queue。这个Topic是内部写死的
ConsumeQueue cq =
             ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
//然后从这个对象中根据offset来取buffer,主要是用来判断当前offset的消息时间戳和当前时间戳作对比
  
  // 在executeOnTimeup这个方法的中间部分
 long now = System.currentTimeMillis();
 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 long countdown = deliverTimestamp - now;
 if (countdown <= 0) {
        // 如果时间戳比当前时间戳小证明消息到期了
   
   // 会调用这个方法把真实的Topic设置到消息上
   # DeliverDelayedMessageTimerTask#messageTimeup
   ...
   // 组装真实的topic消息
   MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
   
   // 写到真实的消息commitLog中
   PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);
 } else {
   // 如果没到期则进行下一轮检测
 }

延时消息写入commitLog流程

当我们需要发送延时消息时我们会在message上设置setDelayTimeLevel(), 那从上面读取部分我们反推下写入流程,应该是在存储时判断消息体上是否有延时消息的level属性,如果有就应该把消息放在Topic为SCHEDULE_TOPIC_XXXX的队列中,我们来看看是否是我们猜想的呢?

当Broker启动时会作为服务器启动netty,当有接到消息时会触发NettyServerHandler,跟踪下发送消息的流程

org.apache.rocketmq.store.CommitLog#asyncPutMessage

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    if (msg.getDelayTimeLevel() > 0) {
        
                // 这里将topic设置为SCHEDULE_TOPIC_XXXX
        topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
        // queueId设置为 msg.getDelayTimeLevel()-1
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                
        // Backup real topic, queueId
        // 将真实的topic存储起来,以便在消息到期时替换为真实的topic发送到对应的队列上去
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

小结

从上面的代码我们可以梳理出RocketMQ为我们内置了18个延时,同时在启动Broker时启动18个定时任务分别扫描各自等级的消息逻辑。当需要延时消息时可在发送时在消息上指定等级,消息到达Broker后将topic替换为SCHEDULE_TOPIC_XXXX,将真实的topic存放在propertis中。当定时器定时扫描各自等级的队列,如果根据offset取出有消息并判断时间戳消息当前时间戳就将topic替换为真实topic发送的对应topic的队列中。

从上面的逻辑我们可以了解到开源版RocketMQ的延时消息其实并不是精准延时的。商业版的RocketMQ是阿里云的ons,ons是能满足精准延时的。滴滴也基于开源版的RocketMQ开发出了精准延时的模块,有兴趣的可以去了解下 DDMQ

本次分享就到这了,关于commitLog本篇文章梳理的不是很清楚的,下次咱们专门针对RocketMQ的存储再梳理下,好了本篇关于开源版RocketMQ的延时消息实现梳理对你有帮助可以关注下我 Java极客帮

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
2月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
14天前
|
消息中间件 中间件 Kafka
MQ四兄弟:如何实现延时消息
本文介绍了几种常见的消息队列系统(RabbitMQ、RocketMQ、Kafka和Pulsar)实现延时消息的方式。RabbitMQ通过死信队列或延时插件实现;RocketMQ内置延时消息支持,可通过设置`delayTimeLevel`属性实现;Kafka不直接支持延时消息,但可以通过时间戳、延时Topic、Kafka Streams等方法间接实现;Pulsar自带延时消息功能,提供`deliverAfter`和`deliverAt`两种方式。每种方案各有优劣,适用于不同的应用场景。
42 0
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
5月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
5月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
5月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
5月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
6月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
6月前
|
消息中间件 存储 缓存
MetaQ/RocketMQ 原理问题之Consume queue中的条目长度是固定的问题如何解决
MetaQ/RocketMQ 原理问题之Consume queue中的条目长度是固定的问题如何解决