开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息拉取客户端处理服务端相应】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12498
消息拉取客户端处理服务端相应
消息拉取客户端处理消息过程
在消息服务端把消息组装好了之后响应给客户端,客户端要怎么去处理?
客户端的处理的入口在 pullKernelImpl ,从客户端发起请求的位置请求 broker ,然后点击进入 pullKernelImpl:
PullResult pullResult = this.mQClientFactory.getMQClientAPIImp1().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullcallback );
return pullResult;}
通过 mQClientinstance 去发起一个请求。发起请求之后,点击pullMessage进入,
case ASYNC:
this.pullMessageAsync(addr,request,timeoutMillis
pullcallback);
reture null;
case SYNC:
return this.pullMessesync(addr,request, tineoutMillis);
从上面的代码发现,ASYNS 是异步的,SYNS 是同步的,同步的线程会带点阻塞,异步会直接返回 null 。
进入同步:
private PullResult pu11Messagesync(
final string addr,
final RemotingCommand request,
final long timeoutMillis
) throws RemotingException,InterruptedException,MQBrokerException {
RemotingCommand response = this.remotingClient.invokeSync(addr,request,timeoutMillis);
assert response != null;
return this.processPullResponse(response);
}
在这里会得到当前的响应。得到响应之后, processPullResponse 就开始去处理拉取请求的响应信息。
进入 processPullResponse :
发现,首先解析了响应头(respinseHeader)。然后封装了一个 PullResultExt 这样的一个结果类。
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) respanse.decodeCommandCustomHeader(PullMessageResponseHeader.class);
// 解析响应头
return new PullResultExt(pullstatus,responseHeader.getNextBeginoffset(),responseHeader.getMinoffset(),
responseHeader.getMaxoffset(),msgFoundList null,responseHeader.getSuggestWhichBrokerId(),response.getBody());}
// 封装
然后返回,返回到哪里?
在发起请求的时候不管是同步还是异步,里面都会有一个 PullCallback 。
在PullCallback当中,会去做响应的具体的处理,进去PullCallback当中,进行以下操作:
首先去 case 一下 PullResult .getPullstatus ,发现如果当前是找到(FOUND),
boolean dispatchToConsume = processQueue . putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consuneMessageService.submitConsumeRequest(pullResult.getMsgFoundiList(),processQueue,pullRequest.getMessageQueue(),dispatchToconsume);
主要是将当前的这个消息拿出来之后放到 processQueue 当中。因为消息拉取和处理是解耦的,是分开去做的,所以把它提交到 processQueue 当中,然后再将 processQueue提交到 consumerMessageservice 当中去进行一个具体的处理。
以上内容是消息拉取客户端处理消息全过程。
总结:
首先,客户端 (MQClientAPllpl) 的发起请求之后得到响应,然后它去解析响应码。解析响应码之后,它封装了一个 PullResultEXE 的结果对象,然后调用 PullCallback 回调函数。然后在回调函数主要做了两件事:第一件,取出消息并放到 processQueue 当中。然后再将 processQueue 提交到 PullMessageService当中的 consumeMessageService当中去进行具体的一个处理。这就是消息的客户端处理服务端的响应过程。
通过以上三个步骤,就把整个消息拉取的基本流程介绍完。其实在这个流程当中,只做了三件事,第一件,客户端一定要先发起请求;第二件,服务端处理请求,处理请求作为响应,客户端再去处理服务端它的相应学习。