我服了,RocketMQ消费者负载均衡内核是这样设计的

简介: 文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。

前言

上文分析了RocketMQ消费者消费消息核心原理,其中在消费者端,有一个重量级的组件:负载均衡组件, 他负责相对均匀的给消费者分配需要拉取的队列信息。

我们此时可能会有以下问题:

1、一个Topic下可能会有很多逻辑队列,而消费者又有多个,这样不同的消费者到底消费哪个队列呢?

2、如果消费者或者队列扩缩容,Topic下的队列又该分配给谁呢?

这些时候负载均衡策略就有他的用武之地了。RocketMQ在处理上面的问题是统一处理的,也就是逻辑是一致的,它都是通过RebalanceService这个类来完成负载均衡的工作,看完本文我们就可以明白RocketMQ消费者负载均衡的核心逻辑。

image.png

本文将会从源码层面来剖析上面两个的问题。

RocketMQ负载均衡服务启动时机

image.png

从类定义可知,RocketMQ的负载均衡服务RebalanceService,他是一个线程任务。消费者客户端启动时候,会调用start()函数启动RebalanceService线程,从而触发其run方法运行。

image.png

RocketMQ负载均衡服务执行流程

负载均衡服务执行逻辑在doRebalance函数,里面会对每个消费者组执行负载均衡操作。 也就是一个负载均衡服务是对一个消费者组负责的,那么我们可以想到对不同的消费者组使用不同负载均衡策略consumerTable这个map对象里存储了消费者组对应的的消费者实例


ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();

public void doRebalance() {
   
   
    //每个消费者组都有负载均衡
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
   
   
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
   
   
            try {
   
   
                impl.doRebalance();
            } catch (Throwable e) {
   
   
                log.error("doRebalance exception", e);
            }
        }
    }
}

image.png

由于每个消费者组可能会消费很多topic,每个topic都有自己的不同队列,所以最终是按topic的维度进行负载均衡。

