Broker 组装消息|学习笔记

简介: 快速学习 Broker 组装消息

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)Broker 组装消息】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12497


Broker 组装消息


Broker 拉取消息过程介绍

Broker 怎么去处理拉取消息的请求?

Broker 有一个类,通过下图中,可以看到,broker当中有一个 processor 的包,

image.pngimage.png

包里有一个叫做 PullMessageProcessor 类。通过它去进行拉取请求的处理。

if (getMessageResult != null) {

response.setRemark(getMessageResult.getstatus().name() );responseHeader.setNextBeginoffset(getMessageResult.getNextBeginoffset());

responseHeader.setMinoffset(getMessageResult.getMinoffset();ResponseHeader.setMaxoffset(getiMessageResult.getMaxOffset();

点击 setMaxoffset ,会显示以下页面:

image.png查找能找到两个 processRequest。进入第二个里面。

看到下面所示代码:

public RemotingCommand processRequest(final ChannelHandlerContext ctx,

Remotingcommand request) throws RemotingCommandException {

return this.processpequest(ctx.channel(),request,brokerAllowSuspend: true);

}

之后发现它调用了一个私有的方法 processRequest。那么进入这个方法,

主要做的事情有:第一个,构建消息过滤器。哪个消息需要被消费哪个消息不需要消费,是通过这个过滤器的,也就是进行一个过滤的处理。构建过滤器的代码如下:

MessageFilter messageF1lter;

if(this.brokerController.getBrokerConfig( ).isFiltersupportRetry()){

messageFilter = new ExpressionForRetryMessageFilter( subscriptionData,consumerFilterData,

this.brokerController.getconsumerFilterManager());

}else {

messageFilter = new ExpressionMessageFilter(subscriptionData,consumerFilterData,

this.brokerController.getconsumerFilterManager());

}

构建过滤器上面的那些比较长的代码都在做一些检查的工作。messageFilter 就是在构建消息的过滤器。

紧接着去查找消息。消息在 DefaultMessageStore当中存储。所以去找 Messagestore去获得消息,代码是:

final GetMessageResult getMessageResult =

this.brokerController.getMessageStore( ).getMessage(requestHeader.getconsumerGroup(),requestHeader .getTopic(),

requestHeader . getQueueId(),requestHeader .getQueueOffset (),requestHeader . getMaxMsgNums ( ), messageFilter);

然后把消息拿过来之后,开始构建响应头,构建响应的相关信息。

if (getMessageResult != null){

response.setRemark(getMessageResult.getstatus( ).name();responseHeader.setNextBeginOffset(getMessageResult.getNextBeginoffset());responseHeader.setMinoffset(getMessageResult.getMinoffset());

responseHeader .setMaxoffset(getMessageResult.getMaxoffset();

然后有一个判断代码如下:

if (getMessageResult.isSuggestPullingFromSlave()){responseHeader.setSuggestwhichBrokerId(subscriptionGroupConfig.getwhichBrokerwhenConsumeslowly());

}else {

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}

如果当前的拉取是从这个从节点拉取,它在这儿做了一个处理,就是如果从节点拉的比较慢,那下一次就会建议从主节点拉,因为主从之间的同步,如果延迟,那么为了拉取效率,下一次呢就会让从主节点去进行一个拉取。

消息拉取完了之后,对于服务端来讲,要去更新一下当前的这个拉取的一个进度。

bollean storeOffsetEnable = brokerAllowSuspend;

storeOffsetEnable = storeOffEnable && hasCommitOffsetFlag;

storeOffsetEnable = storeOffEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() ! = BrokerRole.SLAVE;

if (storeOffEnable){this.brokerController.getConsumerOffsetManager().commmitOffset(remotingHelper.parseChannelReoteAddr(channel),Request.getConsumerGroup(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getCommitOffset

}

return response;

}

如上所示代码,去更新当前拉取的一个进度,把这个进度通过 commits 进行存储,之后把 response 进行返回。以上是消息服务端Broker组装消息去处理当前拉取消息过程。核心类是PullMessageProcessor。

相关文章
|
11月前
|
监控 测试技术 持续交付
自动化和持续集成在软件开发中各自扮演什么角色
自动化和持续集成在软件开发中各自扮演什么角色
|
6月前
|
Java Linux API
课时3:Java简介(Java主要特点)
本文介绍了Java的主要特点及其运行机制。Java结合了编译型和解释型语言的优点,通过Java虚拟机(JVM)实现跨平台移植,简化了不同操作系统间的开发流程。Java的特点包括可移植性、简单易用、支持多线程编程、自动垃圾收集和面向对象编程。随着硬件技术的发展,Java的性能问题已大大改善,成为行业标准之一,广泛应用于各种商用平台开发。
225 1
|
11月前
|
SQL 数据库
数据审计 -本福德定律 Benford‘s law (sample database classicmodels _No.6)
数据审计 -本福德定律 Benford‘s law (sample database classicmodels _No.6)
203 1
学生0元购|低配也能畅玩!《黑神话:悟空》云电脑攻略
《黑神话:悟空》正式上市,这款备受期待的游戏对电脑配置要求不低,但通过云电脑,你无需担心硬件限制,随时随地畅玩大作。最低仅需1.2元/小时,还能利用学生福利免费畅玩。快速上手教程与省钱攻略,助你轻松征服《黑神话:悟空》!
665 8
学生0元购|低配也能畅玩!《黑神话:悟空》云电脑攻略
|
开发框架 前端开发 C#
Sublime Text 3配置 C# 开发环境
【5月更文挑战第2天】本篇 Huazie 介绍了 Sublime Text 3 配置 C# 的相关内容,感兴趣的朋友赶紧配置起来,有任何问题可以随时评论区沟通。
63712 7
Sublime Text 3配置 C# 开发环境
|
移动开发 监控 Java
如何使用Java中的WebSocket?
如何使用Java中的WebSocket?
|
网络协议 网络架构 C++
【计算机网络概述】第一章:概论 1.1什么是Internet
【计算机网络概述】第一章:概论 1.1什么是Internet
116 2
|
开发工具 数据安全/隐私保护 git
码云——vscode多人协同开发
码云——vscode多人协同开发
347 0
|
机器学习/深度学习 决策智能
**批量归一化(BN)**是2015年提出的深度学习优化技术,旨在解决**内部协变量偏移**和**梯度问题**。
【6月更文挑战第28天】**批量归一化(BN)**是2015年提出的深度学习优化技术,旨在解决**内部协变量偏移**和**梯度问题**。BN通过在每个小批量上执行**标准化**,然后应用学习到的γ和β参数,确保层间输入稳定性,加速训练,减少对超参数的敏感性,并作为隐含的正则化手段对抗过拟合。这提升了模型训练速度和性能,简化了初始化。
173 0
|
Java Spring
精通 Spring Boot 系列 11
精通 Spring Boot 系列 11
65 0