精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)

简介: 精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)


  • 上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析
  • 下篇:分析基于上篇的总体流程的底层的消息通讯以及拉去处理数据传输流程分析

RocketMQ的消息模型

RocketMQ的基础消息模型是发布-订阅(Pub/Sub)是一种消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer),如下图所示。



消息通过生产者发送到某一个Topic,如果需要订阅该Topic并消费里面的消息的话,就要创建对应的消费者进行消费,而本文主要会进行介绍对应的消息队列的消费者。

本文主旨

本文主要会针对于RocketMQ的消费者Consumer的功能原理进行分析和介绍,消费者主要会通过以推(push),拉(pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ提供Push模式也提供了Pull模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push模式处理消费消费

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

DefaultMQPushConsumer的使用和初始化

Push模式主要通过初始化DefaultMQPushConsumer对象进行消费数据信息,案例代码如下所示。

java

复制代码

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址 
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
// 启动Consumer
consumer.start();
消费的位点配置

消费端的消费的位点计算值,可以在启动前进行配置,主要方法可以通过下面代码进行配置。

java

复制代码

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费。

注意:第一次启动是指从来没有消费过消息的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

消费的模式的配置

消费模式主要分为:集群消费(Clustering)和广播消费(Broadcasting)这两种。

  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

java

复制代码

consumer.setMessageModel(MessageModel.BROADCASTING);
  • CLUSTERING:默认模式,同一个ConsumerGroup,每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所订阅topic整体,从而达到负载均衡的目的。
  • BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。
DefaultMQPushConsumer的运行原理和流程

DefaultMQPushConsumerImpl中各个对象的主要功能如下:

平衡和分配队列组件实现类-RebalancePushImpl

RebalancePushImpl:主要负责进行分配对应当前服务实例的消费者会从当前消费的topic中的那个Queue中进行消费消息;此外当消费者宕机或者下线的时候,还会执行rebalance再次平衡和分配给其他消费者对应的队列控制。

长连接进行拉去消息组件实现类-PullAPIWrapper

PullAPIWrapper:主要与broker服务端建立长连接,一直进行定时从broker服务端处拉取消息数据,默认为:32条消息,之后还会调用ConsumeMessageService实现类,进行用户注册的Listener执行消息消费逻辑。



看一下consumer.registerMessageListener的源码,如下所示。

java

复制代码

/**
 * Register a callback to execute on message arrival for concurrent consuming.
 * @param messageListener message handling callback.
 */
 @Override
 public void registerMessageListener(MessageListenerConcurrently messageListener) {
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
 }
回调用户的注册的MessageListener组件实现类-ConsumeMessageService

ConsumeMessageService:实现所谓的"Push-被动"消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息。


存储Offset的消费记录的位移组件实现类--OffsetStore

OffsetStore:维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;



综合门面功能接口供各个Service组件实现类--MQClientFactory

MQClientFactory:负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成,总体流程架构如下图所示。


DefaultMQPushConsumerImpl的start方法的源码

我们先来看一下对应的DefaultMQPushConsumerImpl类的start方法源码,源码可以看出主要实现过程在consumer.start后调用DefaultMQPushConsumerImpl的同步start方法,如下所示。

java

复制代码

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
}
DefaultMQPushConsumerImpl的start方法的源码

主要我们能关注重点的代码和组件的start,通过mQClientFactory.start();发我们发现他调用了很多组件的start方法:

  • this.mQClientAPIImpl.start():主要用于开启请求-响应的网络通道对象。
  • this.startScheduledTask():主要开启多个定时任务的功能
  • this.pullMessageService.start():主要开启拉取数据的业务组件
  • this.rebalanceService.start():主要开启rebalance业务服务组件。
  • this.defaultMQProducer.getDefaultMQProducerImpl().start(false):开启push服务的对象组件作为门面。

java

复制代码

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

重点我们先来主要看pullMessageService.start(),通过这里我们发现RocketMQ的Push模式底层其实也是通过pull实现的,接下来我们先来分析一下pullMessageService中的pullMessage方法的源码。


java

复制代码

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
}
DefaultMQPushConsumerImpl的pullMessage方法的源码

源码中需要进行根据消费组进行筛选对应的消费组,以方便选对应的消费组件DefaultMQPushConsumerImpl,如下图所示。

java

复制代码

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

最后还是通过DefaultMQPushConsumerImpl类的pullMessage方法来进行消息的逻辑处理,

java

复制代码

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
DefaultMQPushConsumerImpl的逻辑限流控制流程

总结一下针对于限流的总体流程控制:



  1. 首先拉去消息数据的时候会先去判断对应的ProcessQueue的对象元素是否还存在订阅关系或者被删除了,从而进行跳过那些不应该被消费的数据。

java

复制代码

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
     log.info("the pull request[{}] is dropped.", pullRequest.toString());
     return;
}

上面的逻辑是先会判断和校验PullRequest对象中的ProcessQueue对象的dropped是否为true(在RebalanceService线程中为topic下的MessageQueue创建拉取消息请求时要维护对应的ProcessQueue对象,若Consumer不再订阅该topic则会将该对象的dropped置为true);若是则认为该请求是已经取消的,则直接跳出该方法。


  1. 更新PullRequest对象中的ProcessQueue对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳。此外会判断当前的Consumer消费者组件是否运行中,主要是通过DefaultMQPushConsumerImpl.serviceState是否为RUNNING。

