Broker 组装消息|学习笔记

简介: 快速学习 Broker 组装消息

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)Broker 组装消息】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12497


Broker 组装消息


Broker 拉取消息过程介绍

Broker 怎么去处理拉取消息的请求?

Broker 有一个类,通过下图中,可以看到,broker当中有一个 processor 的包,

image.pngimage.png

包里有一个叫做 PullMessageProcessor 类。通过它去进行拉取请求的处理。

if (getMessageResult != null) {

response.setRemark(getMessageResult.getstatus().name() );responseHeader.setNextBeginoffset(getMessageResult.getNextBeginoffset());

responseHeader.setMinoffset(getMessageResult.getMinoffset();ResponseHeader.setMaxoffset(getiMessageResult.getMaxOffset();

点击 setMaxoffset ,会显示以下页面:

image.png查找能找到两个 processRequest。进入第二个里面。

看到下面所示代码:

public RemotingCommand processRequest(final ChannelHandlerContext ctx,

Remotingcommand request) throws RemotingCommandException {

return this.processpequest(ctx.channel(),request,brokerAllowSuspend: true);

}

之后发现它调用了一个私有的方法 processRequest。那么进入这个方法,

主要做的事情有:第一个,构建消息过滤器。哪个消息需要被消费哪个消息不需要消费,是通过这个过滤器的,也就是进行一个过滤的处理。构建过滤器的代码如下:

MessageFilter messageF1lter;

if(this.brokerController.getBrokerConfig( ).isFiltersupportRetry()){

messageFilter = new ExpressionForRetryMessageFilter( subscriptionData,consumerFilterData,

this.brokerController.getconsumerFilterManager());

}else {

messageFilter = new ExpressionMessageFilter(subscriptionData,consumerFilterData,

this.brokerController.getconsumerFilterManager());

}

构建过滤器上面的那些比较长的代码都在做一些检查的工作。messageFilter 就是在构建消息的过滤器。

紧接着去查找消息。消息在 DefaultMessageStore当中存储。所以去找 Messagestore去获得消息,代码是:

final GetMessageResult getMessageResult =

this.brokerController.getMessageStore( ).getMessage(requestHeader.getconsumerGroup(),requestHeader .getTopic(),

requestHeader . getQueueId(),requestHeader .getQueueOffset (),requestHeader . getMaxMsgNums ( ), messageFilter);

然后把消息拿过来之后,开始构建响应头,构建响应的相关信息。

if (getMessageResult != null){

response.setRemark(getMessageResult.getstatus( ).name();responseHeader.setNextBeginOffset(getMessageResult.getNextBeginoffset());responseHeader.setMinoffset(getMessageResult.getMinoffset());

responseHeader .setMaxoffset(getMessageResult.getMaxoffset();

然后有一个判断代码如下:

if (getMessageResult.isSuggestPullingFromSlave()){responseHeader.setSuggestwhichBrokerId(subscriptionGroupConfig.getwhichBrokerwhenConsumeslowly());

}else {

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}

如果当前的拉取是从这个从节点拉取,它在这儿做了一个处理,就是如果从节点拉的比较慢,那下一次就会建议从主节点拉,因为主从之间的同步,如果延迟,那么为了拉取效率,下一次呢就会让从主节点去进行一个拉取。

消息拉取完了之后,对于服务端来讲,要去更新一下当前的这个拉取的一个进度。

bollean storeOffsetEnable = brokerAllowSuspend;

storeOffsetEnable = storeOffEnable && hasCommitOffsetFlag;

storeOffsetEnable = storeOffEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() ! = BrokerRole.SLAVE;

if (storeOffEnable){this.brokerController.getConsumerOffsetManager().commmitOffset(remotingHelper.parseChannelReoteAddr(channel),Request.getConsumerGroup(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getCommitOffset

}

return response;

}

如上所示代码,去更新当前拉取的一个进度,把这个进度通过 commits 进行存储,之后把 response 进行返回。以上是消息服务端Broker组装消息去处理当前拉取消息过程。核心类是PullMessageProcessor。

相关文章
|
2月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
181 4
|
7月前
|
消息中间件 RocketMQ
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
207 0
|
4月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
9月前
|
消息中间件
RabbitMQ如何支持事务性消息的发送和接收
RabbitMQ消息的发送和接收
137 0
|
7月前
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
45 0
|
10月前
|
消息中间件 负载均衡 Kafka
Kafka如何实现点对点消息和发布订阅消息?
Kafka 可以同时支持点对点消息和发布订阅消息模型
565 0
|
消息中间件 存储 Apache
解析 RocketMQ 业务消息--“顺序消息”
本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。
240 0
解析 RocketMQ  业务消息--“顺序消息”
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
存储 消息中间件 RocketMQ
RocketMQ学习Broker流程、生产者和存储流程联系
放入消息之后,进行操作体现在asyncSendMessage中。将消息以异步方式存储到存储器中,处理器可以处理下一个请求,而不是在结果完成后等待结果,以异步方式通知客户端。此时可以看到asyncPutMessage的操作中会进入到CommitLog中,此时进行提交日志操作,此时会执行写入到ByteBuffer中,然后刷盘到硬盘中。同时执行统计操作,进行HA同步。
119 0
RocketMQ学习Broker流程、生产者和存储流程联系
|
消息中间件 缓存 网络协议
多图详解kafka生产者消息发送过程
今天我们来通过源码来分析一下,生产者发送一条消息的所有流程~~~ 生产者客户端代码
多图详解kafka生产者消息发送过程

热门文章

最新文章