48. 源代码解读-RocketMQ-client接收消息流程

简介:

1. 简介

消费消息可以分成pull和push方式,push消息使用比较简单,因为RocketMQ已经帮助我们封装了大部分流程,我们只要重写回调函数即可。

下面我们就以push消费方式为例,分析下这部分源代码流程。

2. 消费者启动流程图

48. 源代码解读-RocketMQ-client接收消息流程

3.消费者类图

48. 源代码解读-RocketMQ-client接收消息流程

4. 消费者源代码流程

4.1 消费客户端启动

根据官方(https://github.com/apache/rocketmq)提供的例子,Consumer.java里面使用DefaultMQPushConsumer启动消息消费者,如下

//初始化DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//设置命名服务,参考namesrv的启动
consumer.setNamesrvAddr("localhost:9876");
//设置消费起始位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅消费的主题和过滤符
consumer.subscribe("TopicTest", "*");
//设置消息回调函数
consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
      ConsumeConcurrentlyContext context) {
      System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
});
//启动消费者
consumer.start();

4.2 消息者启动

我们接着看consumer.start()方法

@Override
public void start() throws MQClientException {
     this.defaultMQPushConsumerImpl.start();
}

DefaultMQPushConsumerImpl.java

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                ...

                this.checkConfig();//检查参数

                ...

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                ...

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                ...
                this.offsetStore.load();

                ...

                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                ...

                mQClientFactory.start();

                this.serviceState = ServiceState.RUNNING;
            ...
        }

        ...
    }

在初始化一堆参数之后,然后调用mQClientFactory.start();

private MQClientInstance mQClientFactory;

其实这个命名有点奇怪啊(阿里程序员手抖了?),为什么MQClientInstance类型的变量名称叫mQClientFactory ...

那继续看MQClientInstance的start

4.3 MQClientInstance

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    ...
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
               ...
            }
        }
    }

各行代码的作用就像源代码里面的注释一样,重点看下pullMessageService.start和rebalanceService.start
pullMessageService.start作用是不断从一个阻塞队列里面获取pullRequest请求,然后去RocketMQ broker里面获取消息。
如果没有pullRequest的话,那么它将阻塞。
那么,pullRequest请求是怎么放进去的呢?这个就要看rebalanceService了。

4.4 pullMessageService.start

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

@Override
public void run() {
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                ..
            }
     }
}

顺便说一句,pullMessageService和rebalanceService都是继承自ServiceThread

public class PullMessageService extends ServiceThread {}

ServiceThread简单封装了线程的启动,调用start方法,就会调用它的run方法。

public ServiceThread() {
        this.thread = new Thread(this, this.getServiceName()); //把当前对象作为runnable传入线程构造函数
    }

    public void start() {
        this.thread.start();
    }

这样启动线程就要方便一点,看起来舒服一点。

嗯,继续分析之前的分析。

从pullMessageService的run方法可以看出它是从阻塞队列pullRequestQueue里面获取pullRequest,如果没有那么将阻塞。(如果不清楚java阻塞的使用,清百度)

执行完一次pullReqeust之后,再继续下一次获取阻塞队列,因为它是个while循环。

所以,我们需要分析下pullRequest放进队列的流程,也就是rebalanceService.

4.5 rebalanceService

public class RebalanceService extends ServiceThread {
    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}       

MQClientInstance.java

public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }

DefaultMQPushConsumerImpl.java

@Override
    public void doRebalance() {
        if (!this.pause) {
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }

RebalanceImpl.java

public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }
private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                ....
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {

                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

一路跟下来,来到了RebalanceImpl.java的rebalanceByTopic方法,这个方法里面有两个case(Broadcasting和Clustering)也就是消息消费的两个模式,广播和集群消息。
广播的话,所有的监听者都会收到消息,集群的话,只有一个消费者可以收到,我们以集群消息为例。
先大概解释下在rebalanceByTopic里面要做什么。

  1. 从namesrv获取broker里面这个topic的消费者数量
  2. 从namesrv获取broker这个topic的消息队列数量
  3. 根据前两部获取的数据进行负载均衡计算,计算出当前消费者客户端分配到的消息队列。
  4. 按照分配到的消息队列,去broker请求这个消息队列里面的消息。

上面代码厘米mqset就是这个topic的消费队列,一般是4个,但是这个值是可以修改的,存储的位置在~/store/config/topics.json里面,比如:

"TopicTest":{
        "order":false,
        "perm":6,
        "readQueueNums":4,
        "topicFilterType":"SINGLE_TAG",
        "topicName":"TopicTest",
        "topicSysFlag":0,
        "writeQueueNums":4
}    

可以修改readQueueNums和writeQueueNums为其他值

try {
     allocateResult = strategy.allocate(
                 this.consumerGroup,
         this.mQClientFactory.getClientId(),
         mqAll,
         cidAll);
  } catch (Throwable e) {
         return;
  }

这段代码就是客户端根据获取到的这个topic消费者数量和消息队列数量,使用负载均衡策略计算出当前客户端能够使用的消息队列。
负载均衡策略代码在这个位置。

48. 源代码解读-RocketMQ-client接收消息流程

那我们继续4.4 pullMessageService.start分析,因为rebalanceService已经把pullRequest放到了阻塞队列。

4.6 PullMessageService.run

@Override
    public void run() {
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {

            }
        }
    }
private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {

        }
    }

调用到DefaultMQPushConsumerImpl.pullMessage(pullRequest)这个方法里面。

4.6.1

public void pullMessage(final PullRequest pullRequest) {
        ...

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                System.out.printf("pullcallback onsuccess: " + pullResult + " %n");
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {

                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispathToConsume);
                            }
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }

上面这段代码主要就是设置消息获取后的回调函数PullCallback pullCallback,然后调用pullAPIWrapper.pullKernelImpl去Broker里面获取消息。

获取成功后,就会回调pullCallback的onSuccess方法的FOUND case分支。

在pullCallback的onSucess方法的FOUND case分支,会根据回调是同步还是异步,分为两种情况,如下:

48. 源代码解读-RocketMQ-client接收消息流程

同步消息和异步消息区别的源代码实现以后再讲。





     本文转自rongwei84n 51CTO博客,原文链接:http://blog.51cto.com/483181/2056301,如需转载请自行联系原作者



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 RocketMQ
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
601 0
|
4月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 数据安全/隐私保护 RocketMQ
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
|
3月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
70 0
|
3月前
|
消息中间件 网络协议 API
RocketMQ - 生产者启动流程
RocketMQ - 生产者启动流程
38 0
|
5月前
|
消息中间件 监控 Java
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
483 3
|
6月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
6月前
|
消息中间件 Java RocketMQ
MQ产品使用合集之在同一个 Java 进程内建立三个消费对象并设置三个消费者组订阅同一主题和标签的情况下,是否会发生其中一个消费者组无法接收到消息的现象
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
121 1
|
网络协议 物联网 开发者
详细介绍 MQTT 的工作原理,包括 MQTT 协议的特点、核心概念以及消息传递的流程
详细介绍 MQTT 的工作原理,包括 MQTT 协议的特点、核心概念以及消息传递的流程
3311 1
|
6月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,使用了RocketMQ的触发器,并且发送和接收消息都没有问题,但是消息轨迹中没有体现出来消费的情况如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。