RocketMQ分析:PushConsumer流量控制

简介:

    本文分析PushConsumer的流量控制方法。PushConsumer使用Pull方式获取消息,好处是客户端能够根据自身的处理速度调整获取消息的操作速度。PushConsumer的流量控制采用多线程处理方式。

    RocketMQ的版本为:4.2.0 release。

一.PushConsumer使用线程池,每个线程同时执行对应的消息处理逻辑

     线程池的定义在 PushConsumer 启动的时候,初始化consumeMessageService的时候,在构造方法里面创建的。

    DefaultMQPushConsumer#start

public void start() throws MQClientException {
    this.defaultMQPushConsumerImpl.start();
}

    DefaultMQPushConsumerImpl#start

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

    ConsumeMessageOrderlyService#ConsumeMessageOrderlyService    构造方法 :

this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),// 线程池初始化时线程数量
    this.defaultMQPushConsumer.getConsumeThreadMax(),// 线程池最大线程数
    1000 * 60,
    TimeUnit.MILLISECONDS,// 线程保持活着的空闲时间,60秒
    this.consumeRequestQueue,// 排队等待线程队列
    new ThreadFactoryImpl("ConsumeMessageThread_")
);


二.使用ProcessQueue保持Message Queue消息处理状态的快照

    在pullMessage开始的时候,从pullRequest中获取ProcessQueue。

DefaultMQPushConsumerImpl#pullMessage
final ProcessQueue processQueue = pullRequest.getProcessQueue();// 从pullRequest中获取ProcessQueue

    拿到ProcessQueue对象之后,客户端在每次Pull请求之前会做下面三个判断来控制流量:消息个数、消息总大小以及Offset的跨度,任何一个值超过设定的大小就隔一段时间(默认50毫秒)再拉取消息,由此来达到流量控制的目的。

long cachedMessageCount = processQueue.getMsgCount().get();// 消息个数
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);// 消息总大小(单位M)
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {// 默认最大1000个
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);// 延迟50毫秒执行
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,                                 pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount,                             cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {// 默认最大100M
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);// 延迟50毫秒执行
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount,                         cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {// Offset的跨度
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);// 延迟50毫秒执行
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                pullRequest, queueMaxSpanFlowControlTimes);
        }
    return;
    }
} else {
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
        boolean brokerBusy = offset < pullRequest.getNextOffset();
        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
            pullRequest, offset, brokerBusy);
        if (brokerBusy) {
            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                pullRequest, offset);
        }
        pullRequest.setLockedFirst(true);
        pullRequest.setNextOffset(offset);
    }
} else {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
}
}

    DefaultMQPushConsumerImpl#executePullRequestLater 延迟执行

this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
//PullMessageService#executePullRequestLater
if (!isStopped()) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            PullMessageService.this.executePullRequestImmediately(pullRequest);
        }
    }, timeDelay, TimeUnit.MILLISECONDS);// 延迟50毫秒执行
} else {
    log.warn("PullMessageServiceScheduledThread has shutdown");
}


三.ProcessQueue的结构

    ProcessQueue中主要是一个TreeMap和一个读写锁。TreeMap里以MessageQueue的Offset作为Key,以消息内容的引用为Value,保存所有从MessageQueue获取到,但是还未被处理的消息;读写锁的作用是控制多线程下对TreeMap对象的并发访问

private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();// 保护TreeMap的读写锁
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

    使用读写锁作并发控制:

#ProcessQueue#putMessage 写锁
boolean dispatchToConsume = false;
try {
    this.lockTreeMap.writeLock().lockInterruptibly();// 写加锁
    try {
         ......
        msgCount.addAndGet(validMsgCnt);
        ......
    } finally {
        this.lockTreeMap.writeLock().unlock();// 写解锁
    }
} catch (InterruptedException e) {
    log.error("putMessage exception", e);
}
return dispatchToConsume;
#ProcessQueue#getMaxSpan 读锁
try {
    this.lockTreeMap.readLock().lockInterruptibly();// 读加锁
    try {
        if (!this.msgTreeMap.isEmpty()) {
            return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
        }
    } finally {
        this.lockTreeMap.readLock().unlock();// 读解锁
    }
} catch (InterruptedException e) {
    log.error("getMaxSpan exception", e);
}
return 0;

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
105 0
|
4月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
285 2
|
4月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
125 1
|
7月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
104 0
|
4月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
295 0
RocketMQ—一次连接namesvr失败的案例分析
|
6月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
180 1
|
7月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
647 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
7月前
|
消息中间件 Oracle Java
【RocketMq】Broker 启动脚本分析
【RocketMq】Broker 启动脚本分析
114 0
|
7月前
|
消息中间件 缓存 Java
【RocketMq】NameServ启动脚本分析(Ver4.9.4)
【RocketMq】NameServ启动脚本分析(Ver4.9.4)
169 0
|
7月前
|
消息中间件 缓存 算法
【RocketMq】NameServ启动脚本分析(Ver4.9.4)(二)
【RocketMq】NameServ启动脚本分析(Ver4.9.4)
136 0
【RocketMq】NameServ启动脚本分析(Ver4.9.4)(二)
下一篇
DataWorks