顺利拿下Offer 通过分析rocketMq消费者拉取消息源码

简介: 顺利拿下Offer 通过分析rocketMq消费者拉取消息源码

Rocketmq服务器和消费者之间的消息传输有push(推模式)和pull(拉模式)两种。

Push模式:底层实现也是使用拉取模式,由Rocketmq客户端底层封装实现自动消息拉取,拉取到的消息会回调消费者注册的监听器函数,把消息传给消费者,让消费者进行消费。根据拉取的结果,设置队列下一次拉取的偏移量进行下一次拉取任务。

Pull消费模式:更加灵活,可以让消费者自己决定拉取消息时机和更新队列的偏移量,当然使用上也变得复杂一些。

本文主要讲解push模式的底层实现,pull模式在4.7版本之前,调用pull方法是直接请求服务端拉取消息,4.7版本的pull模式后面文章会单独再讲解。

Push模式的主要流程

主要流程图:(可以点击图片放大)

每个步骤的讲解:

  • 1.PullMessageService线程从阻塞队列中获取PullRequest任务(PullRequest对象什么时候放进阻塞队列的呢?后面讲解。)
  • 2.将PullRequest对象交给DefaultMQPushConsumerImpl类处

题外话:如果有发生了拉取流控会打印日志,我们排查消费者端是否出现了流控可以查看rocketmqClient.log日志进行排查,关键字”so do flow control”

  • b. 如果是顺序消费还需要判断是否获取到该队列的锁,该锁要向broker申请,因为顺序消息要只有在获取到队列的锁的情况下,才能进行消息的拉取,保证了消息的顺序。
  • c. 条件不满足就把PullRequest对象,延迟一定时间,重新放到PullMessageService的队列里。
  • a. 流控处理(拉取到本地的消息条数和大小最小和最大偏移量间隔),可以防止太多消息拉取到本地,导致内存溢出。
  • 3.PullAPIWrapper对PullRequest对象进行包装,主要查找broker地址,封装请求对象。
  • 4.MQClientAPIImpl选择异步还是同步方式,push是异步方式。
  • 5.NettyRemotingClient调用invokeAsynImpl方法。
  • a. 根据borker地址获取netty的channel
  • b. 获取信号量,防止太多请求
  • c. 生成ResponseFuture放到Map里,Map的key是请求的id,这个id很关键,每次请求都会生成,响应回来会带上这个id,然后通过这个id找到对应的ResponseFuture,然后调用回调函数做对应的业务逻辑处理
  • d. 把请求通过netty发送到broker,netty发送是异步的,所以线程立马就可以返回了。
  • 6.继续步骤一
  • 7.Netty接收响应的线程接收到响应结果,解析响应中包含的请求id,从Map(5-c步骤保存的)中找到对应的ResponseFuture执行回调函数
  • a. 根据返回结果,把拉取到消息放到DefaultMQPushConsumerImpl的消费线程池进行消费(消费主要步骤1.把消息传给消费者注册的监听函数 2.根据消费情况进行队列的偏移量更新)
  • b. 把设置下一次偏移量,生成PullRequest任务延迟或者立马放到PullMessageService线程的队列里,这是第一种PullRequest对象的来源。

第二种PullRequest对象的来源:

队列重平衡时,将新分配的队列生成PullRequest对象,然后放入队列,所以PullRequest对象和队列是一一对应的。

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {     
             this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);                 
             log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);            
         }    
}

Push模式下的线程模型

分成3种不同职责的线程:

PullMessageService线程

Push模式流程下的1-6步骤都是由PullMessageService线程,单个线程进行执行,也就是说所有的PullRequest任务都是由此线程来执行;

为什么要这么做呢?

  • 1-6步骤都不涉及耗时操作,可以处理很快,单线程足够了,多线程会导致上下文切换,共享资源的锁操作。
  • 如果多线程向服务端broker拉取消息,就会存在并发问题,对偏移量的处理就会变得复杂和影响性能。

Netty响应线程

主要对响应结果的解析,执行回调函数

  • 把消息放到一个消费线程池单独异步处理
  • 生成下一次拉取PullRequest任务放到队列里

拉取线程和消费线程独立,这样可以做到拉取和消费互不影响。

消费消息线程

拉取到的消息进行调用消费者注册的监听函数进行消费,逻辑业务处理。

消费完成的消息的偏移量的更新。

小结

  • 1.push模式下的底层模式,主要是执行PullRequest任务进行消息的拉取,要弄明白PullRequest的来源。来源有两个,一个是消费者组队列重平衡时,新分配的队列会分配生成PullRequest任务;二是接收到响应后会生成下一次的PullRequest任务。
  • 2.push模式下的线程模型,拉取和消费是线程隔离的,这样拉取和消费互不影响,实现性能最大化。
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
8月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
382 11
RocketMQ原理—3.源码设计简单分析下
|
8月前
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
1084 2
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
429 12
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
239 2
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
493 1
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
241 0