消息拉取客户端处理服务端相应|学习笔记

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 快速学习消息拉取客户端处理服务端相应

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息拉取客户端处理服务端相应】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12498


消息拉取客户端处理服务端相应

 

消息拉取客户端处理消息过程

在消息服务端把消息组装好了之后响应给客户端,客户端要怎么去处理?

image.png客户端的处理的入口在 pullKernelImpl ,从客户端发起请求的位置请求 broker ,然后点击进入 pullKernelImpl:

PullResult pullResult = this.mQClientFactory.getMQClientAPIImp1().pullMessage(

brokerAddr,

requestHeader,

timeoutMillis,

communicationMode,

pullcallback );

return pullResult;}

通过 mQClientinstance 去发起一个请求。发起请求之后,点击pullMessage进入,

case ASYNC:

this.pullMessageAsync(addr,request,timeoutMillis

pullcallback);

reture null;

case SYNC:

return this.pullMessesync(addr,request, tineoutMillis);

从上面的代码发现,ASYNS 是异步的,SYNS 是同步的,同步的线程会带点阻塞,异步会直接返回 null 。

进入同步:

private PullResult pu11Messagesync(

final string addr,

final RemotingCommand request,

final long timeoutMillis

) throws RemotingException,InterruptedException,MQBrokerException {

RemotingCommand response = this.remotingClient.invokeSync(addr,request,timeoutMillis);

assert response != null;

return this.processPullResponse(response);

}

在这里会得到当前的响应。得到响应之后, processPullResponse 就开始去处理拉取请求的响应信息。

进入 processPullResponse :

发现,首先解析了响应头(respinseHeader)。然后封装了一个 PullResultExt 这样的一个结果类。

PullMessageResponseHeader responseHeader =

(PullMessageResponseHeader) respanse.decodeCommandCustomHeader(PullMessageResponseHeader.class); // 解析响应头

return new PullResultExt(pullstatus,responseHeader.getNextBeginoffset(),responseHeader.getMinoffset(),

responseHeader.getMaxoffset(),msgFoundList null,responseHeader.getSuggestWhichBrokerId(),response.getBody());} // 封装

然后返回,返回到哪里?

在发起请求的时候不管是同步还是异步,里面都会有一个 PullCallback 。

在PullCallback当中,会去做响应的具体的处理,进去PullCallback当中,进行以下操作:

image.png首先去 case 一下 PullResult .getPullstatus ,发现如果当前是找到(FOUND),

boolean dispatchToConsume = processQueue . putMessage(pullResult.getMsgFoundList());DefaultMQPushConsumerImpl.this.consuneMessageService.submitConsumeRequest(pullResult.getMsgFoundiList(),processQueue,pullRequest.getMessageQueue(),dispatchToconsume);

主要是将当前的这个消息拿出来之后放到 processQueue 当中。因为消息拉取和处理是解耦的,是分开去做的,所以把它提交到 processQueue 当中,然后再将 processQueue提交到 consumerMessageservice 当中去进行一个具体的处理。

以上内容是消息拉取客户端处理消息全过程。

总结:

image.png首先,客户端 (MQClientAPllpl) 的发起请求之后得到响应,然后它去解析响应码。解析响应码之后,它封装了一个 PullResultEXE 的结果对象,然后调用 PullCallback 回调函数。然后在回调函数主要做了两件事:第一件,取出消息并放到 processQueue 当中。然后再将 processQueue 提交到 PullMessageService当中的 consumeMessageService当中去进行具体的一个处理。这就是消息的客户端处理服务端的响应过程。

通过以上三个步骤,就把整个消息拉取的基本流程介绍完。其实在这个流程当中,只做了三件事,第一件,客户端一定要先发起请求;第二件,服务端处理请求,处理请求作为响应,客户端再去处理服务端它的相应学习。

相关文章
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
3677 1
rocketmq客户端发送消息报错和超时问题
|
3月前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
367 4
|
Web App开发 缓存 网络协议
如何实现服务端向客户端推送数据
常见的http协议只能从客户端主动向服务端请求数据,而服务端无法向客户端发送数据.本文通过介绍几种方式来实现上述功能.
|
8月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
7月前
|
消息中间件 Serverless 网络性能优化
消息队列 MQ产品使用合集之客户端和服务器之间的保活心跳检测间隔是怎么设置的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
网络协议 Cloud Native
为什么服务端会有那么多的 TimeWait ?
为什么服务端会有那么多的 TimeWait ?
|
消息中间件 缓存 RocketMQ
客户端发起拉取消息请求|学习笔记
快速学习客户端发起拉取消息请求
客户端发起拉取消息请求|学习笔记
|
消息中间件 RocketMQ 开发者
拉取消息长轮询机制|学习笔记
快速学习拉取消息长轮询机制
拉取消息长轮询机制|学习笔记
|
消息中间件 负载均衡 RocketMQ
消息拉取介绍|学习笔记
快速学习消息拉取介绍
消息拉取介绍|学习笔记
|
消息中间件 RocketMQ 开发者
发送同步消息|学习笔记
快速学习发送同步消息
117 0
发送同步消息|学习笔记

热门文章

最新文章