RocketMQ消费者如何实现重平衡

简介: RocketMQ消费者如何实现重平衡

前言

我们在搭建好RocketMQ集群后,当一个集群中有多个消费者的情况下,它们是如何选择消息队列进行消费的呢?如果在项目运行过程中,有新的消费者加入或者某一个消费者宕机了,其他消费者是如何应对的呢?同样的,新的Broker上线或者某一个Broker下线了,消费者应该如何应对才能保证消息被正常地消费掉,下面我们就来看一下RocketMQ源码是如何处理的;

定时任务

我们在MQClientInstance实例对象调用start()方法里面可以找到这样一段代码:

this.rebalanceService.start();
复制代码

实现说明一下,所有的消费者实例都要通过start()方法启动,所以最终会调用到MQClientInstance.start()

我们跟着源码进入ServiceThread.start()方法:

public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        // 创建线程
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        // 启动线程
        this.thread.start();
    }
复制代码

这个任务很隐蔽,它并没有使用scheduledExecutorService来跑这个定时任务,我们在这一段源码中只看到创建了一个线程,那么它的真正的业务逻辑在哪里执行呢?

我们发现ServiceThread实现了Runnable,并且它是一个抽象类,所以它没有实现run()方法,那意味着真正的业务逻辑在它的子类里面;

我们可以在MQClientInstance的构造函数中找到rebalanceService的实例:

this.rebalanceService = new RebalanceService(this);
复制代码

最终,我们在RebalanceService中找到了run()方法:

// 默认20秒,可以通过rocketmq.client.rebalance.waitInterval配置调整
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));  
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // 等待20秒
            this.waitForRunning(waitInterval);
            // 执行重平衡
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
复制代码

这段代码就是一个死循环,只要这个任务没有停止,那么它就不断地每隔20秒执行一次重平衡;

重平衡

根据源码,我们发现所有的消费者重平衡的逻辑都是继承自RebalanceImpl类:

public boolean doRebalance(final boolean isOrder) {
        boolean balanced = true;
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // 1.先判断是否是客户端重平衡
                    // 2.如果不是客户端重平衡,就要判断这个topic是否存在broker里面,因为要使用boker重平衡,不在这篇文章讨论范围内
                    if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
                        balanced = this.getRebalanceResultFromBroker(topic, isOrder);
                    } else {
                        // 客户端重平衡实现
                        balanced = this.rebalanceByTopic(topic, isOrder);
                    }
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalance Exception", e);
                        balanced = false;
                    }
                }
            }
        }
        // 更新重平衡后的processQueueTable、popProcessQueueTable、topicClientRebalance、topicBrokerRebalance
        this.truncateMessageQueueNotMyTopic();
        return balanced;
    }
复制代码

所以我们重点关注一下rebalanceByTopic()方法,因为它才是客户端重平衡的核心实现方法;

在客户端中,重平衡的实现需要根据消费者的messageMode来做不同的实现:

  • 广播模式重平衡
    我们可以在创建消费者实例的时候特别指定messageMode,通过setMessageModel()方法设置;
case BROADCASTING: {
    // 根据topic获取所有的MessageQueue实例
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    if (mqSet != null) {
        // 如果MessageQueue列表有变化,那么就更新processQueueTable
        // 剔除下线的消费者与超时的broker
        // 给新上线的broker创建pullRequest准备拉消息
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
            // 调用监听器的messageQueueChanged()方法
            this.messageQueueChanged(topic, mqSet, mqSet);
            log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
        }
        // 对比当前正在运行的MessageQueue,如果和mqSet一致,说明已经达到平衡了
        balanced = mqSet.equals(getWorkingMessageQueue(topic));
    } else {
        this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    }
    break;
}
复制代码
  • 广播模式的话,就直接拿到所有的Broker下的MessageQueue,然后更新processQueueTable即可;
    topicSubscribeInfoTable这个Map中的数据是通过定时任务更新的,可以查看MQClientInstance.startScheduledTask()方法:
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
复制代码
  • 集群模式重平衡我们可以发现,其实广播模式下的重平衡主要是检测上线和下线的Broker,因为广播模式要求所有订阅该topic的消费者都要消费消息;但是集群模式下不一样,同一个消费组的消费者只要消费一次即可;我们来看一下集群模式下是如何重平衡的;
  • 获取MessageQueue集合与ConsumerId列表
// topic对应的所有MessageQueue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 同一个消费组下订阅了topic的消费者ConsumerId列表
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
复制代码
  • 重平衡就是要让同一个消费组下所有订阅了这个topic的消费者来分配所有的MessageQueue
  • 分配MessageQueue
  • 上面最关键的数据都准备好后,我们来看一下是如何实现分配策略的:
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 排序MessageQueue
Collections.sort(mqAll);
// 排序ConsumerId
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
    // 按照指定策略分配
    allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
    log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
    return false;
}
复制代码
  • 在消费者中,默认的分配策略是AllocateMessageQueueAveragely:
@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }
        // 获取当前消费者consumerId所在的索引位
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        // 计算每个消费者平均能分配到多少个MessageQueue
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
复制代码
  • 假设有8个MessageQueue,有3个消费者,那么在AllocateMessageQueueAveragely分配策略下,最终分配的结果为:

1.第一个消费者分配到的MessageQueue索引值为[0,1,2];

2.第二个消费者分配到的MessageQueue索引值为[3,4,5];

2.第三个消费者分配到的MessageQueue索引值为[6,7];

  • 更新processQueueTable
  • 剩下的时候和广播模式下是一样的,需要拿分配到的MessageQueue去更新processQueueTable:
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
    log.info(
                            "client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));
复制代码

小结

我们从消费者的源码中,非常清楚地知道了它是如何应对Broker和其他消费者上下线的情况,保证能够及时地感知到Broker及其他消费者的状况,实现消息消费的高可用;

1.消费者重平衡是通过定时任务的方式实现的,默认是每20秒调用执行一次重平衡;该时间间隔可以通过rocketmq.client.rebalance.waitInterval配置调整;

2.重平衡分为两种,一种是Broker上面实现,一种是消费者客户端实现;

3.广播模式下的重平衡主要是检测Broker的上下线;

4.集群模式下有多种重平衡策略,默认是平均分配的策略;


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
124 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
85 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
72 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
64 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
82 0
|
5月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
160 2
|
5月前
|
消息中间件 负载均衡 Apache
【RocketMQ系列七】消费者和生产者的实现细节
【RocketMQ系列七】消费者和生产者的实现细节
154 1