精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)

简介: 精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)


  • 上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析
  • 下篇:分析基于上篇的总体流程的底层的消息通讯以及拉去处理数据传输流程分析

RocketMQ的消息模型

RocketMQ的基础消息模型是发布-订阅(Pub/Sub)是一种消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer),如下图所示。



消息通过生产者发送到某一个Topic,如果需要订阅该Topic并消费里面的消息的话,就要创建对应的消费者进行消费,而本文主要会进行介绍对应的消息队列的消费者。

本文主旨

本文主要会针对于RocketMQ的消费者Consumer的功能原理进行分析和介绍,消费者主要会通过以推(push),拉(pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ提供Push模式也提供了Pull模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push模式处理消费消费

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

DefaultMQPushConsumer的使用和初始化

Push模式主要通过初始化DefaultMQPushConsumer对象进行消费数据信息,案例代码如下所示。

java

复制代码

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址 
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
// 启动Consumer
consumer.start();
消费的位点配置

消费端的消费的位点计算值,可以在启动前进行配置,主要方法可以通过下面代码进行配置。

java

复制代码

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费。

注意:第一次启动是指从来没有消费过消息的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

消费的模式的配置

消费模式主要分为:集群消费(Clustering)和广播消费(Broadcasting)这两种。

  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

java

复制代码

consumer.setMessageModel(MessageModel.BROADCASTING);
  • CLUSTERING:默认模式,同一个ConsumerGroup,每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所订阅topic整体,从而达到负载均衡的目的。
  • BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。
DefaultMQPushConsumer的运行原理和流程

DefaultMQPushConsumerImpl中各个对象的主要功能如下:

平衡和分配队列组件实现类-RebalancePushImpl

RebalancePushImpl:主要负责进行分配对应当前服务实例的消费者会从当前消费的topic中的那个Queue中进行消费消息;此外当消费者宕机或者下线的时候,还会执行rebalance再次平衡和分配给其他消费者对应的队列控制。

长连接进行拉去消息组件实现类-PullAPIWrapper

PullAPIWrapper:主要与broker服务端建立长连接,一直进行定时从broker服务端处拉取消息数据,默认为:32条消息,之后还会调用ConsumeMessageService实现类,进行用户注册的Listener执行消息消费逻辑。



看一下consumer.registerMessageListener的源码,如下所示。

java

复制代码

/**
 * Register a callback to execute on message arrival for concurrent consuming.
 * @param messageListener message handling callback.
 */
 @Override
 public void registerMessageListener(MessageListenerConcurrently messageListener) {
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
 }
回调用户的注册的MessageListener组件实现类-ConsumeMessageService

ConsumeMessageService:实现所谓的"Push-被动"消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息。


存储Offset的消费记录的位移组件实现类--OffsetStore

OffsetStore:维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;



综合门面功能接口供各个Service组件实现类--MQClientFactory

MQClientFactory:负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成,总体流程架构如下图所示。


DefaultMQPushConsumerImpl的start方法的源码

我们先来看一下对应的DefaultMQPushConsumerImpl类的start方法源码,源码可以看出主要实现过程在consumer.start后调用DefaultMQPushConsumerImpl的同步start方法,如下所示。

java

复制代码

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
}
DefaultMQPushConsumerImpl的start方法的源码

主要我们能关注重点的代码和组件的start,通过mQClientFactory.start();发我们发现他调用了很多组件的start方法:

  • this.mQClientAPIImpl.start():主要用于开启请求-响应的网络通道对象。
  • this.startScheduledTask():主要开启多个定时任务的功能
  • this.pullMessageService.start():主要开启拉取数据的业务组件
  • this.rebalanceService.start():主要开启rebalance业务服务组件。
  • this.defaultMQProducer.getDefaultMQProducerImpl().start(false):开启push服务的对象组件作为门面。

java

复制代码

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

重点我们先来主要看pullMessageService.start(),通过这里我们发现RocketMQ的Push模式底层其实也是通过pull实现的,接下来我们先来分析一下pullMessageService中的pullMessage方法的源码。


java

复制代码

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
}
DefaultMQPushConsumerImpl的pullMessage方法的源码

源码中需要进行根据消费组进行筛选对应的消费组,以方便选对应的消费组件DefaultMQPushConsumerImpl,如下图所示。

java

复制代码

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

最后还是通过DefaultMQPushConsumerImpl类的pullMessage方法来进行消息的逻辑处理,

java

复制代码

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
DefaultMQPushConsumerImpl的逻辑限流控制流程

总结一下针对于限流的总体流程控制:



  1. 首先拉去消息数据的时候会先去判断对应的ProcessQueue的对象元素是否还存在订阅关系或者被删除了,从而进行跳过那些不应该被消费的数据。

java

复制代码

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
     log.info("the pull request[{}] is dropped.", pullRequest.toString());
     return;
}

上面的逻辑是先会判断和校验PullRequest对象中的ProcessQueue对象的dropped是否为true(在RebalanceService线程中为topic下的MessageQueue创建拉取消息请求时要维护对应的ProcessQueue对象,若Consumer不再订阅该topic则会将该对象的dropped置为true);若是则认为该请求是已经取消的,则直接跳出该方法。


  1. 更新PullRequest对象中的ProcessQueue对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳。此外会判断当前的Consumer消费者组件是否运行中,主要是通过DefaultMQPushConsumerImpl.serviceState是否为RUNNING。

