5 张图带你理解 RocketMQ 延时消息机制

简介: 5 张图带你理解 RocketMQ 延时消息机制

大家好,我是君哥。今天来聊一聊 RocketMQ 的延时消息是怎么实现的。

延时消息是指发送到 RocketMQ 后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。

延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息。

1 生产者

首先看一个生产者发送延时消息的官方示例代码:

public static void main(String[] args) throws Exception {
 // Instantiate a producer to send scheduled messages
 DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
 // Launch producer
 producer.start();
 int totalMessagesToSend = 100;
 for (int i = 0; i < totalMessagesToSend; i++) {
  Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  // This message will be delivered to consumer 10 seconds later.
  message.setDelayTimeLevel(3);
  // Send the message
  producer.send(message);
 }
 // Shutdown producer after use.
 producer.shutdown();
}

从上面的代码可以看到,跟普通消息不一样的是,消息设置 setDelayTimeLevel 属性值,这里设置为 3,这里最终将 3 这个延时级别复制给了 DELAY 属性。

关于延时级别,可以看下面这个定义:

//MessageStoreConfig类
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

这里延时级别有 18 个,上面的示例代码中延迟级别是 3,消息会延迟 10s 后消费者才能拉取。

2 Broker 处理

2.1 写入消息

Broker 收到消息后,会将消息写入 CommitLog。在写入时,会判断消息 DELAY 属性是否大于 0。代码如下:

//CommitLog 类
if (msg.getDelayTimeLevel() > 0) {
 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
  msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 }
 topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
 int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 // Backup real topic, queueId
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 msg.setTopic(topic);
 msg.setQueueId(queueId);
}

从上面的代码可以看到,CommitLog 写入时并没有直接写入,而是把 Topic 改为 SCHEDULE_TOPIC_XXXX,把 queueId 改为延时级别减 1。因为延时级别有 18 个,所以这里有 18 个队列。如下图:

微信图片_20221213114029.png

2.2 调度消息

延时消息写入后,会有一个调度任务不停地拉取这些延时消息,这个逻辑在类 ScheduleMessageService。这个类的初始化代码如下:

public void start() {
 if (started.compareAndSet(false, true)) {
  this.load();
  this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
  //省略部分逻辑
  for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
   Integer level = entry.getKey();
   Long timeDelay = entry.getValue();
   Long offset = this.offsetTable.get(level);
   if (null == offset) {
    offset = 0L;
   }
   if (timeDelay != null) {
    //省略部分逻辑
    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
   }
  }
        //省略持久化的逻辑
 }
}

上面的 load() 方法会加载一个 delayLevelTable(ConcurrentHashMap类型),key 保存延时级别(从 1 开始),value 保存延时时间(单位是 ms)。

load() 方法结束后,创建了一个有 18 个核心线程的定时线程池,然后遍历 delayLevelTable,创建 18 个任务(DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度。任务调度的代码逻辑如下:

public void executeOnTimeup() {
 ConsumeQueue cq =
  ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
   delayLevel2QueueId(delayLevel));
 if (cq == null) {
  this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
  return;
 }
 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
 if (bufferCQ == null) {
  //省略部分逻辑
  this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
  return;
 }
 long nextOffset = this.offset;
 try {
  int i = 0;
  ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
  for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
   long offsetPy = bufferCQ.getByteBuffer().getLong();
   int sizePy = bufferCQ.getByteBuffer().getInt();
   long tagsCode = bufferCQ.getByteBuffer().getLong();
            //省略部分逻辑
   long now = System.currentTimeMillis();
   long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
   nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
   long countdown = deliverTimestamp - now;
   if (countdown > 0) {
    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
    return;
   }
   MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
   if (msgExt == null) {
    continue;
   }
   MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
   //事务消息判断省略
   boolean deliverSuc;
   //只保留同步
   deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
   if (!deliverSuc) {
    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
    return;
   }
  }
  nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 } catch (Exception e) {
  log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
 } finally {
  bufferCQ.release();
 }
 this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

这段代码可以参考下面的流程图来进行理解:

640 (1).png

上面有一个修正投递时间的函数,这个函数的意义是如果已经过了投递时间,那么立即投递。代码如下:

private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
 long result = deliverTimestamp;
 long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
 if (deliverTimestamp > maxTimestamp) {
  result = now;
 }
 return result;
}

注意:消息从 CommitLog 转发到 ConsumeQueue 时,会判断是否是延时消息(Topic = SCHEDULE_TOPIC_XXXX 并且延时级别大于 0),如果是延时消息,就会修改 tagsCode 值为消息投递的时间戳,而 tagsCode 原值是 tag 的 HashCode。代码如下:

//CommitLog类checkMessageAndReturnSize方法
if (delayLevel > 0) {
 tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
  storeTimestamp);
}

如下图:

微信图片_20221213114115.png

而 ScheduleMessageService 调度线程将消息从 ConsumeQueue 重新投递到原始队列中时,会把 tagsCode 再次修改为 tag 的 HashCode,代码如下:

//类MessageExtBrokerInner,这个方法被 messageTimeup 方法调用。
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
 if (null == tags || tags.length() == 0) { return 0; }
 return tags.hashCode();
}

如下图:

微信图片_20221213114146.png

2.3 一个问题

如果有一个业务场景,要求延时消息 3 小时才能消费,而 RocketMQ 的延时消息最大延时级别只支持延时 2 小时,怎么处理?

这里提供两个思路供大家参考:

  • 在 Broker 上修改 messageDelayLevel 的默认配置;
  • 在客户端缓存 msgId,先设置延时级别是 18(2h),当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,这次延时级别是 17(1h),如果没有缓存则进行消费。

3 总结

经过上面的讲解,延时消息的处理流程如下:


最后,延时消息的延时时间并不精确,这个时间是 Broker 调度线程把消息重新投递到原始的 MessageQueue 的时间,如果发生消息积压或者 RocketMQ 客户端发生流量管控,客户端拉取到消息后进行处理的时间可能会超出预设的延时时间。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 监控
|
11月前
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
286 1
|
3月前
|
消息中间件 存储 运维
|
3月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
69 0
|
21天前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
4天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
12 0
|
4天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
10 0
|
4天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
9 0
|
3月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
66 0
|
2月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。