RocketMQ消费者启动流程

简介: 问题消费者启动的时候,去哪拿的消息呢?

问题答案


1.png


(1)当broker启动的时候,会把broker的地址端口、broker上的主题信息、主题队列信息发送到nameserver(如图中1)

(2)消费者Client启动的时候会去nameserver拿toipc、topic队列以及对应的broker信息,拿到以后把信息存储到本地(如图中2)

(3)消费者会给所有的broker发送心跳,并且附带自己的消费者组信息和ClientID信息,此时broker中就有消费者组对应的ClientID集合(如图中3)

(4)消费者启动后会reblance,有订阅的主题队列列表,并且通过broker可以拿到消费者组的ClientID集合,两个集合做rebalance,就可以拿到当前消费者对应消费的主题队列

(5) 消费者知道自己消费的主题队列,就可以根据队列信息通过Netty发送消息


跟源码


注意


本文是消费者启动流程,所以不去关注broker和nameserver的启动流程,这样关注点比较集中,因此步骤(1)本文不做描述。


消费者启动时怎么拿到toipc的信息


消费者启动的时候会调用

MQClientInstance###start()方法,start()方法里有会调用

MQClientInstance###startScheduledTask()方法,里面的一段代码如下,会每隔一段时间更新一下topic路由信息


//MQClientInstance###startScheduledTask()
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);


会把路由信息保存到本地的一个HashMap里,这样消费者就拿到了topic的信息并且会把broker的信息保存下来


//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//根据主题从nameserver获取topic信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);


//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//把主题和主题队列相关的broker保存下来
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }


总结:消费者拿到主题的队列列表和broker信息


消费者给broker发现心跳的作用


MQClientInstance###startScheduledTask()方法,里面的一段代码如下,会每隔一段时间给所有的broker发送心跳消息


//MQClientInstance###startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);


那么发送的心跳包中携带什么信息呢?如代码中所示,携带clientID和组名称


//MQClientInstance###prepareHeartbeatData
private HeartbeatData prepareHeartbeatData() {
        HeartbeatData heartbeatData = new HeartbeatData();
        // clientID
        //放入了当前消费者的clientID
        //放入了当前消费者的clientID
        //放入了当前消费者的clientID
        heartbeatData.setClientID(this.clientId);
        // Consumer
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                ConsumerData consumerData = new ConsumerData(); 
                //放入了当前消费者的组名称
                //放入了当前消费者的组名称
                //放入了当前消费者的组名称
                //放入了当前消费者的组名称
                consumerData.setGroupName(impl.groupName());
                consumerData.setConsumeType(impl.consumeType());
                consumerData.setMessageModel(impl.messageModel());
                consumerData.setConsumeFromWhere(impl.consumeFromWhere());
                consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
                consumerData.setUnitMode(impl.isUnitMode());
                heartbeatData.getConsumerDataSet().add(consumerData);
            }
        }
        // Producer
        for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                ProducerData producerData = new ProducerData();
                producerData.setGroupName(entry.getKey());
                heartbeatData.getProducerDataSet().add(producerData);
            }
        }
        return heartbeatData;
    }


此时broker拿到心跳消息怎么处理的呢?有一部分逻辑如下面代码所示,记录一下消费者信息


