弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!

简介: 弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!

大家好,我是君哥。

在 RocketMQ 4.x 版本,使用延时消息来实现消息的定时消费。延时消息可以一定程度上实现定时发送,但是有一些局限。


RocketMQ 新版本基于时间轮算法引入了定时消息,目前,精确到秒级的定时消息实现的 pr 已经提交到社区,今天来介绍一下。

1 延时消息

1.1 简介

RocketMQ 的延时消息是指 Producer 发送消息后,Consumer 不会立即消费,而是需要等待固定的时间才能消费。在一些场景下,延时消息是很有用的,比如电商场景下关闭 30 分钟内未支付的订单。


使用延时消息非常简单,只需要给消息的 delayTimeLevel 属性赋值就可以。参考下面代码:

Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//第 3 个级别,10s
message.setDelayTimeLevel(3);
producer.send(message);

延时消息有 18 个级别,如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

1.2 实现原理

延时消息的实现原理如下图:

微信图片_20221213122538.png

Producer 把消息发送到 Broker 后,Broker 判断到是延时消息,首先会把消息投递到延时队列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定时任务线程池会有 18 个线程来对延时队列进行调度,每个线程调度一个延时级别,调度任务把延时消息再投递到原始队列,这样 Consumer 就可以拉取到了。

1.3 存在不足

延时消息存在着一些不足:

1.延时级别只有 18 个,并不能满足所有场景;

2.如果通过修改 messageDelayLevel 配置来自定义延时级别,并不灵活,比如一个在大规模的平台上,延时级别成百上千,而且随时可能增加新的延时时间;

3.延时时间不准确,后台的定时线程可能会因为处理消息量大导致延时误差大。

2 定时消息

为了弥补延时消息的不足,RocketMQ 5.0 引入了定时消息。

2.1 时间轮算法

为了解决定时任务队列遍历任务导致的性能开销,RocketMQ 定时消息引入了秒级的时间轮算法。如下图:

微信图片_20221213122602.png

图中是一个 60s 的时间轮,时间轮上会有一个指向当前时间的指针定时地移动到下一个时间(秒级)。


时间轮算法的优势是不用去遍历所有的任务,每一个时间节点上的任务用链表串起来,当时间轮上的指针移动到当前的时间时,这个时间节点上的全部任务都执行。


虽然上面只是一个 60s 的时间轮,但是对于所有的时间延时,都是支持的。可以在每个时间节点增加一个 round 字段,记录时间轮转动的圈数,比如对于延时 130s 的任务,round 就是 2,放在第 10 个时间刻度的链表中。这样当时间轮转到一个节点,执行节点上的任务时,首先判断 round 是否等于 0,如果等于 0,则把这个任务从任务链表中移出交给异步线程执行,否则将 round 减 1 继续检查后面的任务。

2.2 使用方式

基于时间轮算法的思想,RocketMQ 实现了精准的定时消息。使用 RocketMQ 定时消息时,客户端定义消息的示例代码如下:

org.apache.rocketmq.common.message.Message messageExt = this.sendMessageActivity.buildMessage(null,
 Lists.newArrayList(
  Message.newBuilder()
   .setTopic(Resource.newBuilder()
    .setName(TOPIC)
    .build())
   .setSystemProperties(SystemProperties.newBuilder()
    .setMessageId(msgId)
    .setQueueId(0)
    .setMessageType(MessageType.DELAY)
    .setDeliveryTimestamp(Timestamps.fromMillis(deliveryTime))
    //定义消息投递时间
    .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
    .setBornHost(StringUtils.defaultString(RemotingUtil.getLocalAddress(), "127.0.0.1:1234"))
    .build())
   .setBody(ByteString.copyFromUtf8("123"))
   .build()
 ),
Resource.newBuilder().setName(TOPIC).build()).get(0);

2.3 实现原理

2.3.1 消息投递

上面的代码构中,Producer 创建消息时给消息传了一个系统属性 deliveryTimestamp,这个属性指定了消息投递的时间,并且封装到消息的 TIMER_DELIVER_MS 属性,代码如下:

protected void fillDelayMessageProperty(apache.rocketmq.v2.Message message, org.apache.rocketmq.common.message.Message messageWithHeader) {
 if (message.getSystemProperties().hasDeliveryTimestamp()) {
  Timestamp deliveryTimestamp = message.getSystemProperties().getDeliveryTimestamp();
  //delayTime 这个延时时间默认不能超过 1 天,可以配置
  long deliveryTimestampMs = Timestamps.toMillis(deliveryTimestamp);
  validateDelayTime(deliveryTimestampMs);
        //...
  String timestampString = String.valueOf(deliveryTimestampMs);
  //MessageConst.PROPERTY_TIMER_DELIVER_MS="TIMER_DELIVER_MS"
  MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_TIMER_DELIVER_MS, timestampString);
 }
}

