顺利拿下Offer 通过分析rocketMq消费者拉取消息源码

简介: 顺利拿下Offer 通过分析rocketMq消费者拉取消息源码

Rocketmq服务器和消费者之间的消息传输有push(推模式)和pull(拉模式)两种。

Push模式:底层实现也是使用拉取模式,由Rocketmq客户端底层封装实现自动消息拉取,拉取到的消息会回调消费者注册的监听器函数,把消息传给消费者,让消费者进行消费。根据拉取的结果,设置队列下一次拉取的偏移量进行下一次拉取任务。

Pull消费模式:更加灵活,可以让消费者自己决定拉取消息时机和更新队列的偏移量,当然使用上也变得复杂一些。

本文主要讲解push模式的底层实现,pull模式在4.7版本之前,调用pull方法是直接请求服务端拉取消息,4.7版本的pull模式后面文章会单独再讲解。

Push模式的主要流程

主要流程图:(可以点击图片放大)

每个步骤的讲解:

  • 1.PullMessageService线程从阻塞队列中获取PullRequest任务(PullRequest对象什么时候放进阻塞队列的呢?后面讲解。)
  • 2.将PullRequest对象交给DefaultMQPushConsumerImpl类处

题外话:如果有发生了拉取流控会打印日志,我们排查消费者端是否出现了流控可以查看rocketmqClient.log日志进行排查,关键字”so do flow control”

  • b. 如果是顺序消费还需要判断是否获取到该队列的锁,该锁要向broker申请,因为顺序消息要只有在获取到队列的锁的情况下,才能进行消息的拉取,保证了消息的顺序。
  • c. 条件不满足就把PullRequest对象,延迟一定时间,重新放到PullMessageService的队列里。
  • a. 流控处理(拉取到本地的消息条数和大小最小和最大偏移量间隔),可以防止太多消息拉取到本地,导致内存溢出。
  • 3.PullAPIWrapper对PullRequest对象进行包装,主要查找broker地址,封装请求对象。
  • 4.MQClientAPIImpl选择异步还是同步方式,push是异步方式。
  • 5.NettyRemotingClient调用invokeAsynImpl方法。
  • a. 根据borker地址获取netty的channel
  • b. 获取信号量,防止太多请求
  • c. 生成ResponseFuture放到Map里,Map的key是请求的id,这个id很关键,每次请求都会生成,响应回来会带上这个id,然后通过这个id找到对应的ResponseFuture,然后调用回调函数做对应的业务逻辑处理
  • d. 把请求通过netty发送到broker,netty发送是异步的,所以线程立马就可以返回了。
  • 6.继续步骤一
  • 7.Netty接收响应的线程接收到响应结果,解析响应中包含的请求id,从Map(5-c步骤保存的)中找到对应的ResponseFuture执行回调函数
  • a. 根据返回结果,把拉取到消息放到DefaultMQPushConsumerImpl的消费线程池进行消费(消费主要步骤1.把消息传给消费者注册的监听函数 2.根据消费情况进行队列的偏移量更新)
  • b. 把设置下一次偏移量,生成PullRequest任务延迟或者立马放到PullMessageService线程的队列里,这是第一种PullRequest对象的来源。

第二种PullRequest对象的来源:

队列重平衡时,将新分配的队列生成PullRequest对象,然后放入队列,所以PullRequest对象和队列是一一对应的。

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {     
             this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);                 
             log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);            
         }    
}

Push模式下的线程模型

分成3种不同职责的线程:

PullMessageService线程

Push模式流程下的1-6步骤都是由PullMessageService线程,单个线程进行执行,也就是说所有的PullRequest任务都是由此线程来执行;

为什么要这么做呢?

  • 1-6步骤都不涉及耗时操作,可以处理很快,单线程足够了,多线程会导致上下文切换,共享资源的锁操作。
  • 如果多线程向服务端broker拉取消息,就会存在并发问题,对偏移量的处理就会变得复杂和影响性能。

Netty响应线程

主要对响应结果的解析,执行回调函数

  • 把消息放到一个消费线程池单独异步处理
  • 生成下一次拉取PullRequest任务放到队列里

拉取线程和消费线程独立,这样可以做到拉取和消费互不影响。

消费消息线程

拉取到的消息进行调用消费者注册的监听函数进行消费,逻辑业务处理。

消费完成的消息的偏移量的更新。

小结

  • 1.push模式下的底层模式,主要是执行PullRequest任务进行消息的拉取,要弄明白PullRequest的来源。来源有两个,一个是消费者组队列重平衡时,新分配的队列会分配生成PullRequest任务;二是接收到响应后会生成下一次的PullRequest任务。
  • 2.push模式下的线程模型,拉取和消费是线程隔离的,这样拉取和消费互不影响,实现性能最大化。
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
5月前
|
Java API 网络架构
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码
177 0
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
|
4月前
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
38 0
|
5月前
|
消息中间件 Apache RocketMQ
电子好书发您分享《Apache RocketMQ 源码解析》
电子好书发您分享《Apache RocketMQ 源码解析》
34 1
|
4月前
|
消息中间件 SQL Java
RabbitMQ之消费者可靠性
RabbitMQ之消费者可靠性
|
4月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
32 0
|
18天前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
21天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
37 0
RabbitMQ入门指南(九):消费者可靠性
|
2月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
14 1
|
2月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1