java

复制代码

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    return;
}
if (this.isPause()) {
  log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
  return;
}

如果运行状态或者是暂停状态this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),则会进行执行PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延迟再拉取消息,其中timeDelay=3000;

该方法的目的是在3秒之后再次将该PullRequest对象放入PullMessageService. pullRequestQueue队列中;并跳出该方法


  1. 主要进行消费者端进行速度和控制消费速度的流控。若ProcessQueue对象的msgCount大于了消费端的流控阈值,默认值为1000,主要通过DefaultMQPushConsumer.pullThresholdForQueue的执行进行判断。当调用的processQueue.getMsgCount().get()的数值大于DefaultMQPushConsumer.pullThresholdForQueue 的值时候会进行 PullMessageService.executePullRequestLater方法。

java

复制代码

long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
  1. 主要进行消费者端进行速度和控制消费速度的流控。主要会通过 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()与进行计算消息的内存空间的总大小进行对比,单位是M,当大于系统定义的 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()的阈值大小的时候,则会进行限流处理。

java

复制代码

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

以上3和4步骤中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50毫秒之后,才会将该PullRequest请求放入PullMessageService.pullRequestQueue队列中。从而实现看限流的能力。


  1. 当上面的直接限流之后,还会有跨度限流的控制,首先系统还会判断当前的消费方式是否顺序消费(即DefaultMQPushConsumerImpl.consumeOrderly等于false)。

java

复制代码

if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        }

则检查ProcessQueue对象的msgTreeMap:TreeMap 变量的第一个key值与最后一个key值之间的的差值,该key值表示查询的队列偏移量queueoffset;

若queueoffset差值大于阈值(DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默认是2000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后再该PullRequest请求放入PullMessageService.pullRequestQueue队列中。


  1. PullRequest.messageQueue对象的topic值为参数RebalanceImpl.subscriptionInner,ConcurrentHashMap, SubscriptionData>中获取对应的SubscriptionData对象,若该对象为null,考虑到并发的关系,调用executePullRequestLater方法。

java

复制代码

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
 }
DefaultMQPushConsumerImpl的消费模式分类

如果当前的消费者属于集群模式(RebalanceImpl.messageModel等于CLUSTERING)。

  • PullRequest对象的MessageQueue变量值,type =READ_FROM_MEMORY(从内存中获取消费进度offset值)为参数调用DefaultMQPushConsumerImpl的offsetStore对象。
  • 实际代表着RemoteBrokerOffsetStore对象的readOffset(MessageQueue mq, ReadOffsetType type)方法从本地内存中获取消费进度offset值。

java

复制代码

boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
   }

若该offset值大于0,则置临时变量commitOffsetEnable等于true否则为false;该offset值作为pullKernelImpl方法中的commitOffset参数,在Broker端拉取消息之后根据commitOffsetEnable参数值决定是否用该offset更新消息进度。

该readOffset方法的逻辑是:以入参MessageQueue对象从RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap 变量中获取消费进度偏移量;若该偏移量不为null则返回该值,否则返回-1;


DefaultMQPushConsumerImpl的订阅模型

当每次拉取消息之后需要更新订阅关系(由DefaultMQPushConsumer. postSubscriptionWhenPull参数表示,默认为false)并且以topic值参数从RebalanceImpl.subscriptionInner获取的SubscriptionData对象的classFilterMode等于false(默认为false),则将sysFlag标记的第3个字节置为1,否则该字节置为0;

java

复制代码

SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
 );

该sysFlag标记的第1个字节置为commitOffsetEnable的值;第2个字节(suspend标记)置为1;第4个字节置为classFilterMode的值。

DefaultMQPushConsumerImpl的底层客户端如何拉取消息的通信方法

调用底层的拉取消息API接口,方法进行消息拉取操作。

java

复制代码

PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)

内部会存在将回调类PullCallback传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。

java

复制代码

try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                // 消息的通信方式为异步
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }

初始化匿名内部类PullCallback,实现了onSuccess/onException方法; 该方法只有在异步请求的情况下才会回调;

java

复制代码

PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }
            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

主要针对于拉去底层消息的状态进行分状态处理,针对于this.pullAPIWrapper.pullKernelImpl的方法,我会在【下篇】进行介绍,此处不做讲述分析。

此外针对于PullStatus状态的分析-FOUND状态的处理,主要更新本地的offset值,以及流程控制等。如下所示。

java

复制代码

long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;                           
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
  • 此外针对于PullStatus状态的分析-NO_NEW_MSG状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。

java

复制代码

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  • 此外针对于PullStatus状态的分析-NO_MATCHED_MSG:状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。

java

复制代码

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  • 此外针对于PullStatus状态的分析-OFFSET_ILLEGAL:状态的处理,重新同步远程的offset数据值。

java

复制代码

pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
   @Override
   public void run() {
     try {
     DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());                                     DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
 }, 10000);


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
2月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
2月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
75 12
|
2月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
40 2
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
54 0
|
3月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
109 0
|
5月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
199 0
|
5月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。