简述RocketMQ消息拉取过程【一】

简介: 简述RocketMQ消息拉取过程【一】

前言

相信每一个使用RocketMQ的开发工程师都想了解一下消息是如何被拉到消费端的,消费者在消息拉取过程中都做了什么操作?这些疑问将在接下来的这篇文章中给大家一一解开;

DefaultMQPushConsumer拉消息

RocketMQ有两种类型的消费者,一种是DefaultLitePullConsumer主动拉的模式,另一种是DefaultMQPushConsumer被动接收的模式,我们先来介绍DefaultMQPushConsumer是如何拉消息的。我们需要从MQClientInstance类作为入口,找到pullMessageService,它才是负责从Broker拉取消息的;

  • 启动拉消息线程

在消费者调用start()方法时,我们最终可以发现MQClientInstance.start()方法也被调用了,在里面我们可以发现这样一段代码:

// Start pull service
this.pullMessageService.start();
复制代码

最终我们发现pullMessageService其实是ServiceThread的子类,ServiceThread又实现了Runnable,说白了pullMessageService就是一个任务,pullMessageService.start()源码我们可以看一下:

public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        // 把pullMessageService作为线程任务
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
复制代码

这里面就是启动了一个线程,然后这个线程的任务就是pullMessageService本身,所以我们需要重点关注run()方法的实现:

@Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // 从阻塞队列中取MessageRequest
                MessageRequest messageRequest = this.messageRequestQueue.take();
                // 根据消息请求模式拉消息
                if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
                    this.popMessage((PopRequest)messageRequest);
                } else {
                    this.pullMessage((PullRequest)messageRequest);
                }
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
复制代码
  • 维护消息请求队列

DefaultMQPushConsumer中,当重平衡任务启动后,每个消费者实例将分配到对应的MessageQueue,然后就会创建对应的MessageRequest放进messageRequestQueue中,相关代码可查看RebalanceImpl.updateMessageQueueAssignment()方法,下面贴出相关的代码片段:

boolean allMQLocked = true;
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mq2PushAssignment.keySet()) {
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            allMQLocked = false;
            continue;
        }
        this.removeDirtyOffset(mq);
        ProcessQueue pq = createProcessQueue();
        pq.setLocked(true);
        long nextOffset = -1L;
        try {
            nextOffset = this.computePullFromWhereWithException(mq);
        } catch (Exception e) {
            log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
            continue;
        }
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}
if (!allMQLocked) {
    mQClientFactory.rebalanceLater(500);
}
this.dispatchPullRequest(pullRequestList, 500);
复制代码

这一段代码就是根据重平衡后生成了pullRequest并放进了messageRequestQueue请求队列中,同样的popRequest也是和上面代码类似:

List<PopRequest> popRequestList = new ArrayList<PopRequest>();
for (MessageQueue mq : mq2PopAssignment.keySet()) {
    if (!this.popProcessQueueTable.containsKey(mq)) {
        PopProcessQueue pq = createPopProcessQueue();
        PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq);
        if (pre != null) {
            log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq);
        } else {
            log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq);
            PopRequest popRequest = new PopRequest();
            popRequest.setTopic(topic);
            popRequest.setConsumerGroup(consumerGroup);
            popRequest.setMessageQueue(mq);
            popRequest.setPopProcessQueue(pq);
            popRequest.setInitMode(getConsumeInitMode());
            popRequestList.add(popRequest);
            changed = true;
        }
    }
}
this.dispatchPopPullRequest(popRequestList, 500);
复制代码

最终都是调用dispatchPullRequest()dispatchPopPullRequest()把请求放进阻塞队列中:

@Override
    public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
        for (PullRequest pullRequest : pullRequestList) {
            if (delay <= 0) {
               this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            } else {
                this.defaultMQPushConsumerImpl.executePullRequestLater(pullRequest, delay);
            }
        }
    }
    @Override
    public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
        for (PopRequest pullRequest : pullRequestList) {
            if (delay <= 0) {
                this.defaultMQPushConsumerImpl.executePopPullRequestImmediately(pullRequest);
            } else {
                this.defaultMQPushConsumerImpl.executePopPullRequestLater(pullRequest, delay);
            }
        }
    }
复制代码

