客户端发起拉取消息请求|学习笔记

简介: 快速学习客户端发起拉取消息请求

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)客户端发起拉取消息请求】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12496


客户端发起拉取消息请求

 

PullMessageService 是如何去拉取消息的,首先找一下代码的位置,其中有一行代码是:this.PullMessageService.start();说明了PullMessageService在 MQClientInstance 启动的时候也会同时启动。PullMessageService 是一个线程,在 start 的时候会调用它的 run 方法。

下面是 PullMessageService 的 run 方法:
image.png其中: this.pullMessage(pullRequest); 中可以看出,run 方法里取出一个消息 (pullMessage) ,同时取出一个拉取消息的请求 (pullRequest),然后进行处理。进入InterruptedException 中:

先拿到当前的消费者,对应代码:final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

然后把这个消费者转换成这种推送的方式,对应代码:DefaultMQPushconsumerImpl impl = (DefaultMQPushConsumerImpl) consumer

然后再去处理当前拉取的请求,对应代码:impl.pullMessage(pullRuquest)。

进入PullMessage

pullMessage 到底是怎么做的。首先先从 pullRequest 当中拿到 processQueue 。

然后看 processQuess 处理的对列是不是被丢弃了,如果被丢弃的话就直接去返回,就不需要再去进行拉取的处理了。

对应代码:if(processQueue.isDropped()){

1og.info("the pull request[{}] is dropped.", pullRequest.toString());

return;

}

去设置一个时间戳,如果当前消费者是被挂起来的:

