RocketMQ消费者消费消息核心原理(含长轮询机制)

简介: 这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。

前言

在消息系统中,消费者消费消息有拉和推消息两种实现方式,拉消息消费者主动向消息服务器发送拉消息请求,消息服务器将消息返回给消费者,而推消息消息服务器主动向消费者推送消息的形式,这两种消息消费实现各有各的优势和劣势。

在RocketMQ中,有两种消费者客户端,一种是Push模式消费者, 一种是Pull模式消费者,这两个其实都是表现,在RocketMQ底层实现中采用长轮询的机制来实现消息拉取消费功能。

image.png

长轮询模式兼顾了拉和推消息的优势。本文将从源码层面分析RocketMQ的消费者角色的工作原理。从整体看下org.apache.rocketmq.client.consumer.DefaultMQPushConsumer消费者实现类在消费客户端的启动流程,RocketMQ的消费者客户端即支持拉又支持推,但是底层都是基于长轮询的实现。

既然内核底层是长轮训模式,为什么RocketMQ会有Push,Pull客户端呢?

image.png

Push只是在客户端,RocketMQ客户端组件将消息拉取到本地后,自动回调我们业务方的业务逻辑执行消费。

Pull模式,其实也是Rocketmq会将消息拉取到本地,我们的业务代码主动从本地读消息进行消费。

这两种客户端对我们开发人员更加友好,可以自己选择最合适的消费方式。

Push消费者启动流程

看源码前,我们可以思考下,消费者能够从broker拉取到消息主要需要做哪些事情?

image.png

1、首先肯定需要和Broker创建连接,RocketMQ底层使用Netty组件通信,肯定会使用Netty和Broker建立连接。

2、和Broker建立连接之后,需要告诉Broker拉取Topic下哪个队列的消息,这里就需要用到消费者负载均衡机制了。

3、向Broker拉取消息,还有一个关键的属性,那就是消息拉取偏移量offset, 消费者需要知道下次从哪个偏移量开始拉取消息。在RocketMQ中广播消息模式是将偏移量存储在消费者本地, 集群消息模式是将消息偏移量存储在Broker。因此消费者启动时还需要加载好各个topic的偏移量

分析了上面主线逻辑之后,我们可以看看源码是如何做的。

消费者启动流程在源码 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

加载偏移量存储服务。

image.png

偏移量根据消费模式有两个策略,如果是广播��息偏移量存储在本地,如果是集群消息则存储在broker

消费者调用业务逻辑执行消费逻辑

image.png

消费者实例启动,里面的主逻辑非常清晰。

org.apache.rocketmq.client.impl.factory.MQClientInstance#start

public void start() throws MQClientException {
   
   

                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
   
   
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 启动netty客户端,和Broker建立通信连接
                    this.mQClientAPIImpl.start();
                    // 定时一些任务
                    this.startScheduledTask();
                    // 启动拉消息的服务,通过线程异步拉取
                    this.pullMessageService.start();
                    // 启动负载均衡服务
                    this.rebalanceService.start();
                    // 启动内置消息发送服务 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    //更新消费者启动状态
                    this.serviceState = ServiceState.RUNNING;

}

通过上面的步骤消费者实例就启动完成了。

我们发现,消费者客户端主要包含了几个核心组件服务

1、偏移量加载服务 => 记录消息消费进度

2、消息拉取服务 => 建立连接,拉取消息

3、负载均衡服务 => 分配消费队列

4、消息消费服务 => 回调消费消息业务逻辑

和我们一开始的想的没有很大差别。 这里体现了单一职责设计原则的设计思想,不同的类专门负责各自不同的职责。

image.png

看到这里,我们就把消费者客户端启动过程的主线捋清楚了。

客户端拉取消息请求发起

我们接着看最主要的流程,拉取消息服务PullMessageService, 看看他做了哪些事情。

点进去看发现他是一个线程任务。自然我们就会想到去看他的run方法实现,因此我们可以猜测,他是通过异步拉取消息的。

image.png

