开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):Broker 组装消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12497
Broker 组装消息
Broker 拉取消息过程介绍
Broker 怎么去处理拉取消息的请求?
Broker 有一个类,通过下图中,可以看到,broker当中有一个 processor 的包,
包里有一个叫做 PullMessageProcessor 类。通过它去进行拉取请求的处理。
if (getMessageResult != null) {
response.setRemark(getMessageResult.getstatus().name() );
responseHeader.setNextBeginoffset(getMessageResult.getNextBeginoffset());
responseHeader.setMinoffset(getMessageResult.getMinoffset();
ResponseHeader.setMaxoffset(getiMessageResult.getMaxOffset();
点击 setMaxoffset ,会显示以下页面:
查找能找到两个 processRequest。进入第二个里面。
看到下面所示代码:
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
Remotingcommand request) throws RemotingCommandException {
return this.processpequest(ctx.channel(),request,brokerAllowSuspend: true);
}
之后发现它调用了一个私有的方法 processRequest。那么进入这个方法,
主要做的事情有:第一个,构建消息过滤器。哪个消息需要被消费哪个消息不需要消费,是通过这个过滤器的,也就是进行一个过滤的处理。构建过滤器的代码如下:
MessageFilter messageF1lter;
if(this.brokerController.getBrokerConfig( ).isFiltersupportRetry()){
messageFilter = new ExpressionForRetryMessageFilter( subscriptionData,consumerFilterData,
this.brokerController.getconsumerFilterManager());
}else {
messageFilter = new ExpressionMessageFilter(subscriptionData,consumerFilterData,
this.brokerController.getconsumerFilterManager());
}
构建过滤器上面的那些比较长的代码都在做一些检查的工作。messageFilter 就是在构建消息的过滤器。
紧接着去查找消息。消息在 DefaultMessageStore当中存储。所以去找 Messagestore去获得消息,代码是:
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore( ).getMessage(requestHeader.getconsumerGroup(),requestHeader .getTopic(),
requestHeader . getQueueId(),requestHeader .getQueueOffset (),requestHeader . getMaxMsgNums ( ), messageFilter);
然后把消息拿过来之后,开始构建响应头,构建响应的相关信息。
if (getMessageResult != null){
response.setRemark(getMessageResult.getstatus( ).name();
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginoffset());
responseHeader.setMinoffset(getMessageResult.getMinoffset());
responseHeader .setMaxoffset(getMessageResult.getMaxoffset();
然后有一个判断代码如下:
if (getMessageResult.isSuggestPullingFromSlave()){
responseHeader.setSuggestwhichBrokerId(subscriptionGroupConfig.getwhichBrokerwhenConsumeslowly());
}else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}
如果当前的拉取是从这个从节点拉取,它在这儿做了一个处理,就是如果从节点拉的比较慢,那下一次就会建议从主节点拉,因为主从之间的同步,如果延迟,那么为了拉取效率,下一次呢就会让从主节点去进行一个拉取。
消息拉取完了之后,对于服务端来讲,要去更新一下当前的这个拉取的一个进度。
bollean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() ! = BrokerRole.SLAVE;
if (storeOffEnable){
this.brokerController.getConsumerOffsetManager().commmitOffset(remotingHelper.parseChannelReoteAddr(channel),
Request.getConsumerGroup(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getCommitOffset
}
return response;
}
如上所示代码,去更新当前拉取的一个进度,把这个进度通过 commits 进行存储,之后把 response 进行返回。以上是消息服务端Broker组装消息去处理当前拉取消息过程。核心类是PullMessageProcessor。