RocketMQ 消费者启动流程

简介: RocketMQ 消费者启动流程

MQ 使用场景: 应用解耦、消峰填谷,消息分发。


Consumer


RocketMQ Consumer 分为 Pull Consumer 和 Push Consumer ,其实就是推拉消费者。

  • Pull Consumer
  • Push Consumer


DefaultMQPushConsumer


DefaultMQPushCOnsumerImpl 通过 start 方法启动

image.png

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer

DefaultMQPushConsumer#start 启动代码:


public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }


先简化说一下启动流程:

启动步骤如下, 启动准备,从nameserver 获取 topic 路由信息,检查 Consumer 配置,向每个 broker 发心跳,触发一次 rebalance.


public synchronized void start() throws MQClientException {
  // 1. 启动准备工作
  switch (this.serviceState) {...}
  // 2. 从 NameServer 更新 topic 路由信息
  this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  // 3. 检查 consumer 配置
        this.mQClientFactory.checkClientInBroker();
  // 4. 向每个broker 发送心跳
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  // 5. 立即触发一次 reblance
        this.mQClientFactory.rebalanceImmediately();
        }


启动准备工作中有 CREATE_JUST状态判断。CREATE_JUST 的意思是 Service just created,not start 创建服务,没有启动。

start 启动核心代码在 DefaultMQPushConsumerImpl#start


public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
// 1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置
                this.checkConfig();
// 2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
// 3. 获取MQ 实例,先从缓存中国取,没有则创建
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 4. 设置 重负载的消费组
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                //设置消费模式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // 设置负载策略
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 实例化消息拉取的包装类并注册消息过滤的消息类
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 选择对应的 offset 实现类,并加载 offset 集群,广播消息的 offset 从消费者本地获取,集群模式从 Brocker 维护。
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
// 启动消息消费者服务
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
// 8. 启动 MQ ClinentInstance
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
//9. 从nameserver拉取 topic 订阅消息
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  //10. 向 broker 校验客户端 
        this.mQClientFactory.checkClientInBroker();
    // 11. 给所有的 broker发送心跳。
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 12. 负载队列
        this.mQClientFactory.rebalanceImmediately();
    }


咱们细化下启动过程


1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置
2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC
3. 获取MQ实例,先从缓存中取,没有则创建
4. 设置重负载的消费组、消费模式、负载策略等
5. 实例化消息拉取的包装类并注册消息过滤的钩子
选择对应的OffsetStore实现类,并加载offset。广播消息从本地获取offset (Consumer本地维护),6. 集群消息从远程获取 (broker端维护)
7. 启动消息消费服务
7.1 并发消费:15分钟执行一次,过期消息(15分钟还未被消费的消息)一次最多清理16条过期消息,将过期消息发回给broker
7.2 顺序消费:20s定时锁定当前实例消费的所有队列,上锁成功将ProcessQueue的lock属性设置为true
8. 启动MQClientInstance
8.1 启动请求和响应的通道
8.2 开启定时任务
8.2.1 定时30s拉取最新的broker和topic的路由信息
8.2.2.定时30s向broker发送心跳包
8.2.3 定时5s持久化consumer的offset
8.2.4 定时1分钟,动态调整线程池线程数量
8.3 启动消息拉取服务
8.4 每20s重新负载一次
9. 从nameserver拉取topic的订阅信息
10. 向broker校验客户端
11. 给所有的broker的master发送心跳包 、上传filterClass的源文件给FilterServer
12. 立即负载队列


内容比较多,主要关心如下几点:


拷贝订阅消息


构建主体订阅消息 SubscriptionData 并加入到 RebalanceImpl 的订阅消息中,订阅关系主要来自两个:

  1. 通过调用 DefaltMQPushConsumerImpl#subscribe 方法
  2. 订阅主题消息,RocketMQ 消息重试以消费组为单位,而不是主题,消息重试主题为 %RETRY%+消费组. 消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。


private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }


初始化消息进度

其实说的就是加载 offset , 如果消息消费是集群模式,那么消费进度(offset)保存在 Broker 上;如果是广播方式,消息进度存储在消费端。


switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
    }
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);


创建消费者线程服务


根据消费者是否顺序消费, 创建消费端消费线程服务。consumeMessageService 负责消息消费,内部维护的是个线程池。


if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();


向MQClientInstance 注册消费者,并启动


向 MQClientInstance 注册消费者,并启动MQClientInstance。在一个 JVM 中所有的生产者和消费者都持有同一个 MQClientInstance, MQClientInstance, 只会启动一次。

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}
mQClientFactory.start();


总结


介绍了一下 MQ 消费者启动过程,校验配置,获取订阅消息,设置消费者的消费模式,负载策略。加载消息进度,启动消息消费者服务,并向MQClientInstance注册消费者Group和并启动MQClientInstance,从 nameserver 拉取 topic 订阅信息,向 Broker 发送心跳包。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 数据安全/隐私保护 RocketMQ
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
115 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
83 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
71 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
62 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
75 0
|
4月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
82 0