public void doRebalance(final boolean isOrder) {
   
   
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
   
   
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
   
   
            final String topic = entry.getKey();
            try {
   
   
                //按topic维度执行负载均衡
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
   
   
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
   
   
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}

最终负载均衡逻辑处理的实现在org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic,其中分为广播消息和集群消息模型两种情况处理。由于广播消息是每个消费者实例都需要消费到,因此逻辑会简单点(不需要分配哪个队列给哪个消费者),我们主要关注集群消息模式。

image.png

由于广播消息模型是topic下的每条消息都需要被每个消费者消费到,因此少了给不同的消费者分配消费队列这个流程。

集群消息模型是topic下的每条消息,一个消费者组下只能有一个消费者实例消费到,因此有给不同消费者实例分配消费队列的流程。

private void rebalanceByTopic(final String topic, final boolean isOrder) {
   
   
        switch (messageModel) {
   
   
            //广播模型
            case BROADCASTING: {
   
   
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
   
   
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
   
   
                        this.messageQueueChanged(topic, mqSet, mqSet);

                    }
                }
                break;
            }
            //集群模型
            case CLUSTERING: {
   
   
                //查topic下的消息队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //查询topic下的所有消费者
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
   
   
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
   
   
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (mqSet != null && cidAll != null) {
   
   
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    //负载均衡组件
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    //负载均衡结果
                    List<MessageQueue> allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
   
   
                        allocateResultSet.addAll(allocateResult);
                    }
                    //负载均衡执行结束后,判断是否有新的消费策略变化,更新拉取策略
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
   
   
                        //发送更新通知
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

代码逻辑可以看出负载均衡核心功能的主流程,主要做了4件事情:

image.png

其中比较重要的是具体的负载均衡策略,他关系着哪些队列是当前消费者需要消费的。下面我们看下负载均衡策略的具体实现。

负载均衡策略原理

看负载均衡策略的具体实现前,我们看下RocketMQ中的负载均衡策略顶层接口


/**
 * Strategy Algorithm for message allocating between consumers
 */
public interface AllocateMessageQueueStrategy {
   
   

    /**
     * Allocating by consumer id
     * 给消费者id分配消费队列
     */
    List<MessageQueue> allocate(
    final String consumerGroup, //消费者组
    final String currentCID, //当前消费者id
    final List<MessageQueue> mqAll, //所有的队列
    final List<String> cidAll //所有的消费者
);

}

他默认共有6种负载均衡策略实现。

image.png

Push消息客户端使用的队列平均分配策略。我们本文主要分析队列平均分配负载均衡策略的实现。

image.png

为了说明这种分配算法的分配规则,现在对 16 个队列,进行编号,用 q0~q15 表示, 消费者用 c0~c2 表示。 AllocateMessageQueueAveragely分配算法的队列负载机制如下:

c0:q0 q1 q2 q3 q4 q5

c1: q6 q7 q8 q9 q10

c2: q11 q12 q13 q14 q15

其算法的特点是用总数除以消费者个数,余数按消费者顺序分配给消费者,故 c0 会多 分配一个队列,而且队列分配是连续的。

        List<String> result = new ArrayList<>();
        // 消息队列%消费者 是否能够正好整数倍分配完整
        int mod = mqAll.size() % cidAll.size();
        //平均每个消费者消费的队列大小
        int averageSize = 0;

        //计算当前消费者需要消费的队列大小
        //如果需要消费的队列数 小于 消费者数量 则每个(编号小于队列编号的)消费者需要消费1个队列
        if (mqAll.size() <= cidAll.size()) {
   
   
            averageSize = 1;
        } else {
   
   
            //如果队列不能被正好整数被分配完,并且当前消费者需要比整数个消费多一个
            if (mod > 0 && index < mod) {
   
   
                averageSize = mqAll.size() / cidAll.size() + 1;
            } else {
   
   
                ////如果队列不能被正好整数被分配完,并且当前消费者不需要比整数个消费多一个(当前消费者消费队列数不加1),刚好消费整数个
                averageSize = mqAll.size() / cidAll.size();
            }
        }

        //计算消费者需要开始消费的队列下标。
        int startIndex;
        //消费者不能正好整数倍消费完成,并且需要多消费一个队列的情况下 比如是第3个消费者 平均大小是1 则开始位置是2*1=2
        if (mod > 0 && index < mod) {
   
   
          //计算当前消费者的 需要消费队列大小
            startIndex = index * averageSize;
        } else {
   
   
            // 总共3个队列 2个消费者 mod = 1    则第2个消费者的开始位置为 1*1 + 1 = 2
            // 总共5个队列 3个消费者 mod = 2    则第2个消费者的开始位置为 2*1 + 1 = 3
            startIndex = index * averageSize + mod;
        }
        //  startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        //消费队列的范围  总共3个队列 2个消费者 mode = 1    则第2个消费者的开始位置为 1*1 + 1 = 2
        //如果消费者需要消费的数量不会加1 则消费范围为averageSize, 但是也可能存在一个消费者
        System.out.println(mqAll.size() - startIndex);
        System.out.println(averageSize);
        //范围比较 存在一种情况 消费者数量比队列数量多的情况 则存在部分消费者消费不到队列情况,
        // 则会使得 averageSize=1 但是 (mqAll.size() - startIndex) =0的情况 这样就范围就是0了。
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
   
   
            //按范围获取队列,保证连续性质
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }

上面是平均分配法具体实现,它是按范围分配,一个范围内的消费队列分配给一个消费者。队列分配好之后,会更新到本地注册表,这时候就是当前消费者最新需要消费的队列。

image.png

更新本地注册表后,主要是移除老的拉取消息任务,新增新的拉取消息任务

上面的逻辑可以回答上文第1,2个问题了,当前消费者需要拉取消息消费的队列以及新的消费者加入或者移除都会走上面的流程进行负载均衡重加载,达到更新当前消费者实例需要拉取最新的队列目的。

消费者负载均衡能力有哪些扩展应用?

有个疑问?如果我们需要对消息有灰度消费能力的需求,我们要怎么对负载均衡策略进行定制化改造来支持这个能力呢?

总结

看完负载均衡策略的实现,我们发现他完全异步执行的,负载均衡和拉取消息完全解耦,这样做既兼顾了性能,也可以达到最终一致性的目的。

在RocketMQ中比较重要,在很多中间件都需要使用,比如Dubbo,kafka等等,看完就会发现思想都是相通的。

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
26天前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
26天前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
28天前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
27 0
|
28天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
39 0
|
28天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
39 0
|
28天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
30 0
|
28天前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
26 0
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
24天前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
45 5