开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):客户端发起拉取消息请求】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12496
客户端发起拉取消息请求
PullMessageService 是如何去拉取消息的,首先找一下代码的位置,其中有一行代码是:this.PullMessageService.start();
说明了PullMessageService在 MQClientInstance 启动的时候也会同时启动。PullMessageService 是一个线程,在 start 的时候会调用它的 run 方法。
下面是 PullMessageService 的 run 方法:
其中: this.pullMessage(pullRequest);
中可以看出,run 方法里取出一个消息 (pullMessage) ,同时取出一个拉取消息的请求 (pullRequest),然后进行处理。进入InterruptedException 中:
先拿到当前的消费者,对应代码:final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
然后把这个消费者转换成这种推送的方式,对应代码:DefaultMQPushconsumerImpl impl = (DefaultMQPushConsumerImpl) consumer
然后再去处理当前拉取的请求,对应代码:impl.pullMessage(pullRuquest)。
进入PullMessage
pullMessage 到底是怎么做的。首先先从 pullRequest 当中拿到 processQueue 。
然后看 processQuess 处理的对列是不是被丢弃了,如果被丢弃的话就直接去返回,就不需要再去进行拉取的处理了。
对应代码:if(processQueue.isDropped()){
1og.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
去设置一个时间戳,如果当前消费者是被挂起来的:
If(this.isPause()){
log.warn(" consumer was paused,execute pull request later.instanceName={},
group={}" ,this.defaultMQPushConsumer .getInstanceName
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
那么它默认会等待多长时间?进行以下操作:
点击进入PULL_TIME_DELAY_MILLS_WHEN_SUSPEND后会有这样的一串代码:
private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
这行代码表示我们需要等待一秒钟才能再一次去执行。
上图所示,从 processQueue 当中去拿到当前已经缓存的消息的数量 (cacheMessageCount) ,以及拿到当前消息的大小 (processQueue.getMsgSize().get())。
流控的手段有两种。第一种是如果消息的数量大于1000那么当前也不需要再重复的去进行拉取的处理。如果消息的大小大于100兆也不需要再去进行了拉取的处理。
if (cachedMessagecount >
this.defaultMQPushConsumer.getPullThresholdForQueue ()){
this. executePullRequestLater (pullRequest, PULL_ TIME_DELAY _MILLS_ WHEN_FLOW_CONTROL);
if ((queueFlowcontrolTimes++ %1000) == 0){
log.warn(
Var1:
"the cached message count exceeds the threshold {}, so do flow control, minoffset={}, maxOffset={}, count
this.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.get
}
return;
}
以上代码是在做按照数量的流控(pullThresholdForQueue),
点击getPullThresholdForQueue进入:
会显示:private int pullThresholdForQueue = 1000;
所以它是按照数量的1000条去判断的。
下面是按照消息大小的兆数判断的代码:
if (cachedMessageSizeInMiB >
this.defaultMQPushConsumer.getPullThresholdForQueue ()){
this. executePullRequestLater (pullRequest, PULL_ TIME_DELAY _MILLS_ WHEN_FLOW_CONTROL);
if ((queueFlowcontrolTimes++ %1000) == 0){
log.warn(
Var1:
"the cached message count exceeds the threshold {}MiB, so do flow control, maxoffset={},
this.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.
}
return;
}
重复之前的操作得到:
private int pullThresholdSizeForQueue = 100;
得出当前消息大小是100兆,如果是100兆就不会拉取。
如果现在消息的数量比较小的话,就可以重复的拉取了。
那如何去拉取?
如上图,当前订阅信息(subcriptionData),判断这个订阅信息是否为空?对应代码: if(null ==subscriptionData) 。
如果为空的话,我们看它点的属性,还是按之前的操作,有:
private static final long
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
所以订阅信息为空,就会在三秒之后再次去拿订阅的信息。
因为只有拿到订阅信息之后,才知道到底哪一个地方拉取。
首先要会构建一个消息拉取的系统的标记(buildSysFlag) 。标识
一下当前的是哪一个节点去发起的一个拉取消息的一个请求。
代码如下:
通过那客户端(pullKernelImpl)直接去请求 Broker 去拉取这个消息。
如何去拉取?
首先找到 Broker 的地址。进入 pullKernelImpl 中:
通过 findBrokerAddressInsubscribe,也就是从订阅信息中找到这个 Broker,然后拿到Broker 的地址findBorkerResult,拿到地址之后就可以去拉取对应的这个数据。
上图代码的作用是封装拉取的请求。
然后要取出 broker 地址 (brokerAddr) ,代码是:
string brokerAddr = findBrokerResult.getBrolkerAddr();
if(PullSysFlag.hasCLassFiLterFLog(sysFlagInner)){
brokerAddr = computPullFromwhichFilterServer(mq.getTopic(),brokerAddr);
}
然后 brokerAddr 通过 mQClientFactory 去拉取数据,从这儿发起一个请求到这个服务端(pullResult),然后让服务端把对应的数据返回回来。
代码:
PullResult pullResult = this.mQClientFactory.getMQClientAPImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationmode,
pullCallback);
return pullResult;
}
这就是消息拉取的整个过程。最核心的类是MessageService。