This article analyzes how PushConsumer performs flow control. Under the hood, PushConsumer fetches messages in pull mode, which lets the client adjust how fast it pulls messages to match its own processing speed. The pulled messages are then consumed by a pool of worker threads.
The RocketMQ version used for this analysis is the 4.2.0 release.
1. PushConsumer uses a thread pool, and each thread runs the message-processing logic in parallel
The thread pool is created in the constructor of consumeMessageService, which is initialized when the PushConsumer starts.
DefaultMQPushConsumer#start
public void start() throws MQClientException {
    this.defaultMQPushConsumerImpl.start();
}
DefaultMQPushConsumerImpl#start
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
ConsumeMessageOrderlyService#ConsumeMessageOrderlyService constructor:
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(), // core pool size of the thread pool
    this.defaultMQPushConsumer.getConsumeThreadMax(), // maximum pool size of the thread pool
    1000 * 60, // keep-alive time for idle threads: 60 seconds
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue, // queue of consume requests waiting for a worker thread
    new ThreadFactoryImpl("ConsumeMessageThread_")
);
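The minimum and maximum pool sizes above come straight from the consumer configuration. As a minimal sketch (the group name, topic, name server address and listener body are placeholders, not taken from the source), the bounds can be set when building a DefaultMQPushConsumer:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class ConsumerThreadPoolConfig {
    public static void main(String[] args) throws Exception {
        // "demo_consumer_group", "DemoTopic" and the address are placeholders for this sketch
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoTopic", "*");

        // These two values feed the ThreadPoolExecutor shown above
        // (core and maximum pool size of consumeExecutor).
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(20);

        // A concurrent listener; its callback runs on the consumeExecutor threads.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
            ConsumeConcurrentlyStatus.CONSUME_SUCCESS);

        consumer.start();
    }
}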
2. ProcessQueue keeps a snapshot of the message-processing state of each Message Queue
At the beginning of pullMessage, the ProcessQueue is obtained from the pullRequest.
DefaultMQPushConsumerImpl#pullMessage
final ProcessQueue processQueue = pullRequest.getProcessQueue(); // get the ProcessQueue from the pullRequest
Once the ProcessQueue object is obtained, the client performs three checks before each pull request to control the flow: the number of cached messages, their total size, and the offset span. If any of these exceeds its configured threshold, the pull is postponed for a while (50 milliseconds by default), and this is how flow control is achieved. The thresholds themselves are configurable on the consumer; see the sketch after the code below.
long cachedMessageCount = processQueue.getMsgCount().get(); // number of cached messages
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // total size of cached messages (in MiB)
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { // default threshold: 1000 messages
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // retry after 50 ms
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { // default threshold: 100 MiB
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // retry after 50 ms
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { // offset span of the cached messages
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // retry after 50 ms
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                pullRequest, queueMaxSpanFlowControlTimes);
        }
        return;
    }
} else {
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                    pullRequest, offset);
            }
            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
}
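All three thresholds checked above are plain consumer settings. A minimal sketch, assuming consumer is an already constructed DefaultMQPushConsumer and using arbitrary example values:

// Maximum number of cached messages per queue, compared against cachedMessageCount.
consumer.setPullThresholdForQueue(500);
// Maximum cached message size per queue in MiB, compared against cachedMessageSizeInMiB.
consumer.setPullThresholdSizeForQueue(50);
// Maximum offset span allowed for concurrent consumption, compared against processQueue.getMaxSpan().
consumer.setConsumeConcurrentlyMaxSpan(1000);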
DefaultMQPushConsumerImpl#executePullRequestLater (delayed execution)
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
//PullMessageService#executePullRequestLater
if (!isStopped()) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            PullMessageService.this.executePullRequestImmediately(pullRequest);
        }
    }, timeDelay, TimeUnit.MILLISECONDS); // re-dispatch the pull request after the delay (50 ms here)
} else {
    log.warn("PullMessageServiceScheduledThread has shutdown");
}
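The defer-and-retry idea here is independent of RocketMQ. A minimal, self-contained sketch of the same pattern (class and method names are invented for illustration, not RocketMQ APIs):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DeferredPullSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean stopped = false;

    // Mirrors the spirit of executePullRequestLater: hand the task back to the
    // normal execution path after a delay instead of dropping or blocking it.
    public void executeLater(Runnable pullTask, long delayMillis) {
        if (!stopped) {
            scheduler.schedule(pullTask, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void shutdown() {
        stopped = true;
        scheduler.shutdown();
    }
}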
3. The structure of ProcessQueue
ProcessQueue mainly consists of a TreeMap and a read-write lock. The TreeMap uses each message's offset in the MessageQueue as the key and a reference to the message as the value, holding every message that has been pulled from the MessageQueue but not yet processed. The read-write lock controls concurrent access to the TreeMap from multiple threads.
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); // read-write lock protecting the TreeMap
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
Concurrency control with the read-write lock:
ProcessQueue#putMessage (write lock)
boolean dispatchToConsume = false;
try {
    this.lockTreeMap.writeLock().lockInterruptibly(); // acquire the write lock
    try {
        ......
        msgCount.addAndGet(validMsgCnt);
        ......
    } finally {
        this.lockTreeMap.writeLock().unlock(); // release the write lock
    }
} catch (InterruptedException e) {
    log.error("putMessage exception", e);
}
return dispatchToConsume;
ProcessQueue#getMaxSpan (read lock)
try {
    this.lockTreeMap.readLock().lockInterruptibly(); // acquire the read lock
    try {
        if (!this.msgTreeMap.isEmpty()) {
            return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
        }
    } finally {
        this.lockTreeMap.readLock().unlock(); // release the read lock
    }
} catch (InterruptedException e) {
    log.error("getMaxSpan exception", e);
}
return 0;
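For a self-contained view of the same pattern, here is a minimal sketch of a ProcessQueue-like snapshot structure (the class and field names are made up for illustration and are not RocketMQ code):

import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SnapshotQueueSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // offset -> message body; messages pulled but not yet consumed
    private final TreeMap<Long, String> pending = new TreeMap<Long, String>();
    private final AtomicLong msgCount = new AtomicLong(0);

    // The pull thread takes the write lock when caching newly pulled messages.
    public void put(long offset, String body) {
        lock.writeLock().lock();
        try {
            if (pending.put(offset, body) == null) {
                msgCount.incrementAndGet();
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // The flow-control check takes the read lock to compute the offset span.
    public long maxSpan() {
        lock.readLock().lock();
        try {
            return pending.isEmpty() ? 0 : pending.lastKey() - pending.firstKey();
        } finally {
            lock.readLock().unlock();
        }
    }
}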