@Override
    public void run() {
   
   
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
   
   
            try {
   
   
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
   
   
            } catch (Exception e) {
   
   
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

从阻塞队列拿拉数据请求

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

每个拉取请求里包含了对应的消费者组,消息队列,拉取偏移量,从这些信息就可以知道要拉取哪个topic下面哪个队列哪个偏移量开始的消息了。

public class PullRequest {
   
   
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl中会发起拉消息请求,将消费者组,topic,队列号,偏移量传给broker。

image.png

我们可以注意到,上面还有两个重要的参数brokerSuspendMaxTimeMillis,timeoutMillis

这两个个暂停时间参数就和大名鼎鼎的长轮询机制有着密切的关系。

brokerSuspendMaxTimeMillis参数表示broker在没有消息拉取的时候会暂停等待多久返回给消费者客户端,

timeoutMillis则是表示消费者客户端最长等待多久broker返回消息的。

在消费者客户端,我们可以看到timeoutMillis的使用,控制了请求超时时间。

image.png

看看默认的超时时间配置

brokerSuspendMaxTimeMillis 默认15s, 也就是broker会在没有消息拉取停15s,再返回客户端

timeoutMillis 默认30s,也就是消费者客户端会等待broker30s。

image.png

下文我们还可以看到broker端对拉取消息的处理时对参数brokerSuspendMaxTimeMillis的应用。

消费者实例已经发起拉取消息请求了,broker把消息返回后,消费者做什么呢?

消费者需要对消息内容解码,然后将消息分发给我们写的业务代码执行。

image.png

这里就会把拉取到消息,通过任务的形式投递给消息消费处理线程池,线程池把消息转交给业务类处理。

image.png

我们经常会写个Listerner来监听RocketMQ消息,这种消费消息的方式,RocketMQ就是在Push模式中实现的。

消费完消息,还需要更新最新的消息消费偏移量,在这里RocketMQ会同步更新内存中的偏移量,不会直接远程调用Broker。

image.png

image.png

看到这几个点,RocketMQ中,拉取消息支持异步消费消息异步,还有偏移量更新到Brocker是异步,都是异步的,这种就像io线程和业务线程做了隔离的设计思想,如果要说RocketMQ性能高的原因,这个点也可以说。

看完上面的主线逻辑,我们其实已经能够说明白RocketMQ消费者端的主流程了。

image.png

下面看看Broker端处理拉取消息请求的流程

Broker端处理拉取消息

Broker端有个专门处理类处理拉取消息请求,PullMessageProcessor

image.png

我们也可以想下,RocketMQ拉取消息时需要做哪些事情?

1、因为消息是存储在CommitLog中,CommitLog对应的是一个一个的文件,怎么定位到具体的文件呢?也就是怎么定位到具体的CommitLog?

2、我们订阅消息消息,可能会使用不同的tag,tag会传到Broker端,因此Broker端需要做tag过滤

image.png

RocketMQ如何定位CommitLog?

在Broker端,会先根据topic和队列号定位到ConsumeQueue

image.png

ConsumeQueue是Commitlog文件的索引,里面缓存了所有的Commitlog文件信息和物理偏移量信息

image.png

通过ConsumeQueue就可以知道的CommitLog的物理偏移量了。

image.png

消息查询前会根据tag的hash值进行过滤,这里是不是经常问消息tag是在哪过滤的? 消息的tag的会以hash值形式存储在ConsumeQueue,由于ConsumeQueue是索引,所以Broker端查询消息前会进行过滤。

image.png

image.png

执行完消息过滤,就可以真正的查询消息啦。查询消息就是返回了ByteBuffer

image.png

到这里,消息查询出来了,但是长轮询处理在哪呢?在拉取结果处理的地方。

image.png

Broker将当前请求加入等待队列,response置为null, 这样不会立即把响应写到消费者客户端。

image.png

长轮询机制的核心实现马上就要揭开神秘面纱了。 Broker通过将当前请求的快照信息存储下来。

image.png

每5秒检查一次每个等待的请求队列里是否有新消息,有新消息就会返回消息给消费者客户端,或者超时了也不会再等待。 image.png

看到这里,Broker处理消息拉取逻辑也清晰了。

image.png

总结

本文分析了RocketMQ消费消息逻辑的实现原理,我们平时写消费者逻辑应该是比较多的,消费消息本质上是我们应用程序和Broker服务进行rpc通信,交换业务请求和消息的过程。

我们发现RocketMQ在代码设计上有几个比较突出的优点,值得我们学习和借鉴

1、代码的单一职责化很明显,各个类都各司其职,专门处理不同领域的逻辑。

2、线程池的使用,将消息拉取,消息偏移量同步,消息长轮询机制等都通过线程池,异步处理,同时也把消息拉取和消息消费进行了解耦,确实非常妙。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
1月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
5月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
4月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
4月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
5月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
5月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决