If(this.isPause()){

log.warn(" consumer was paused,execute pull request later.instanceName={},

group={}" ,this.defaultMQPushConsumer .getInstanceName

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);

return;

那么它默认会等待多长时间?进行以下操作:

点击进入PULL_TIME_DELAY_MILLS_WHEN_SUSPEND后会有这样的一串代码:

private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;

这行代码表示我们需要等待一秒钟才能再一次去执行。

image.png

上图所示,从 processQueue 当中去拿到当前已经缓存的消息的数量 (cacheMessageCount) ,以及拿到当前消息的大小 (processQueue.getMsgSize().get())。

流控的手段有两种。第一种是如果消息的数量大于1000那么当前也不需要再重复的去进行拉取的处理。如果消息的大小大于100兆也不需要再去进行了拉取的处理。

image.png

if (cachedMessagecount >

this.defaultMQPushConsumer.getPullThresholdForQueue ()){

this. executePullRequestLater (pullRequest, PULL_ TIME_DELAY _MILLS_ WHEN_FLOW_CONTROL);

if ((queueFlowcontrolTimes++ %1000) == 0){

log.warn(

Var1:

"the cached message count exceeds the threshold {}, so do flow control, minoffset={}, maxOffset={}, countthis.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.get

}

return;

}

以上代码是在做按照数量的流控(pullThresholdForQueue),

点击getPullThresholdForQueue进入:

会显示:private int pullThresholdForQueue = 1000;

所以它是按照数量的1000条去判断的。

下面是按照消息大小的兆数判断的代码:

if (cachedMessageSizeInMiB >

this.defaultMQPushConsumer.getPullThresholdForQueue ()){

this. executePullRequestLater (pullRequest, PULL_ TIME_DELAY _MILLS_ WHEN_FLOW_CONTROL);

if ((queueFlowcontrolTimes++ %1000) == 0){

log.warn(

Var1:

"the cached message count exceeds the threshold {}MiB, so do flow control, maxoffset={},

this.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.

}

return;

}

重复之前的操作得到:

private int pullThresholdSizeForQueue = 100;

得出当前消息大小是100兆,如果是100兆就不会拉取。

如果现在消息的数量比较小的话,就可以重复的拉取了。

那如何去拉取?

image.png

如上图,当前订阅信息(subcriptionData),判断这个订阅信息是否为空?对应代码: if(null ==subscriptionData) 。

如果为空的话,我们看它点的属性,还是按之前的操作,有:

private static final long

PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;

所以订阅信息为空,就会在三秒之后再次去拿订阅的信息。

因为只有拿到订阅信息之后,才知道到底哪一个地方拉取。

首先要会构建一个消息拉取的系统的标记(buildSysFlag) 。标识

一下当前的是哪一个节点去发起的一个拉取消息的一个请求。

代码如下:

image.png通过那客户端(pullKernelImpl)直接去请求 Broker 去拉取这个消息。

如何去拉取?

首先找到 Broker 的地址。进入 pullKernelImpl 中:

通过 findBrokerAddressInsubscribe,也就是从订阅信息中找到这个 Broker,然后拿到Broker 的地址findBorkerResult,拿到地址之后就可以去拉取对应的这个数据。

image.png

上图代码的作用是封装拉取的请求。

然后要取出 broker 地址 (brokerAddr) ,代码是:

string brokerAddr = findBrokerResult.getBrolkerAddr();

if(PullSysFlag.hasCLassFiLterFLog(sysFlagInner)){

brokerAddr = computPullFromwhichFilterServer(mq.getTopic(),brokerAddr);

}

然后 brokerAddr 通过 mQClientFactory 去拉取数据,从这儿发起一个请求到这个服务端(pullResult),然后让服务端把对应的数据返回回来。

代码:

PullResult pullResult = this.mQClientFactory.getMQClientAPImpl().pullMessage(

brokerAddr,

requestHeader,

timeoutMillis,

communicationmode,

pullCallback);

return pullResult;

}

这就是消息拉取的整个过程。最核心的类是MessageService。

相关文章
|
自然语言处理 数据可视化 物联网
Qwen1.5-MoE开源,魔搭社区推理训练最佳实践教程来啦
通义千问团队推出Qwen系列的首个MoE模型,Qwen1.5-MoE-A2.7B。
|
SQL 分布式计算 NoSQL
【SQL 审核查询平台】Archery使用介绍
【SQL 审核查询平台】Archery使用介绍
851 0
【SQL 审核查询平台】Archery使用介绍
|
缓存 SpringCloudAlibaba NoSQL
Spring Boot多级缓存实现方案
整合redis和caffeine实现多级缓存,解决上面单一缓存的痛点,从而做到相互补足
978 0
|
JavaScript
vue element plus Radio 单选框
vue element plus Radio 单选框
468 0
|
弹性计算 人工智能 测试技术
阿里云服务器收费标准、价格计算器和活动报价查询
阿里云服务器收费标准、价格计算器和活动报价查询,阿里云轻量应用服务器2核2G3M带宽轻量服务器一年108元,2核4G4M带宽轻量服务器一年297.98元12个月;ECS云服务器e系列2核2G配置182元一年、2核4G配置365元一年、2核8G配置522元一年
377 0
|
网络协议 网络安全
防火墙nat豁免与控制原理讲解
防火墙nat豁免与控制原理讲解
700 0
|
5天前
|
存储 弹性计算 人工智能
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
2025年9月24日,阿里云弹性计算团队多位产品、技术专家及服务器团队技术专家共同在【2025云栖大会】现场带来了《通用计算产品发布与行业实践》的专场论坛,本论坛聚焦弹性计算多款通用算力产品发布。同时,ECS云服务器安全能力、资源售卖模式、计算AI助手等用户体验关键环节也宣布升级,让用云更简单、更智能。海尔三翼鸟云服务负责人刘建锋先生作为特邀嘉宾,莅临现场分享了关于阿里云ECS g9i推动AIoT平台的场景落地实践。
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
|
4天前
|
云安全 人工智能 自然语言处理
阿里云x硅基流动:AI安全护栏助力构建可信模型生态
阿里云AI安全护栏:大模型的“智能过滤系统”。
|
4天前
|
人工智能 自然语言处理 自动驾驶
关于举办首届全国大学生“启真问智”人工智能模型&智能体大赛决赛的通知
关于举办首届全国大学生“启真问智”人工智能模型&智能体大赛决赛的通知