我们从这里就可以知道,DefaultMQPushConsumer拉消息的请求是通过重平衡来第一次触发的;重平衡会触发拉消息的请求产生,并存放到阻塞队列中;另外pullMessageService将会持续从阻塞队列中取出请求去真正地从Broker拉消息;

  • Broker拉消息

PullRequest作为示例,我们来看一下源码:

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            // 最终还是要去DefaultMQPushConsumerImpl中调用pullMessage()方法
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
复制代码

所以我们下一步进入核心代码,真正实现从Broker拉消息,在处理之前会做一系列的判断:

  • 1.状态检测
if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
复制代码

如果在重平衡后,processQueue已经被移除了,那么就不再去拉消息了;

try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }
复制代码

如果当前消费者不在运行状态,那么把这个pullRequest重新放回阻塞队列,延迟3秒再处理;

if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
复制代码
  • 2.流量控制

如果这个消费者被暂停了,那么把这个任务放回阻塞队列,延迟1秒再处理;

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
复制代码

如果processQueue中未消费的消息数量大于PullThresholdForQueue阈值,那么触发流控,这个pullRequest将重新放回阻塞队列中,延迟50毫秒再处理;

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
复制代码

如果processQueue中未消费的消息大小(cachedMessageSizeInMiB代表未消费消息的MB)大于PullThresholdSizeForQueue阈值,那么依然触发流控,延迟50豪秒放回阻塞队列中;

if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        }
复制代码

并发模式下,如果最大消费点位与最小消费点位间的差距超出了ConsumeConcurrentlyMaxSpan阈值,默认是2000,也会触发流控,延迟50豪秒放回阻塞队列中;

  • 顺序消息

如果是顺序消息,那么需要先上锁再重新计算一下消费点位,如果锁竞争失败,那么重新放回阻塞队列,并延迟3秒钟:

if (processQueue.isLocked()) {
                if (!pullRequest.isPreviouslyLocked()) {
                    long offset = -1L;
                    try {
                       // 重新计算消费点位
                        offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                        if (offset < 0) {
                            throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
                        }
                    } catch (Exception e) {
                        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                        log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                        return;
                    }
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }
                    pullRequest.setPreviouslyLocked(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
               // 锁竞争失败后,延迟3秒放回阻塞队列
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
复制代码
  • 检查topic设置

在即将发送前还会检查是否设置了subscriptionData,其实就是对应的tag,默认情况下是*,如果没有设置的话,同样会延迟3秒后放回阻塞队列:

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }
复制代码
  • 构建PullCallback

下一步就是构建PullCallback,但是可以先不看这个,这个属于消息拿到后的回调出来,里面有两个方法,一个是onSuccess(PullResult pullResult),一个是onException(Throwable e);如无论消息获取是否成功,依然还会创建新的pullRequest放进阻塞队列中,这样就保证了消息拉取的持续性;

  • 发送请求拉取消息

最终通过一系列的判断后,最后一步就是发出拉取消息的请求:

try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
复制代码

上面这个代码其实就是向Broker发送请求拉消息,消息拉到后将调用pullCallback里面的onSuccess()方法或onException()方法;

小结

从上述分析过程可以看出,在DefaultMQPushConsumer中有两个关键点可以保证消息的拉取的持续性:

1.重平衡机制会创建pullRequest放进阻塞队列中;

2.消息拉取成功或失败都会再次给阻塞队列补充pullRequest

以上两点保证消息拉取形成一个闭环,在客户端存活期间保证消息拉取不间断;



相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
632 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
625 0
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
317 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
474 0
RocketMQ Schema——让消息成为流动的结构化数据
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
940 1
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
481 1
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
501 1
|
消息中间件 存储 Java
10 张图告诉你 RocketMQ 是怎样保存消息的
10 张图告诉你 RocketMQ 是怎样保存消息的
198 0
10 张图告诉你 RocketMQ 是怎样保存消息的
|
消息中间件 存储 uml
5 张图带你彻底理解 RocketMQ 轨迹消息
5 张图带你彻底理解 RocketMQ 轨迹消息
407 0
5 张图带你彻底理解 RocketMQ 轨迹消息
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67607 2
3 张图带你彻底理解 RocketMQ 事务消息