Preface
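Anyone who develops with RocketMQ has probably wondered how messages actually reach the consumer, and what the consumer does while it pulls them. This article answers those questions one by one.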
How DefaultMQPushConsumer pulls messages
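RocketMQ has two kinds of consumers. DefaultLitePullConsumer actively pulls messages. DefaultMQPushConsumer passively receives them. We start with how DefaultMQPushConsumer pulls messages. The entry point is the MQClientInstance class, where we find pullMessageService, the component that actually pulls messages from the Broker.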
- Starting the pull thread
When the consumer's start() method is called, MQClientInstance.start() is eventually invoked as well. Inside it we find the following code:
```java
// Start pull service
this.pullMessageService.start();
```
pullMessageService turns out to be a subclass of ServiceThread, and ServiceThread implements Runnable. Put simply, pullMessageService is just a task. Here is the source of pullMessageService.start():
```java
public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
    if (!started.compareAndSet(false, true)) {
        return;
    }
    stopped = false;
    // Run pullMessageService itself as the thread's task
    this.thread = new Thread(this, getServiceName());
    this.thread.setDaemon(isDaemon);
    this.thread.start();
}
```
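All this does is start a thread whose task is pullMessageService itself. The compareAndSet guard ensures that repeated calls start only one thread. The part to focus on is therefore its run() method: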
```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Take a MessageRequest from the blocking queue (blocks while it is empty)
            MessageRequest messageRequest = this.messageRequestQueue.take();
            // Pull messages according to the request mode
            if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
                this.popMessage((PopRequest) messageRequest);
            } else {
                this.pullMessage((PullRequest) messageRequest);
            }
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
```
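To make the start-and-run pattern concrete, here is a minimal sketch of a ServiceThread-style worker. It is not RocketMQ's code. start() uses compareAndSet so that only one thread is ever launched, and run() blocks on take() and dispatches by request mode. SketchRequest, SketchRequestMode and PullServiceSketch are hypothetical names. The real pull/pop handlers are replaced by simply recording what was dispatched.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-ins for MessageRequestMode / MessageRequest.
enum SketchRequestMode { PULL, POP }

class SketchRequest {
    final SketchRequestMode mode;
    final String queueId;

    SketchRequest(SketchRequestMode mode, String queueId) {
        this.mode = mode;
        this.queueId = queueId;
    }
}

// A minimal ServiceThread-style worker: start() is CAS-guarded, run() blocks on take().
class PullServiceSketch implements Runnable {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile boolean stopped = false;
    private Thread thread;
    final BlockingQueue<SketchRequest> requestQueue = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>(); // what run() dispatched, in order

    public void start() {
        // Only the first call starts a thread; later calls return immediately.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        thread = new Thread(this, "PullServiceSketch");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                SketchRequest req = requestQueue.take(); // blocks while the queue is empty
                if (req.mode == SketchRequestMode.POP) {
                    handled.add("pop:" + req.queueId);   // stands in for popMessage()
                } else {
                    handled.add("pull:" + req.queueId);  // stands in for pullMessage()
                }
            } catch (InterruptedException ignored) {
                // woken up by shutdown(); the loop re-checks the stopped flag
            }
        }
    }

    // Waits up to timeoutMs until at least n requests have been handled.
    boolean awaitHandled(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (handled.size() < n && System.currentTimeMillis() < deadline) {
            LockSupport.parkNanos(1_000_000L); // sleep about 1 ms between checks
        }
        return handled.size() >= n;
    }

    void shutdown() {
        stopped = true;
        thread.interrupt(); // unblocks take()
    }
}
```

Because take() blocks, the worker thread costs nothing while the queue is empty. Interrupting the thread is how the sketch wakes it for shutdown.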
- Maintaining the message request queue
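In DefaultMQPushConsumer, once the rebalance task runs, each consumer instance is assigned its MessageQueues. For every newly assigned queue it creates a corresponding MessageRequest and puts it into messageRequestQueue. The relevant logic lives in RebalanceImpl.updateMessageQueueAssignment(); here is the relevant excerpt: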
```java
boolean allMQLocked = true;
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mq2PushAssignment.keySet()) {
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            allMQLocked = false;
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = createProcessQueue();
        pq.setLocked(true);
        long nextOffset = -1L;
        try {
            nextOffset = this.computePullFromWhereWithException(mq);
        } catch (Exception e) {
            log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
            continue;
        }

        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}

if (!allMQLocked) {
    mQClientFactory.rebalanceLater(500);
}

this.dispatchPullRequest(pullRequestList, 500);
```
This code creates a pullRequest for each queue assigned by the rebalance and hands it to the messageRequestQueue request queue. popRequest is handled in much the same way:
```java
List<PopRequest> popRequestList = new ArrayList<PopRequest>();
for (MessageQueue mq : mq2PopAssignment.keySet()) {
    if (!this.popProcessQueueTable.containsKey(mq)) {
        PopProcessQueue pq = createPopProcessQueue();
        PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq);
        if (pre != null) {
            log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq);
        } else {
            log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq);
            PopRequest popRequest = new PopRequest();
            popRequest.setTopic(topic);
            popRequest.setConsumerGroup(consumerGroup);
            popRequest.setMessageQueue(mq);
            popRequest.setPopProcessQueue(pq);
            popRequest.setInitMode(getConsumeInitMode());
            popRequestList.add(popRequest);
            changed = true;
        }
    }
}

this.dispatchPopPullRequest(popRequestList, 500);
```
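Both paths end up calling dispatchPullRequest() or dispatchPopPullRequest(). These put the requests into the blocking queue, either immediately or, when delay is greater than 0, after that delay: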
```java
@Override
public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
    for (PullRequest pullRequest : pullRequestList) {
        if (delay <= 0) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        } else {
            this.defaultMQPushConsumerImpl.executePullRequestLater(pullRequest, delay);
        }
    }
}

@Override
public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
    for (PopRequest pullRequest : pullRequestList) {
        if (delay <= 0) {
            this.defaultMQPushConsumerImpl.executePopPullRequestImmediately(pullRequest);
        } else {
            this.defaultMQPushConsumerImpl.executePopPullRequestLater(pullRequest, delay);
        }
    }
}
```
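The difference between the immediate and delayed paths can be illustrated with a scheduled executor. This sketch assumes that delayed re-enqueueing works by scheduling a task that later puts the request into the blocking queue. DemoPullRequest and RequestDispatcher are hypothetical names, not RocketMQ classes.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for PullRequest: only what the sketch needs.
class DemoPullRequest {
    final String queue;
    DemoPullRequest(String queue) { this.queue = queue; }
}

class RequestDispatcher {
    final BlockingQueue<DemoPullRequest> messageRequestQueue = new LinkedBlockingQueue<>();

    // Daemon scheduler thread so the JVM can exit without an explicit shutdown.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "PullRequestLater");
            t.setDaemon(true);
            return t;
        });

    // Put the request straight into the blocking queue.
    void executeImmediately(DemoPullRequest req) {
        messageRequestQueue.offer(req);
    }

    // Re-enqueue after a delay without blocking the caller.
    void executeLater(DemoPullRequest req, long delayMs) {
        scheduler.schedule(() -> executeImmediately(req), delayMs, TimeUnit.MILLISECONDS);
    }

    // Same delay <= 0 branching as dispatchPullRequest().
    void dispatch(List<DemoPullRequest> requests, long delayMs) {
        for (DemoPullRequest req : requests) {
            if (delayMs <= 0) {
                executeImmediately(req);
            } else {
                executeLater(req, delayMs);
            }
        }
    }

    // Waits up to timeoutMs for the next request; returns null on timeout or interrupt.
    DemoPullRequest poll(long timeoutMs) {
        try {
            return messageRequestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A delayed request is not visible in the queue until its delay elapses. The consumer side (the take() loop) therefore needs no knowledge of delays at all.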
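From this we can see that, for DefaultMQPushConsumer, the first pull request is triggered by rebalancing. Rebalancing produces the pull requests and stores them in the blocking queue. pullMessageService then keeps taking requests out of that queue and actually pulls messages from the Broker.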
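The key property of the rebalance excerpt is that only queues not yet present in processQueueTable produce a new request. Repeated rebalances therefore never duplicate requests for a queue that is already being pulled. Below is a simplified sketch of that rule. Queues are reduced to strings and the offset computation is replaced by a map lookup; RebalanceSketch and updateAssignment are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RebalanceSketch {
    // queue id -> placeholder for its ProcessQueue
    final Map<String, Object> processQueueTable = new ConcurrentHashMap<>();

    /** Returns one (queueId, nextOffset) pair per queue that got a new request. */
    List<Map.Entry<String, Long>> updateAssignment(Set<String> assigned, Map<String, Long> storedOffsets) {
        List<Map.Entry<String, Long>> newRequests = new ArrayList<>();
        for (String mq : assigned) {
            if (processQueueTable.containsKey(mq)) {
                continue; // already being pulled: its request is still circulating
            }
            long nextOffset = storedOffsets.getOrDefault(mq, -1L); // stands in for computePullFromWhere
            if (nextOffset < 0) {
                continue; // offset unavailable: skip this queue for now
            }
            if (processQueueTable.putIfAbsent(mq, new Object()) == null) {
                newRequests.add(new AbstractMap.SimpleEntry<>(mq, nextOffset));
            }
        }
        return newRequests;
    }
}
```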
- Pulling messages from the Broker
Taking PullRequest as the example, let's look at the source:
```java
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        // Ultimately delegates to DefaultMQPushConsumerImpl.pullMessage()
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
```
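At this level, PullMessageService does little more than look up the consumer registered for the request's group and hand the request over. If no consumer is registered, it drops the request. Here is a sketch of that routing with a hypothetical ConsumerRegistrySketch, where the request is reduced to a queue id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical registry: consumer group -> handler, mimicking selectConsumer() plus the "drop it" branch.
class ConsumerRegistrySketch {
    final Map<String, Consumer<String>> consumerTable = new ConcurrentHashMap<>();

    void register(String consumerGroup, Consumer<String> consumer) {
        consumerTable.put(consumerGroup, consumer);
    }

    /** Routes a request (reduced to a queue id) to its group; returns false if it was dropped. */
    boolean pullMessage(String consumerGroup, String queueId) {
        Consumer<String> consumer = consumerTable.get(consumerGroup);
        if (consumer == null) {
            return false; // no consumer for this group: the request is dropped, not re-enqueued
        }
        consumer.accept(queueId); // stands in for impl.pullMessage(pullRequest)
        return true;
    }
}
```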
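So the next step takes us into the core code that actually pulls messages from the Broker. Before the request is sent, a series of checks is performed: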
- 1. State checks
```java
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
```
If the processQueue was removed during a rebalance, no more messages are pulled for it.
```java
try {
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    return;
}
```
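If the consumer is not in the running state, the pullRequest goes back into the blocking queue and is retried after 3 seconds (pullTimeDelayMillsWhenException).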
```java
if (this.isPause()) {
    log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
        this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    return;
}
```
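If the consumer has been paused, the request goes back into the blocking queue and is retried after 1 second (PULL_TIME_DELAY_MILLS_WHEN_SUSPEND).
- 2. Flow control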
```java
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
```
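If the number of messages cached in processQueue but not yet consumed exceeds the PullThresholdForQueue threshold (1000 by default), flow control kicks in. The pullRequest goes back into the blocking queue and is retried after 50 milliseconds (PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL). A warning is logged only once every 1000 such events.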
```java
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
```
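If the total size of the unconsumed messages in processQueue (cachedMessageSizeInMiB, measured in MiB) exceeds the PullThresholdSizeForQueue threshold (100 MiB by default), flow control is also triggered. The request goes back into the blocking queue after 50 milliseconds.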
```java
if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes);
        }
        return;
    }
}
```
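In concurrent mode, flow control is triggered as well if the span between the largest and smallest offsets of the messages cached in processQueue exceeds the ConsumeConcurrentlyMaxSpan threshold (2000 by default). The request goes back into the blocking queue after 50 milliseconds.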
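The state checks and flow-control rules can be condensed into one decision function. This is a simplified model, not RocketMQ's implementation. PullGateSketch, the DROP marker, and the TreeMap from offset to message size (standing in for processQueue) are assumptions. The delays are the 3 s / 1 s / 50 ms values discussed in this section. The thresholds (1000 messages, 100 MiB, span 2000) are assumed to be the client defaults.

```java
import java.util.TreeMap;

// Simplified model of the checks at the top of pullMessage(); not RocketMQ's code.
class PullGateSketch {
    static final long DELAY_WHEN_EXCEPTION = 3000;   // consumer not running
    static final long DELAY_WHEN_SUSPEND = 1000;     // consumer paused
    static final long DELAY_WHEN_FLOW_CONTROL = 50;  // any flow-control rule hit
    static final long DROP = -1;                     // hypothetical marker: discard the request

    int pullThresholdForQueue = 1000;       // max cached messages per queue (assumed default)
    int pullThresholdSizeForQueue = 100;    // max cached size per queue in MiB (assumed default)
    int consumeConcurrentlyMaxSpan = 2000;  // max offset span in concurrent mode

    /**
     * Returns 0 to pull now, DROP to discard the request, or the delay in ms after
     * which the request goes back into the blocking queue.
     * cached maps message offset -> message size in bytes (stands in for processQueue).
     */
    long check(boolean dropped, boolean running, boolean paused, boolean orderly,
               TreeMap<Long, Integer> cached) {
        if (dropped) return DROP;
        if (!running) return DELAY_WHEN_EXCEPTION;
        if (paused) return DELAY_WHEN_SUSPEND;

        long count = cached.size();
        long bytes = 0;
        for (int size : cached.values()) {
            bytes += size;
        }
        long sizeInMiB = bytes / (1024 * 1024);

        if (count > pullThresholdForQueue) return DELAY_WHEN_FLOW_CONTROL;
        if (sizeInMiB > pullThresholdSizeForQueue) return DELAY_WHEN_FLOW_CONTROL;
        // The span rule only applies to concurrent consumption.
        if (!orderly && !cached.isEmpty()
                && cached.lastKey() - cached.firstKey() > consumeConcurrentlyMaxSpan) {
            return DELAY_WHEN_FLOW_CONTROL;
        }
        return 0;
    }
}
```

Every rejection except DROP is a delay, never an error. The request is parked and retried, which is what keeps the pull loop alive under back-pressure.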
- Ordered messages
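For ordered consumption, the processQueue must already hold the lock on the Broker. On the first pull after the lock is acquired, the consumption offset is recomputed. If the lock is not held, or the offset cannot be computed, the request goes back into the blocking queue with a 3-second delay: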
```java
if (processQueue.isLocked()) {
    if (!pullRequest.isPreviouslyLocked()) {
        long offset = -1L;
        try {
            // Recompute the consumption offset
            offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
            if (offset < 0) {
                throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
            }
        } catch (Exception e) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
            return;
        }
        boolean brokerBusy = offset < pullRequest.getNextOffset();
        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
            pullRequest, offset, brokerBusy);
        if (brokerBusy) {
            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                pullRequest, offset);
        }

        pullRequest.setPreviouslyLocked(true);
        pullRequest.setNextOffset(offset);
    }
} else {
    // Lock not held: put the request back into the blocking queue after 3 seconds
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
}
```
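Here is a minimal model of the ordered branch. A request may only proceed while the processQueue holds the Broker lock, and only the first pull after locking resets nextOffset from the stored offset. OrderedPullSketch and its Request class are hypothetical. storedOffset stands in for computePullFromWhereWithException(), and a negative value means the computation failed.

```java
// Simplified model of the ordered-consumption branch; not RocketMQ's code.
class OrderedPullSketch {
    static final long DELAY_WHEN_EXCEPTION = 3000;

    static final class Request {
        long nextOffset;
        boolean previouslyLocked;
        Request(long nextOffset) { this.nextOffset = nextOffset; }
    }

    /**
     * Returns 0 if the pull may go ahead, otherwise the retry delay in ms.
     * storedOffset stands in for computePullFromWhereWithException(); negative means it failed.
     */
    static long prepare(Request req, boolean lockedInBroker, long storedOffset) {
        if (!lockedInBroker) {
            return DELAY_WHEN_EXCEPTION;        // lock not held: retry later
        }
        if (!req.previouslyLocked) {
            if (storedOffset < 0) {
                return DELAY_WHEN_EXCEPTION;    // offset could not be computed
            }
            // First pull since the lock was taken: restart from the stored offset.
            req.previouslyLocked = true;
            req.nextOffset = storedOffset;
        }
        return 0;                               // later pulls keep the offset the callback set
    }
}
```

Resetting only once matters: after the first locked pull, nextOffset is advanced by the pull callback, and overwriting it again would re-deliver messages.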
- Checking the topic subscription
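Right before sending, the consumer also checks whether subscriptionData exists for the topic. subscriptionData essentially carries the subscribed tag expression, which is * (all tags) by default. If it is missing, the request again goes back into the blocking queue after 3 seconds: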
```java
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
```
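The subscription check is essentially a map lookup keyed by topic. The sketch below models getSubscriptionInner() as a map from topic to tag expression. The class name, and the normalisation of a null or empty expression to *, are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of getSubscriptionInner(): topic -> tag expression.
class SubscriptionLookupSketch {
    static final long DELAY_WHEN_EXCEPTION = 3000; // same 3 s delay as the other error paths
    static final String SUB_ALL = "*";             // subscribe to every tag

    final Map<String, String> subscriptionInner = new ConcurrentHashMap<>();

    void subscribe(String topic, String tagExpression) {
        // Assumption: a null or empty expression is normalised to "*".
        String expr = (tagExpression == null || tagExpression.isEmpty()) ? SUB_ALL : tagExpression;
        subscriptionInner.put(topic, expr);
    }

    /** Returns 0 if a subscription exists for the topic, otherwise the retry delay in ms. */
    long checkBeforePull(String topic) {
        return subscriptionInner.containsKey(topic) ? 0 : DELAY_WHEN_EXCEPTION;
    }
}
```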
- Building the PullCallback
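The next step builds the PullCallback. We can skip its details for now: it handles the result once messages come back, and it has two methods, onSuccess(PullResult pullResult) and onException(Throwable e). Whether or not the pull succeeds, the pullRequest is put back into the blocking queue, with its nextOffset advanced on success. This is what keeps message pulling going.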
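The re-enqueueing done by the callback can be sketched as follows. Two simplifications are assumed: SketchPullCallback takes raw offsets instead of a PullResult, and re-enqueue events are recorded in a list rather than actually scheduled. The point is that the same request object goes back into the queue. In this sketch it returns immediately with an advanced offset on success, and after a delay with an unchanged offset on failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback shape; the real PullCallback receives a PullResult instead of raw numbers.
interface SketchPullCallback {
    void onSuccess(long nextBeginOffset, int messagesFound);
    void onException(Throwable e);
}

class RequeueOnCallbackSketch {
    static final long DELAY_WHEN_EXCEPTION = 3000;

    static final class Request {
        long nextOffset;
        Request(long nextOffset) { this.nextOffset = nextOffset; }
    }

    /** One "put back into the blocking queue" event: the request, its offset then, and the delay. */
    static final class Requeue {
        final Request request;
        final long offset;
        final long delayMs;
        Requeue(Request request, long offset, long delayMs) {
            this.request = request;
            this.offset = offset;
            this.delayMs = delayMs;
        }
    }

    final List<Requeue> requeued = new ArrayList<>(); // recorded instead of really scheduling

    SketchPullCallback callbackFor(Request req) {
        return new SketchPullCallback() {
            @Override
            public void onSuccess(long nextBeginOffset, int messagesFound) {
                req.nextOffset = nextBeginOffset; // the next pull starts where this one ended
                // (the real client would hand the found messages to the consume service here)
                requeued.add(new Requeue(req, req.nextOffset, 0)); // 0 = put back immediately
            }

            @Override
            public void onException(Throwable e) {
                // offset unchanged: the same range is pulled again after the delay
                requeued.add(new Requeue(req, req.nextOffset, DELAY_WHEN_EXCEPTION));
            }
        };
    }
}
```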
- Sending the pull request
Once all of these checks pass, the last step is to send the pull request:
```java
try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );
} catch (Exception e) {
    log.error("pullKernelImpl exception", e);
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
```
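This code sends the pull request to the Broker asynchronously (CommunicationMode.ASYNC). When the response arrives, pullCallback's onSuccess() or onException() method is called. If sending itself throws, the request is retried after 3 seconds.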
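Asynchronous sending means the send call returns before the Broker replies, and the outcome arrives later through the callback. The sketch below imitates that with CompletableFuture. AsyncPullSketch and the fakeBroker function are hypothetical stand-ins for the remote call, not RocketMQ's networking layer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongUnaryOperator;

// Hypothetical async call: control returns at once and the outcome is delivered
// later to a success or failure handler, in the spirit of CommunicationMode.ASYNC.
class AsyncPullSketch {
    /**
     * fakeBroker stands in for the remote pull: given the request offset it returns
     * the next begin offset, or throws to simulate a failed request.
     * The returned future completes after the matching handler has run.
     */
    static CompletableFuture<Void> pullAsync(long offset, LongUnaryOperator fakeBroker,
                                             LongConsumer onSuccess, Consumer<Throwable> onException) {
        return CompletableFuture
            .supplyAsync(() -> fakeBroker.applyAsLong(offset))
            .handle((next, err) -> {
                if (err != null) {
                    // supplyAsync wraps failures in a CompletionException; unwrap for the handler
                    onException.accept(err.getCause() != null ? err.getCause() : err);
                } else {
                    onSuccess.accept(next);
                }
                return null;
            });
    }
}
```

The caller thread is free as soon as pullAsync returns. This is why the pull service can move on to the next request in the blocking queue while earlier pulls are still in flight.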
Summary
The analysis above shows that DefaultMQPushConsumer relies on two key mechanisms to keep message pulling going:
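1. The rebalance mechanism creates pullRequests and puts them into the blocking queue.
2. Whether a pull succeeds or fails, the pullRequest is put back into the blocking queue.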
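Together, these two mechanisms form a closed loop that keeps messages being pulled without interruption for as long as the client is alive.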
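The two mechanisms in the summary can be demonstrated with a single-threaded toy loop. Everything here is hypothetical: PullLoopSketch, the fake broker map, and the fixed batch size. The real client is asynchronous and multi-threaded. The sketch only shows that requests never leave the loop and that offsets advance only on successful pulls.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Single-threaded toy model of the closed loop: rebalance seeds requests, the pull
// "service" takes one at a time, and both the success and failure paths put it back.
class PullLoopSketch {
    static final class Request {
        final String queue;
        long nextOffset;
        Request(String queue, long nextOffset) { this.queue = queue; this.nextOffset = nextOffset; }
    }

    final Deque<Request> requestQueue = new ArrayDeque<>();   // stands in for messageRequestQueue
    final Map<String, Long> brokerMaxOffset = new HashMap<>(); // fake broker: messages per queue
    final Set<String> unreachable = new HashSet<>();           // queues whose pulls "fail"
    final Map<String, Long> consumed = new HashMap<>();
    final int batchSize = 4;

    // Mechanism 1: rebalancing creates one request per assigned queue.
    void rebalance(Collection<String> queues) {
        for (String q : queues) {
            requestQueue.addLast(new Request(q, 0));
        }
    }

    // One iteration of the pull service loop.
    void step() {
        Request req = requestQueue.pollFirst();
        if (req == null) {
            return;
        }
        // Mechanism 2: every outcome puts the request back.
        if (unreachable.contains(req.queue)) {
            requestQueue.addLast(req); // onException path: retry with the same offset
            return;
        }
        long available = brokerMaxOffset.getOrDefault(req.queue, 0L) - req.nextOffset;
        long found = Math.max(0, Math.min(batchSize, available));
        req.nextOffset += found;                    // onSuccess path: advance the offset...
        consumed.merge(req.queue, found, Long::sum);
        requestQueue.addLast(req);                  // ...and put the same request back
    }
}
```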