//ClientManageProcessor###heartBeat(ChannelHandlerContext ctx, RemotingCommand request)
```java
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        //省略
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            //省略
            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );
            //省略
        }
       //省略
    }


消费者怎么做reblance


MQClientInstance的start的方法里会开启一个rebalance的线程,如下面代码所示


//MQClientInstance###start()
public void start() throws MQClientException {
 //省略
 // Start rebalance service
 this.rebalanceService.start();
 //省略
}


跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法,如下面代码所示。根据主题队列列表和消费者组集合去做一个Rebalance,最后的返回结果是当前消费者需要消费的主题队列。


//RebalanceImpl##rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
                //获取订阅的主题的队列
                //获取订阅的主题的队列
                //获取订阅的主题的队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //获取同消费者组的ClientID集合
                //获取同消费者组的ClientID集合
                //获取同消费者组的ClientID集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    //排序
                    //排序
                    //排序
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        allocateResultSet.addAll(allocateResult);
                    }
                    //此处看下面的消费者怎么去拉消息
                    //此处看下面的消费者怎么去拉消息
                    //此处看下面的消费者怎么去拉消息
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        }
    }


总结:消费者拿到主题的队列列表和消费者组中ClientID集合,通过在消费者这变做rebalance,从而确定被分配的主题队列集合


消费者怎么拉取消息


此处还是继续跟上面的代码,,然后执行到下面的代码,当消费者确定自己被分配的主题队列后,会把主题队列封装成PullRequest 并进行dispatch


//RebalanceImpl###updateProcessQueueTableInRebalance
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
       //省列
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
                        //省略
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
            }
        }
         //省略
        //派发请求任务
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }


下面跟RebalanceImpl###dispatchPullRequest方法,最后跟到下面的代码,就是把PullRequest放入到一个阻塞队列里。


//PullMessageService###executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }


那么谁取阻塞队列里的数据谁就是消费消息了?


PullMessageService是一个线程,他的run方法里会取上面阻塞队列里的PullRequest,如下面代码所示


//PullMessageService###run()
public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }


从PullMessageService###pullMessage方法一直往下跟,就跟到下面的代码


//DefaultMQPushConsumerImpl###pullMessage(final PullRequest pullRequest) 
public void pullMessage(final PullRequest pullRequest) {
        //省略
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback() {
            //省略,但是重要,后面会说
            //省略,但是重要,后面会说
            //省略,但是重要,后面会说
            //省略,但是重要,后面会说
        };
           //省略
        try {
            //发送数据并且执行回调方法,下面我们看一下回调方法的内容就好好了
            //发送数据并且执行回调方法,下面我们看一下回调方法的内容就好好了
            //发送数据并且执行回调方法,下面我们看一下回调方法的内容就好好了
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }


那么回调方法是什么逻辑呢?代码如下所示,发现数据并且submitConsumeRequest


PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        //发现数据
                        //发现数据
                        //发现数据
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                //跟进去
                                //跟进去
                                //跟进去
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            //省略
                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            //省略
                    }
                }
            }
        };


跟上面submitConsumeRequest方法的到下面的代码,封装成ConsumeRequest,其实ConsumerRequest是一个线程


//ConsumeMessageConcurrentlyService###submitConsumeRequest
public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            //省略
            }
        }
    }


ConsumeRequest的run方法就会执行我们注册的listener方法,此时就消费到数据
```java
@Override
        public void run() {
            //省略
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            //省略
            }
        }


2.png


总结:

如下图所示,RebalanceService线程会根据情况把请求放在PullMessageService的pullRequestQueue阻塞队列队列里,队列的每一个节点就是拉请求;PullMessageService线程就是不断去pullRequestQueue里拿任务然后去看一下broker中有没有数据,如果有数据就消费。


3.png


总结


(1)忽然发现nameserver在整个过程中的作用感觉不是很大,其实我感觉这种设计还挺好的,因为把所有的压力都放在nameserver返回减少系统的健壮性。

(2)RocketMQ的rebalance是在消息消费者这边实现的,这样有一个很大的优势是减少nameserver和broker的压力。那消费者是怎么实现rebalance的呢?通过一个参数为当前消费者ID、主题队列、消费者组ClientID列表的匹配算法,每次只要保证算法的幂等性就可以了。

(3)RocketMQ的rebalance的rebalance是根据单个主题去实现的,这样的一个缺点是容易出现消费不平衡的问题。如下图所示。


4.png


(4)RocketMQ是AP的,因为他的很操作都是都是通过线程池的定时任务去做的。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
3月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
3月前
|
消息中间件 数据安全/隐私保护 RocketMQ
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
|
3月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
90 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
72 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
58 0
|
3月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
50 0
|
3月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
62 0
|
3月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
70 0