RocketMQ 位移提交源码分析

简介: RocketMQ 消息消费进度是如何提交的,并发消费的时候,一次从 一个队列拉 32 条消息,这 32 条消息会提交到线程池中处理,如果偏移量 m5 比 m4 先执行完成,消息消费后,提交的消费进度是哪个?是提交消息 m5 的偏移量?

向大家提个问题:


RocketMQ 消息消费进度是如何提交的,并发消费的时候,一次从 一个队列拉 32 条消息,这 32 条消息会提交到线程池中处理,如果偏移量  m5 比 m4 先执行完成,消息消费后,提交的消费进度是哪个?是提交消息 m5 的偏移量?


下面跟着我的节奏,撸一波源码。


RocketMQ 每次拉取完消息都会将消息存储到 PullRequest 对象中的 ProcessQueue 中:


org.apache.rocketmq.client.consumer.PullCallback#onSuccess

boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());


接着将消息放进消费线程中去执行:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
  pullResult.getMsgFoundList(), //
  processQueue, //
  pullRequest.getMessageQueue(), //
  dispathToConsume);


ConsumeMessageService 类实现消息消费的逻辑,它有两个实现类:

// 并发消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 顺序消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;


这里我们只分析并发消费:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
  this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
  // ...
}


将消息消费任务封装成 ConsumeRequest 对象,然后将其交给消费线程池中去执行。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}


ConsumeRequest 是一个实现了 Runnable 的类,因此消息消费的核心逻辑都写在了 run 方法中,如上代码是提交已消费位移的逻辑,当 ProcessQueue 没有被丢弃,则进行已消费位移的提交。


org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

// 移除已消费的消息,并返回已消费的
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}


移除已消费的位移,并返回最小位移量,如果最小位移量大于 0,并且 ProcessQueue 没有被丢弃,则更新本地缓存,


org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage

public long removeMessage(final List<MessageExt> msgs) {
  long result = -1;
  final long now = System.currentTimeMillis();
  try {
    this.lockTreeMap.writeLock().lockInterruptibly();
    this.lastConsumeTimestamp = now;
    try {
      if (!msgTreeMap.isEmpty()) {
        result = this.queueOffsetMax + 1;
        int removedCnt = 0;
        // 移除已消费的消息
        for (MessageExt msg : msgs) {
          MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
          if (prev != null) {
            removedCnt--;
          }
        }
        // 消息总量累加
        msgCount.addAndGet(removedCnt);
        // 返回消息容器中最小元素 key
        if (!msgTreeMap.isEmpty()) {
          result = msgTreeMap.firstKey();
        }
      }
    } finally {
      this.lockTreeMap.writeLock().unlock();
    }
  } catch (Throwable t) {
    log.error("removeMessage exception", t);
  }
  return result;
}


以上方法就是解答文章开头问题的关键,由于该方法是各个消费线程并发执行,因此需要对其进行加锁操作,msgTreeMap 是 ProcessQueue 的消息容器,它的格式如下:


private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();


它是一个 TreeMap 结构,key 为消息位移,value 为消息数据,消息容器中,消息可以按照位移进行排序,那也就意味着,当消息消费完,只需要在消息容器中移除即可,然后返回消息容器中最小元素(最小位移),如下:

640.jpg


由于消息是按照位移进行排序,因此我们只需移除已消费的消息,并且确保不会将未消费的位移提交,就可避免了位移大的消息先消费导致消息丢失的问题了。


org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:

public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }
        if (null != offsetOld) {
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}


offsetTable 为本地位移缓存容器,它的结构如下:

private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<>();


它是一个 ConcurrentMap,一个线程安全容器,key 为 MessageQueue,value 为当前 MessageQueue 的消费位移,从源码看出,当前消费位移的更新,只能是递增更新。


在更新完本地缓存之后,RocketMQ 是如何将其提交到 broker 的呢?


org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);


以上,消费者在启动的时候,开启了一个定时任务,定时将本地缓存提交到broker。

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll:

