5 张图带你理解 RocketMQ 消费者启动过程

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 5 张图带你理解 RocketMQ 消费者启动过程

大家好,我是君哥。
今天来分享 RocketMQ 中一个关键的知识点,消费者的启动过程。

多数消息队列中,消费者和 Broker 通信的方式有两种,PUSH 模式和 PULL 模式:

  • PUSH 模式:Broker 主动把消息推送给订阅的消费者;
  • PULL模式:消费者主动从 Broker 拉取消息。

注意,RocketMQ 并没有真正实现 PUSH 模式, RocketMQ 中的 PUSH 模式,本质上也是 PULL 模式,只是消费端封装了轮询过程,相当于开启一个定时线程不停地从 Broker 拉取消息,拉取到消息后唤醒本地业务线程来处理。本文讲解 PULL 模式的启动过程。涉及到到的启动过程如下图:

微信图片_20221213112833.png

首先看下面这张图:

微信图片_20221213112858.png

图中可以看出,消费者需要注册到 Name Server,拉取消息的时候可以从 Broker 主节点拉取,也可以从 Broker 从节点拉取。

在 RocketMQ 的源码中,拉模式有两个消费者相关的类,其中 DefaultMQPullCons umer 类已经被废弃,官方推荐使用 Defau ltLitePullConsumer 类。下面代码来自官方示例:

public static void main(String[] args) throws Exception {
    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
    litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    litePullConsumer.subscribe("TopicTest", "*");
    //启动方法
    litePullConsumer.start();
    try {
        while (running) {
            //这里可以看到,PULL 模式下消费者需要业务代码主动去拉取消息
            List<MessageExt> messageExts = litePullConsumer.poll();
            System.out.printf("%s%n", messageExts);
        }
    } finally {
        litePullConsumer.shutdown();
    }
}

上面代码中消费者属于消费组 lite_pull _consumer_test,订阅了【TopicTest 】这个 Topic 下的所有 tag。下面一起看一下启动方法。下图是消费者启动过程中类调用关系图,图中心的 pullRequestQueu e 是核心,pull 请求会先发送到这个队列,然后循环地拉取处理。

微信图片_20221213112924.png

检查启动配置

消费者启动时首先会检查配置,检查的配置项如下:

  • 消费组名称是否合法。包括校验项包括【非空】、【长度小于等于255】、符合正则表达式【^[%|a-zA-Z0-9_-]+$】、【不等于 “DEFAULT_CO NSUMER”】;
  • 消息模式不能是空,包括集群和广播两种模式;
  • MessageQueue 负载策略不能是空,包括:平均分配策略、循环分配策略、自定义分配策略、按照机房平均分配策略、按照机房就近分配策略、一致性 HASH 策略;
  • 长轮询模式下,消费者连接挂起时间不小于长轮询模式下 Broker 挂起时间,Broker 挂起时间默认 20s,官方不建议修改。

这部分源代码见 DefaultLitePullConsum erImpl#checkConfig。

修改消费者实例名称

如果是集群模式,实例名称改为【进程 ID + “ #” + 系统时间(纳秒 )】,代码如下:

//ClientConfig类
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}

初始化 MQ 客户端

创建一个 MQClientInstance 实例,然后把消费者注册到 MQClientInstance。

private void initMQClientFactory() throws MQClientException {
    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
    boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
    if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
            null);
    }
}

初始化负载均衡器

对  RebalanceLitePullImpl 实例初始化,给下面的参数赋值:

  • 消费者名称;
  • 消息模型;
  • MessageQueue  负载均衡策略;
  • MQ 客户端,上节中初始化的 MQClientInstance 实例。

负载均衡线程启动后,默认每 20s 做一次负载均衡,见如下代码:

//RebalanceService 类
public void run() {
    while (!this.isStopped()) {
        //waitInterval 默认 20s,可以配置
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
}

初始化 Wrapper

PullAPIWrapper 这个 Wrapper 类是 MQ-ClientInstance 类的 Wrapper 类,类中 pullKernelImpl 方法对  MQClientInstance 类中的 pullMessage 方法进行了装饰,这个装饰类主要增加了下面功能:

  1. 获取 Broker 地址;
  2. 检查 RocketMQ 版本;
  3. 如果 Broker 是从节点,把 sysFlag 标记偏移量的位改为 0,(偏移量 0x1);
  4. 封装请求 header;
  5. 获取 filterServer 地址(如果消费者是通过 filterServer 从 Broker 取消息,这里随机获取一个 filterServer  地址)。

代码如下 :

//PullAPIWrapper 
public PullResult pullKernelImpl(
    //省略所有参数
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //1.获取 Broker 地址
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                                                          this.recalculatePullFromWhichNode(mq), false);
    //省略从 Name sever 更新本地 Broker 缓存逻辑
    if (findBrokerResult != null) {
        {
            //2.检查 RocketMQ 版本
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                                            + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;
        if (findBrokerResult.isSlave()) {
            //3.把偏移量的位改为 0,(偏移量 0x1)
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
        //4.封装请求 header
        PullMessageRequestHeader  = new PullMessageRequestHeader();
        //省略封装 requestHeader
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            //5.获取 filterServer 地址
            brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);
        return pullResult;
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

初始化 offset 存储器

offset 存储器的 UML 类图如下:

640.png

有两个实现类分别对应集群模式和广播模式,本文讨论的集群模式的实现类是 RemoteBrokerOffsetStore。offset 可以存储在本地或者远端服务器。

启动 MQ 客户端

启动 MQ 客户端主要包括如下步骤:

  1. 把 serviceState 改为 START_FAIL ED;
  2. 初始化 Netty channel;
  3. 启动定时任务,包括定时获取 Name Server 地址、从 Name Server 更新 Topic 路由信息、清理过期的 Broker、向 Broker 发送心跳、持久化 offset、定时调整线程池的数量(源码里面这个并没有实现逻辑);
  4. 启动拉取消息的线程,拉取线程的逻辑是从请求队列中不停地取出 pull 请求,然后将请求发送到 Broker 进行拉取消息,代码如下:
//PullMessageService类
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

从下面的代码可以看出,PULL 拉取消息最终使用了 DefaultMQPushConsumer Impl,所以 PULL 模式和 PUSH 模式拉取消息的逻辑是一样的。

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}


5.启动 MessageQueue 负载均衡线程;

6.启动生产者线程;7.把 serviceState 改为 Running。源码参考 MQClientInstance#start。

启动定时任务

这个定时任务默认每 30s 执行一次,用于监听每个 Topic 下的 MessageQueue 是否发生变化。代码见 startScheduleTask 方法。

启动轨迹消息

轨迹消息主要用于跟踪消息发送、消息消费的轨迹,用于记录详细日志。代码如下:

//AsyncTraceDispatcher 类
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

这里不详细展开了,后面再单独讨论。

总结

本文通过源码分析讲解了 RocketMQ 中 PULL 模式下的消费者启动过程,在生产上使用比较多的还是 PUSH 模式,PULL 模式拉取消息的方法跟 PUSH 模式一样,不同的是 PULL 模式需要应用程序进行拉取动作,可以通过 PULL 模式的学习更容易的理解 PUSH 模式。最后,分析一个 PULL 模式启动过程涉及的 UML 类图:

微信图片_20221213113005.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
124 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
85 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
72 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
64 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
83 0
|
5月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
163 2
|
5月前
|
消息中间件 负载均衡 Apache
【RocketMQ系列七】消费者和生产者的实现细节
【RocketMQ系列七】消费者和生产者的实现细节
154 1