顺利拿下Offer 通过分析rocketMq消费者拉取消息源码

简介: 顺利拿下Offer 通过分析rocketMq消费者拉取消息源码

Rocketmq服务器和消费者之间的消息传输有push(推模式)和pull(拉模式)两种。

Push模式:底层实现也是使用拉取模式,由Rocketmq客户端底层封装实现自动消息拉取,拉取到的消息会回调消费者注册的监听器函数,把消息传给消费者,让消费者进行消费。根据拉取的结果,设置队列下一次拉取的偏移量进行下一次拉取任务。

Pull消费模式:更加灵活,可以让消费者自己决定拉取消息时机和更新队列的偏移量,当然使用上也变得复杂一些。

本文主要讲解push模式的底层实现,pull模式在4.7版本之前,调用pull方法是直接请求服务端拉取消息,4.7版本的pull模式后面文章会单独再讲解。

Push模式的主要流程

主要流程图:(可以点击图片放大)

每个步骤的讲解:

  • 1.PullMessageService线程从阻塞队列中获取PullRequest任务(PullRequest对象什么时候放进阻塞队列的呢?后面讲解。)
  • 2.将PullRequest对象交给DefaultMQPushConsumerImpl类处

题外话:如果有发生了拉取流控会打印日志,我们排查消费者端是否出现了流控可以查看rocketmqClient.log日志进行排查,关键字”so do flow control”

  • b. 如果是顺序消费还需要判断是否获取到该队列的锁,该锁要向broker申请,因为顺序消息要只有在获取到队列的锁的情况下,才能进行消息的拉取,保证了消息的顺序。
  • c. 条件不满足就把PullRequest对象,延迟一定时间,重新放到PullMessageService的队列里。
  • a. 流控处理(拉取到本地的消息条数和大小最小和最大偏移量间隔),可以防止太多消息拉取到本地,导致内存溢出。
  • 3.PullAPIWrapper对PullRequest对象进行包装,主要查找broker地址,封装请求对象。
  • 4.MQClientAPIImpl选择异步还是同步方式,push是异步方式。
  • 5.NettyRemotingClient调用invokeAsynImpl方法。
  • a. 根据borker地址获取netty的channel
  • b. 获取信号量,防止太多请求
  • c. 生成ResponseFuture放到Map里,Map的key是请求的id,这个id很关键,每次请求都会生成,响应回来会带上这个id,然后通过这个id找到对应的ResponseFuture,然后调用回调函数做对应的业务逻辑处理
  • d. 把请求通过netty发送到broker,netty发送是异步的,所以线程立马就可以返回了。
  • 6.继续步骤一
  • 7.Netty接收响应的线程接收到响应结果,解析响应中包含的请求id,从Map(5-c步骤保存的)中找到对应的ResponseFuture执行回调函数
  • a. 根据返回结果,把拉取到消息放到DefaultMQPushConsumerImpl的消费线程池进行消费(消费主要步骤1.把消息传给消费者注册的监听函数 2.根据消费情况进行队列的偏移量更新)
  • b. 把设置下一次偏移量,生成PullRequest任务延迟或者立马放到PullMessageService线程的队列里,这是第一种PullRequest对象的来源。

第二种PullRequest对象的来源:

队列重平衡时,将新分配的队列生成PullRequest对象,然后放入队列,所以PullRequest对象和队列是一一对应的。

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {     
             this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);                 
             log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);            
         }    
}

Push模式下的线程模型

分成3种不同职责的线程:

PullMessageService线程

Push模式流程下的1-6步骤都是由PullMessageService线程,单个线程进行执行,也就是说所有的PullRequest任务都是由此线程来执行;

为什么要这么做呢?

  • 1-6步骤都不涉及耗时操作,可以处理很快,单线程足够了,多线程会导致上下文切换,共享资源的锁操作。
  • 如果多线程向服务端broker拉取消息,就会存在并发问题,对偏移量的处理就会变得复杂和影响性能。

Netty响应线程

主要对响应结果的解析,执行回调函数

  • 把消息放到一个消费线程池单独异步处理
  • 生成下一次拉取PullRequest任务放到队列里

拉取线程和消费线程独立,这样可以做到拉取和消费互不影响。

消费消息线程

拉取到的消息进行调用消费者注册的监听函数进行消费,逻辑业务处理。

消费完成的消息的偏移量的更新。

小结

  • 1.push模式下的底层模式,主要是执行PullRequest任务进行消息的拉取,要弄明白PullRequest的来源。来源有两个,一个是消费者组队列重平衡时,新分配的队列会分配生成PullRequest任务;二是接收到响应后会生成下一次的PullRequest任务。
  • 2.push模式下的线程模型,拉取和消费是线程隔离的,这样拉取和消费互不影响,实现性能最大化。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
40 0
|
27天前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
27天前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
28天前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
27 0
|
28天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
39 0
|
28天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
40 0
|
28天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
30 0
|
28天前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
26 0
|
2月前
|
消息中间件 API RocketMQ
消息队列 MQ使用问题之消息在没有消费者的情况下丢失,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。