客户端发起拉取消息请求|学习笔记

简介: 快速学习客户端发起拉取消息请求

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)客户端发起拉取消息请求】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12496


客户端发起拉取消息请求

 

PullMessageService 是如何去拉取消息的,首先找一下代码的位置,其中有一行代码是:this.PullMessageService.start();说明了PullMessageService在 MQClientInstance 启动的时候也会同时启动。PullMessageService 是一个线程,在 start 的时候会调用它的 run 方法。

下面是 PullMessageService 的 run 方法:
image.png其中: this.pullMessage(pullRequest); 中可以看出,run 方法里取出一个消息 (pullMessage) ,同时取出一个拉取消息的请求 (pullRequest),然后进行处理。进入InterruptedException 中:

先拿到当前的消费者,对应代码:final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

然后把这个消费者转换成这种推送的方式,对应代码:DefaultMQPushconsumerImpl impl = (DefaultMQPushConsumerImpl) consumer

然后再去处理当前拉取的请求,对应代码:impl.pullMessage(pullRuquest)。

进入PullMessage

pullMessage 到底是怎么做的。首先先从 pullRequest 当中拿到 processQueue 。

然后看 processQuess 处理的对列是不是被丢弃了,如果被丢弃的话就直接去返回,就不需要再去进行拉取的处理了。

对应代码:if(processQueue.isDropped()){

1og.info("the pull request[{}] is dropped.", pullRequest.toString());

return;

}

去设置一个时间戳,如果当前消费者是被挂起来的:

If(this.isPause()){

log.warn(" consumer was paused,execute pull request later.instanceName={},

group={}" ,this.defaultMQPushConsumer .getInstanceName

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);

return;

那么它默认会等待多长时间?进行以下操作:

点击进入PULL_TIME_DELAY_MILLS_WHEN_SUSPEND后会有这样的一串代码:

private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;

这行代码表示我们需要等待一秒钟才能再一次去执行。

image.png

上图所示,从 processQueue 当中去拿到当前已经缓存的消息的数量 (cacheMessageCount) ,以及拿到当前消息的大小 (processQueue.getMsgSize().get())。

流控的手段有两种。第一种是如果消息的数量大于1000那么当前也不需要再重复的去进行拉取的处理。如果消息的大小大于100兆也不需要再去进行了拉取的处理。

image.png

if (cachedMessagecount >

this.defaultMQPushConsumer.getPullThresholdForQueue ()){

this. executePullRequestLater (pullRequest, PULL_ TIME_DELAY _MILLS_ WHEN_FLOW_CONTROL);

if ((queueFlowcontrolTimes++ %1000) == 0){

log.warn(

Var1:

"the cached message count exceeds the threshold {}, so do flow control, minoffset={}, maxOffset={}, countthis.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.get

}

return;

}

以上代码是在做按照数量的流控(pullThresholdForQueue),

点击getPullThresholdForQueue进入:

会显示:private int pullThresholdForQueue = 1000;

所以它是按照数量的1000条去判断的。

下面是按照消息大小的兆数判断的代码:

if (cachedMessageSizeInMiB >

this.defaultMQPushConsumer.getPullThresholdForQueue ()){

this. executePullRequestLater (pullRequest, PULL_ TIME_DELAY _MILLS_ WHEN_FLOW_CONTROL);

if ((queueFlowcontrolTimes++ %1000) == 0){

log.warn(

Var1:

"the cached message count exceeds the threshold {}MiB, so do flow control, maxoffset={},

this.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.

}

return;

}

重复之前的操作得到:

private int pullThresholdSizeForQueue = 100;

得出当前消息大小是100兆,如果是100兆就不会拉取。

如果现在消息的数量比较小的话,就可以重复的拉取了。

那如何去拉取?

image.png

如上图,当前订阅信息(subcriptionData),判断这个订阅信息是否为空?对应代码: if(null ==subscriptionData) 。

如果为空的话,我们看它点的属性,还是按之前的操作,有:

private static final long

PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;

所以订阅信息为空,就会在三秒之后再次去拿订阅的信息。

因为只有拿到订阅信息之后,才知道到底哪一个地方拉取。

首先要会构建一个消息拉取的系统的标记(buildSysFlag) 。标识

一下当前的是哪一个节点去发起的一个拉取消息的一个请求。

代码如下:

image.png通过那客户端(pullKernelImpl)直接去请求 Broker 去拉取这个消息。

如何去拉取?

首先找到 Broker 的地址。进入 pullKernelImpl 中:

通过 findBrokerAddressInsubscribe,也就是从订阅信息中找到这个 Broker,然后拿到Broker 的地址findBorkerResult,拿到地址之后就可以去拉取对应的这个数据。

image.png

上图代码的作用是封装拉取的请求。

然后要取出 broker 地址 (brokerAddr) ,代码是:

string brokerAddr = findBrokerResult.getBrolkerAddr();

if(PullSysFlag.hasCLassFiLterFLog(sysFlagInner)){

brokerAddr = computPullFromwhichFilterServer(mq.getTopic(),brokerAddr);

}

然后 brokerAddr 通过 mQClientFactory 去拉取数据,从这儿发起一个请求到这个服务端(pullResult),然后让服务端把对应的数据返回回来。

代码:

PullResult pullResult = this.mQClientFactory.getMQClientAPImpl().pullMessage(

brokerAddr,

requestHeader,

timeoutMillis,

communicationmode,

pullCallback);

return pullResult;

}

这就是消息拉取的整个过程。最核心的类是MessageService。

相关文章
|
2月前
发送同步请求模块
发送同步请求模块
24 1
|
Web App开发 缓存 网络协议
如何实现服务端向客户端推送数据
常见的http协议只能从客户端主动向服务端请求数据,而服务端无法向客户端发送数据.本文通过介绍几种方式来实现上述功能.
|
2月前
|
存储 JSON 监控
源码分析Zabbix客户端如何向服务端发起请求
源码分析Zabbix客户端如何向服务端发起请求
41 2
|
7月前
|
网络协议 Cloud Native
为什么服务端会有那么多的 TimeWait ?
为什么服务端会有那么多的 TimeWait ?
|
10月前
|
存储 缓存 关系型数据库
1.6 服务器处理客户端请求
1.6 服务器处理客户端请求
51 0
|
消息中间件 RocketMQ 开发者
消息拉取客户端处理服务端相应|学习笔记
快速学习消息拉取客户端处理服务端相应
95 0
消息拉取客户端处理服务端相应|学习笔记
|
消息中间件 RocketMQ 开发者
拉取消息长轮询机制|学习笔记
快速学习拉取消息长轮询机制
256 0
拉取消息长轮询机制|学习笔记
|
消息中间件 RocketMQ 开发者
发送同步消息|学习笔记
快速学习发送同步消息
81 0
发送同步消息|学习笔记
|
移动开发 网络协议 测试技术
服务器循环接收客户端消息|学习笔记
快速学习服务器循环接收客户端消息
117 0
服务器循环接收客户端消息|学习笔记
|
运维 Java 数据库连接
排除法,先找客户端问题,再找服务端问题
先找客户端问题,再找服务端问题
62 0