// 参数mqs是当前分配的队列
public void persistAll(Set<MessageQueue> mqs) {
  if (null == mqs || mqs.isEmpty())
    return;
  final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
  if (!mqs.isEmpty()) {
    // 遍历位移缓存容器
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
      MessageQueue mq = entry.getKey();
      AtomicLong offset = entry.getValue();
      if (offset != null) {
        // 位移缓存容器包含在当前分配队列,则进行消费位移提交
        if (mqs.contains(mq)) {
          try {
            // 提交消费位移
            this.updateConsumeOffsetToBroker(mq, offset.get());
          } catch (Exception e) {
            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
          }
        } else {
          unusedMQ.add(mq);
        }
      }
    }
  }
  // 将未分配的队列从位移缓存中移除
  if (!unusedMQ.isEmpty()) {
    for (MessageQueue mq : unusedMQ) {
      this.offsetTable.remove(mq);
      log.info("remove unused mq, {}, {}", mq, this.groupName);
    }
  }
}


最终会调用以上方法,RocketMQ 会从重平衡那里获取当前消费者已分配的队列,如果位移缓存容器包含在当前分配队列,则进行消费位移提交,否则将从位移缓存容器中移除。


broker 端处理:


org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
  ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
  if (null == map) {
    map = new ConcurrentHashMap<Integer, Long>(32);
    map.put(queueId, offset);
    this.offsetTable.put(key, map);
  } else {
    Long storeOffset = map.put(queueId, offset);
    if (storeOffset != null && offset < storeOffset) {
      log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
    }
  }
}


以上,offsetTable 为 broker 端的消费位移缓存容器,它的结构如下:

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
  new ConcurrentHashMap<>(512);


它同样是一个 ConcurrentMap,一个线程安全容器,key 为的形式为 “topic@group”,value 也是一个 ConcurrentMap 它的 key 为 queueId,value 为位移,它会以 json 的形式持久化到磁盘

${ROCKETMQ_HOME}/store/config/consumerOffset.json 文件中,具体格式如下:

{
    "offsetTable": {
        "test-topic@test-group": {
            "0": 88526,
            "1": 88528,
            "2": 88532,
            "3": 88537
        }
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
消息中间件 负载均衡 中间件
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
158 0
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
|
10月前
|
消息中间件 中间件 Kafka
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
**RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。
172 1
|
11月前
|
消息中间件 存储 负载均衡
RocketMQ 源码分析——NameServer
- 编写优雅、高效的代码。RocketMQ作为阿里双十一交易核心链路产品,支撑千万级并发、万亿级数据洪峰。读源码可以积累编写高效、优雅代码的经验。 - 提升微观的架构设计能力,重点在思维和理念。Apache RocketMQ作为Apache顶级项目,它的架构设计是值得大家借鉴的。 - 解决工作中、学习中的各种疑难杂症。在使用RocketMQ过程中遇到消费卡死、卡顿等问题可以通过阅读源码的方式找到问题并给予解决。 - 在BATJ一线互联网公司面试中展现优秀的自己。大厂面试中,尤其是阿里系的公司,你有RocketMQ源码体系化知识,必定是一个很大的加分项。
179 0
|
11月前
|
消息中间件 存储 Kafka
RocketMQ 源码分析——Broker
1. Broker启动流程分析 2. 消息存储设计 3. 消息写入流程 4. 亮点分析:NRS与NRC的功能号设计 5. 亮点分析:同步双写数倍性能提升的CompletableFuture 6. 亮点分析:Commitlog写入时使用可重入锁还是自旋锁? 7. 亮点分析:零拷贝技术之MMAP提升文件读写性能 8. 亮点分析:堆外内存机制
184 0
|
消息中间件 运维 监控
RocketMq-dashboard:topic 5min trend 原理和源码分析(一)
RocketMq-dashboard:topic 5min trend 原理和源码分析(一)
311 0
|
消息中间件 存储 负载均衡
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPushConsumer的实现原理及源码分析
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPushConsumer的实现原理及源码分析
177 1
|
消息中间件 存储 监控
搭建源码调试环境—RocketMQ源码分析(一)
在正式开始搭建调试环境之前,我们先了解一下RockeMQ源码的整体架构。 这是因为掌握了整体架构,可以让我们迅速了解各个方面的特性,并且可以方便我们后续快速定位功能模块对应的代码文件。话不多说,我们开始看RocketMQ目录结构。
304 0
搭建源码调试环境—RocketMQ源码分析(一)
|
消息中间件 存储 Java
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
225 0
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析