java

复制代码

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    return;
}
if (this.isPause()) {
  log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
  return;
}

如果运行状态或者是暂停状态this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),则会进行执行PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延迟再拉取消息,其中timeDelay=3000;

该方法的目的是在3秒之后再次将该PullRequest对象放入PullMessageService. pullRequestQueue队列中;并跳出该方法


  1. 主要进行消费者端进行速度和控制消费速度的流控。若ProcessQueue对象的msgCount大于了消费端的流控阈值,默认值为1000,主要通过DefaultMQPushConsumer.pullThresholdForQueue的执行进行判断。当调用的processQueue.getMsgCount().get()的数值大于DefaultMQPushConsumer.pullThresholdForQueue 的值时候会进行 PullMessageService.executePullRequestLater方法。

java

复制代码

long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
  1. 主要进行消费者端进行速度和控制消费速度的流控。主要会通过 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()与进行计算消息的内存空间的总大小进行对比,单位是M,当大于系统定义的 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()的阈值大小的时候,则会进行限流处理。

java

复制代码

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

以上3和4步骤中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50毫秒之后,才会将该PullRequest请求放入PullMessageService.pullRequestQueue队列中。从而实现看限流的能力。


  1. 当上面的直接限流之后,还会有跨度限流的控制,首先系统还会判断当前的消费方式是否顺序消费(即DefaultMQPushConsumerImpl.consumeOrderly等于false)。

java

复制代码

if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        }

则检查ProcessQueue对象的msgTreeMap:TreeMap 变量的第一个key值与最后一个key值之间的的差值,该key值表示查询的队列偏移量queueoffset;

若queueoffset差值大于阈值(DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默认是2000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后再该PullRequest请求放入PullMessageService.pullRequestQueue队列中。


  1. PullRequest.messageQueue对象的topic值为参数RebalanceImpl.subscriptionInner,ConcurrentHashMap, SubscriptionData>中获取对应的SubscriptionData对象,若该对象为null,考虑到并发的关系,调用executePullRequestLater方法。

java

复制代码

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
 }
DefaultMQPushConsumerImpl的消费模式分类

如果当前的消费者属于集群模式(RebalanceImpl.messageModel等于CLUSTERING)。

  • PullRequest对象的MessageQueue变量值,type =READ_FROM_MEMORY(从内存中获取消费进度offset值)为参数调用DefaultMQPushConsumerImpl的offsetStore对象。
  • 实际代表着RemoteBrokerOffsetStore对象的readOffset(MessageQueue mq, ReadOffsetType type)方法从本地内存中获取消费进度offset值。

java

复制代码

boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
   }

若该offset值大于0,则置临时变量commitOffsetEnable等于true否则为false;该offset值作为pullKernelImpl方法中的commitOffset参数,在Broker端拉取消息之后根据commitOffsetEnable参数值决定是否用该offset更新消息进度。

该readOffset方法的逻辑是:以入参MessageQueue对象从RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap 变量中获取消费进度偏移量;若该偏移量不为null则返回该值,否则返回-1;


DefaultMQPushConsumerImpl的订阅模型

当每次拉取消息之后需要更新订阅关系(由DefaultMQPushConsumer. postSubscriptionWhenPull参数表示,默认为false)并且以topic值参数从RebalanceImpl.subscriptionInner获取的SubscriptionData对象的classFilterMode等于false(默认为false),则将sysFlag标记的第3个字节置为1,否则该字节置为0;

java

复制代码

SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
 );

该sysFlag标记的第1个字节置为commitOffsetEnable的值;第2个字节(suspend标记)置为1;第4个字节置为classFilterMode的值。

DefaultMQPushConsumerImpl的底层客户端如何拉取消息的通信方法

调用底层的拉取消息API接口,方法进行消息拉取操作。

java

复制代码

PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)

内部会存在将回调类PullCallback传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。

java

复制代码

try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                // 消息的通信方式为异步
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }

初始化匿名内部类PullCallback,实现了onSuccess/onException方法; 该方法只有在异步请求的情况下才会回调;

java

复制代码

PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }
            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

主要针对于拉去底层消息的状态进行分状态处理,针对于this.pullAPIWrapper.pullKernelImpl的方法,我会在【下篇】进行介绍,此处不做讲述分析。

此外针对于PullStatus状态的分析-FOUND状态的处理,主要更新本地的offset值,以及流程控制等。如下所示。

java

复制代码

long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;                           
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
  • 此外针对于PullStatus状态的分析-NO_NEW_MSG状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。

java

复制代码

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  • 此外针对于PullStatus状态的分析-NO_MATCHED_MSG:状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。

java

复制代码

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  • 此外针对于PullStatus状态的分析-OFFSET_ILLEGAL:状态的处理,重新同步远程的offset数据值。

java

复制代码

pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
   @Override
   public void run() {
     try {
     DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());                                     DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
 }, 10000);


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
454 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
17天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
172 1
|
6月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
6月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
6月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
6月前
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
3月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
878 5
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2357 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
6月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
1553 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
6月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署

热门文章

最新文章