Broker 收到这个消息后,如果判断到 TIMER_DELIVER_MS 这个属性有值,就会把这个消息投递到 Topic 是 rmq_sys_wheel_timer 的队列中,queueId 是 0,同时会保存原始消息的 Topic、queueId、投递时间(TIMER_OUT_MS)。

TimerMessageStore 中有个定时任务 TimerEnqueueGetService 会从 rmq_sys_wheel_timer 这个 Topic 中读取消息,然后封装 TimerRequest 请求并放到队列 enqueuePutQueue。

2.3.2 绑定时间轮

RocketMQ 使用 TimerLog 来保存消息的原始数据绑定到时间轮上。首先看一下 TimerLog 保存的数据结构,如下图:

微信图片_20221213122628.png

参考下面代码:

//TimerMessageStore类
ByteBuffer tmpBuffer = timerLogBuffer;
tmpBuffer.clear();
tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
tmpBuffer.putLong(slot.lastPos); //prev pos
tmpBuffer.putInt(magic); //magic
tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
tmpBuffer.putLong(offsetPy); //offset
tmpBuffer.putInt(sizePy); //size
tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
tmpBuffer.putLong(0); //reserved value, just set to 0 now
long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
if (-1 != ret) {
 // If it's a delete message, then slot's total num -1
 // TODO: check if the delete msg is in the same slot with "the msg to be deleted".
 timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
  isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
}

TimerEnqueuePutService 这个定时任务从上面的 enqueuePutQueue(2.3.1节) 取出 TimerRequest 然后封装成  TimerLog。


那时间轮是怎么跟 TimerLog 关联起来的呢?RocketMQ 使用 TimerWheel 来描述时间轮,TimerWheel 中每一个时间节点是一个 Slot,Slot 保存了这个延时时间的 TimerLog 信息。数据结构如下图:

微信图片_20221213122655.png

参考下面代码:

//类 TimerWheel
public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) {
 localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
 localBuffer.get().putLong(timeMs / precisionMs);
 localBuffer.get().putLong(firstPos);
 localBuffer.get().putLong(lastPos);
 localBuffer.get().putInt(num);
 localBuffer.get().putInt(magic);
}

这样时间轮跟 TimerLog 就关联起来了,见下图:

微信图片_20221213122715.png

如果时间轮的一个时间节点(Slot)上有一条新的消息到来,那只要新建一个 TimerLog,然后把它的指针指向该时间节点的最后一个 TimerLog,然后把 Slot 的 lastPos 属性指向新建的这个 TimerLog,如下图:

微信图片_20221213122734.png

从源码上看,RocketMQ 定义了一个 7 天的以秒为单位的时间轮。

2.3.3 时间轮转动

转动时间轮时,TimerDequeueGetService 这个定时任务从当前时间节点(Slot)对应的 TimerLog 中取出数据,封装成 TimerRequest 放入 dequeueGetQueue 队列。

2.3.4 CommitLog 中读取消息

定时任务 TimerDequeueGetMessageService 从队列 dequeueGetQueue 中拉取 TimerRequest 请求,然后根据 TimerRequest 中的参数去 CommitLog(MessageExt) 中查找消息,查出后把消息封装到 TimerRequest 中,然后把 TimerRequest 写入 dequeuePutQueue 这个队列。

2.3.5 写入原队列

定时任务 TimerDequeuePutMessageService 从 dequeuePutQueue 队列中获取消息,把消息转换成原始消息,投入到原始队列中,这样消费者就可以拉取到了。

3 总结

RocketMQ 4.x 版本只支持延时消息,有一些局限性。而 RocketMQ 新版本引入了定时消息,弥补了延时消息的不足。定时消息的处理流程如下图:

微信图片_20221213122757.png

可以看到,RocketMQ 的定时消息的实现还是有一定复杂度的,这里用到 5 个定时任务和 3 个队列来实现。


最后,对于定时时间的定义,客户端、Broker 和时间轮的默认最大延时时间定义是不同的,使用的时候需要注意。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
72 0
|
消息中间件 运维 算法
RabbitMQ高阶使用延时任务
RabbitMQ高阶使用延时任务
175 0
|
消息中间件 弹性计算 网络安全
消息队列RocketMQ版:定时消息通知功能体验
本实验场景介绍消息队列RocketMQ版的定时(延时)消息收发功能,体验发送若干条自定义延迟触发的消息,观察消息是否按照预期的投递时间投递。
|
消息中间件 SQL 存储
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
144 0
|
消息中间件 Kafka
MQ 学习日志(八) 消息队列的延时以及过期失效问题处理
消息队列的延时以及过期失效问题处理
322 0
|
消息中间件 Shell RocketMQ
RocketMQ进阶-延时消息
RocketMQ进阶-延时消息
1124 0
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
470 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
1121 1
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
339 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
下一篇
DataWorks