【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析

简介: 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析

DefaultMQPushConsumer

使用系统控制读取操作的DefaultMQPushConsumer可以自动调用传入的处理方法来处理收到的消息。通过设置各种参数和传入处理消息的函数,使用DefaultMQPushConsumer的主要目的是方便配置和处理消息。在收到消息后,系统会自动保存Offset,并且如果加入了新的DefaultMQPushConsumer,系统会自动做负载均衡。

RocketMQ的消息模式

RocketMQ提供Clustering和Broadcasting两种消息模式。

  • Clustering模式下,ConsumerGroup内每个Consumer只消费所订阅消息的一部分,而所有Consumer消费内容合在一起构成Topic内容,实现负载均衡。
  • Broadcasting模式下,同一ConsumerGroup内每个Consumer都接收所订阅Topic的全部消息,每个消息分发给多个Consumer消费。

推模式的的案例代码

使用 DefaultMQPushConsumer 可以自动控制读取操作,收到消息后会自动调用传入的处理方法进行处理,并且自动保存 Offset。主要需要设置好各种参数以及传入处理消息的函数。当加入新的 DefaultMQPushConsumer 后,系统会自动进行负载均衡。

java

复制代码

public class DefaultMQPushConsumerSample {
    public static void main(String[] args) throws MQClientException {
        // Consumer 的 GroupName 用于把多个 Consumer 组织到一起,提高并发处理能力,GroupName 需要和消息模式( MessageModel) 配合使用。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // NameServer 的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如“ip1:port;ip2:port;ip3:port”。
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /* Specify where to start in case the specified Consumer group is a brand new one. */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        /* Subscribe one more more Topics to consume. */
        // Topic 名称用来标识消息类型,需要提前创建。
        // 如果不需要消费某个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤,
        // 比如:Consumer.subscribe("TopicTest","tag1||tag2||tag3"),表示这个 Consumer 要消费“TopicTest”下带有tag1或tag2或tag3的消息
        // ( Tag 是在发送消息时设置的标签)。在填写 Tag 参数的位置, 用 null 或者“*” 表示要消费这个 Topic 的所有消息。
        consumer.subscribe("TopicTest", "*");
        /* * Register callback to execute on arrival of Messages fetched from brokers. */
        consumer.registerMessageListener(
                (MessageListenerConcurrently) (msgs, context) -> {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    System.out.println();
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
        /* * Launch the Consumer instance.*/
        consumer.start();
    }
}

DefaultMQPushConsumer 的处理流程

DefaultMQPushConsumer 的主要功能是由 DefaultMQPushConsumerImpl 类实现的。消息的处理逻辑在 pullMessage 函数中的 PullCallBack 中完成。PullCallBack 函数里有一个 switch 语句,根据从 Broker 返回的消息类型进行相应的处理。

com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

java

复制代码

PullCallback pullCallback = new PullCallback() {
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
            switch(pullResult.getPullStatus()) {
            case FOUND:
              // 省略代码
                break;
            case NO_NEW_MSG:
              // 省略代码
                break;
            case NO_MATCHED_MSG:
              // 省略代码
                break;
            case OFFSET_ILLEGAL:
                // 省略代码
            }
        }
    }
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) {
            DefaultMQPushConsumerImpl.this.log.warn("execute the pull request exception", e);
        }
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 3000L);
    }
};

长轮询

DefaultMQPushConsumer 中通过使用“PullRequest”以“长轮询”(long polling)方式实现了 Push 效果。长轮询方式既保留了 Pull 的好处,又具有 Push 方式的实时性。在长轮询中,客户端的请求权力仍然掌握在 Consumer 手中,即使 Broker 有大量消息积压,也不会主动推送给 Consumer。长轮询方式的局限性是需要占用资源来维护客户端的请求,因此适合在消息队列等客户端连接数可控的场景中使用。

Push 方式是 Server 端接收到消息后主动将消息推送给 Client 端,这种方式实时性强。然而对于提供队列服务的 Server 来说,用 Push 方式主动推送会增加 Server 端的工作量,从而影响 Server 的性能;而且 Client 的处理能力不同,Client 的状态也不受 Server 控制,如果 Client 不能及时处理 Server 推送过来的消息,就会出现潜在问题。

Pull 方式是 Client 端循环地从 Server 端拉取消息,主动权在 Client 手中,自己拉取到一定数量的消息后,再进行处理。Pull 方式的问题在于循环拉取消息的间隔不好设定,间隔太短会导致一种“忙等”状态,浪费资源;而每个 Pull 的时间间隔太长会导致 Server 端有更多的消息到来而没有被及时处理。

长轮询方式通过 Client 端和 Server 端的合作,既保留了 Pull 的优点,又在保证实时性方面达到了目的。

com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

java

复制代码

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
    brokerAddr = this.computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().
  pullMessage(brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);

requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis) 的作用是设置 Broker 的最长阻塞时间,其默认设置是 15 秒,但需注意仅当 Broker 没有新消息时才会被阻塞,如果有新消息则会立即返回。

“长轮询” 服务端代码

从 Broker 的源码中可以看出,服务端在接收到新的消息请求后,并不会急于返回,而是通过一个循环状态不断地查看队列中是否有新消息。每次查看状态时,会暂停一段时间(默认为 5 秒),然后再次进行检查。在默认情况下,当 Broker 没有新的消息时,第三次检查时,若等待时间超过 Request 中设定的 Broker-SuspendMaxTimeMillis,会返回一个空结果。

java

复制代码

if (this.brokerController.getBrokerConfig().isLongPollingEnable()){
    this.waitForRunning( 5 * 1000); 
} else {
  this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); 
}
long beginLockTimestamp = this.systemClock.now(); 
this.checkHoldRequest(); 
long costTime = this.systemClock.now() - beginLockTimestamp; 
if (costTime > 5 * 1000) { 
    Log. info("[ NOTIFYME] check hold request cost {} ms.", costTime); 
}

在等待的过程中,一旦 Broker 收到新的消息,就会立即调用 notifyMessageArriving 函数并返回请求结果。"长轮询"的核心是,Broker 会暂时地保留客户端请求,在这段时间内如果有新的消息到达,则可以不用创建新的连接,而是利用现有的连接立刻返回消息给 Consumer。

messageQueue和processQueue

PullRequest中定义了messageQueue和processQueue。

processQueue

processQueue是一个快照类,在PushConsumer运行时,每个MessageQueue都会有一个对应的ProcessQueue对象,用于保存该MessageQueue消息处理状态的快照。

ProcessQueue对象主要包含一个TreeMap和一个读写锁。TreeMap以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到但还未被处理的消息;读写锁控制着多个线程对TreeMap对象的并发访问。

在pull逻辑中,PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,如果有任何一个值超过了设置的大小,则会隔一段时间再拉取消息,以达到流量控制的目的。此外,ProcessQueue还可以辅助实现顺序消费的逻辑。相应代码如下:

com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

PushConsumer会判断获取但还未处理的消息个数

java

复制代码

long size = processQueue.getMsgCount().get();
if (size > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, 50L);
    if (this.flowControlTimes1++ % 1000L == 0L) {
        this.log.warn("the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, this.flowControlTimes1});
    }
}
消息总大小、Offset的跨度

java

复制代码

if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.flowControlTimes2++ % 1000L == 0L) {
                this.log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, this.flowControlTimes2});
            }
            return;
        }
    }
ProcessQueue对象主要包含一个TreeMap和一个读写锁

java

复制代码

else {
        if (!processQueue.isLocked()) {
            this.executePullRequestLater(pullRequest, 3000L);
            this.log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
        if (!pullRequest.isLockedFirst()) {
            long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            this.log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", new Object[]{pullRequest, offset, brokerBusy});
            if (brokerBusy) {
                this.log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset);
            }
            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    }

内容总结

DefaultMQPushConsumer使用长轮训技术,让consumer不断向broker端请求消息,如果没有可消费的消息,则阻塞一段时间,等待broker推送消息给consumer。具体实现过程为,先每隔一段时间从broker获取消息进行消费,如果没有需要消费的消息,则调用poll函数向远程broker获取最新的消息,最长等待时间为Consumer的maxTimeConsumeConitnusly属性,如果超时时间到达还没有新的消息,则返回null。这种方式实现实时更新消息且对于broker的开销较小,但会导致consumer不断发起请求,增加网络负载和调用次数。因此需要合理设置长轮询的超时时间。

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
482 7
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
2月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
3月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
3月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
3月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
3月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
3月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
3月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
3月前
|
消息中间件 存储 Kafka
RocketMQ实战—4.消息零丢失的方案
本文分析了用户支付完成后未收到红包的问题,深入探讨了RocketMQ事务消息机制的实现原理及其在确保消息零丢失中的作用。首先,通过全链路分析发现消息可能在推送、存储或消费环节丢失。接着,介绍了RocketMQ事务消息机制如何通过half消息、本地事务执行及回调确认来保证消息发送成功,并详细解析了其底层原理,如half消息对消费者不可见、rollback与commit操作等。同时,对比了同步重试方案,指出其在复杂场景下的局限性。
RocketMQ实战—4.消息零丢失的方案
|
3月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
332 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构