前言
我们在搭建好RocketMQ
集群后,当一个集群中有多个消费者的情况下,它们是如何选择消息队列进行消费的呢?如果在项目运行过程中,有新的消费者加入或者某一个消费者宕机了,其他消费者是如何应对的呢?同样的,新的Broker
上线或者某一个Broker
下线了,消费者应该如何应对才能保证消息被正常地消费掉,下面我们就来看一下RocketMQ
源码是如何处理的;
定时任务
我们在MQClientInstance
实例对象调用start()
方法里面可以找到这样一段代码:
this.rebalanceService.start(); 复制代码
实现说明一下,所有的消费者实例都要通过start()
方法启动,所以最终会调用到MQClientInstance.start()
;
我们跟着源码进入ServiceThread.start()
方法:
public void start() { log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(false, true)) { return; } stopped = false; // 创建线程 this.thread = new Thread(this, getServiceName()); this.thread.setDaemon(isDaemon); // 启动线程 this.thread.start(); } 复制代码
这个任务很隐蔽,它并没有使用scheduledExecutorService
来跑这个定时任务,我们在这一段源码中只看到创建了一个线程,那么它的真正的业务逻辑在哪里执行呢?
我们发现ServiceThread
实现了Runnable
,并且它是一个抽象类,所以它没有实现run()
方法,那意味着真正的业务逻辑在它的子类里面;
我们可以在MQClientInstance
的构造函数中找到rebalanceService
的实例:
this.rebalanceService = new RebalanceService(this); 复制代码
最终,我们在RebalanceService
中找到了run()
方法:
// 默认20秒,可以通过rocketmq.client.rebalance.waitInterval配置调整 private static long waitInterval = Long.parseLong(System.getProperty( "rocketmq.client.rebalance.waitInterval", "20000")); @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 等待20秒 this.waitForRunning(waitInterval); // 执行重平衡 this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } 复制代码
这段代码就是一个死循环,只要这个任务没有停止,那么它就不断地每隔20秒执行一次重平衡;
重平衡
根据源码,我们发现所有的消费者重平衡的逻辑都是继承自RebalanceImpl
类:
public boolean doRebalance(final boolean isOrder) { boolean balanced = true; Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { // 1.先判断是否是客户端重平衡 // 2.如果不是客户端重平衡,就要判断这个topic是否存在broker里面,因为要使用boker重平衡,不在这篇文章讨论范围内 if (!clientRebalance(topic) && tryQueryAssignment(topic)) { balanced = this.getRebalanceResultFromBroker(topic, isOrder); } else { // 客户端重平衡实现 balanced = this.rebalanceByTopic(topic, isOrder); } } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalance Exception", e); balanced = false; } } } } // 更新重平衡后的processQueueTable、popProcessQueueTable、topicClientRebalance、topicBrokerRebalance this.truncateMessageQueueNotMyTopic(); return balanced; } 复制代码
所以我们重点关注一下rebalanceByTopic()
方法,因为它才是客户端重平衡的核心实现方法;
在客户端中,重平衡的实现需要根据消费者的messageMode
来做不同的实现:
- 广播模式重平衡
我们可以在创建消费者实例的时候特别指定messageMode
,通过setMessageModel()
方法设置;
case BROADCASTING: { // 根据topic获取所有的MessageQueue实例 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { // 如果MessageQueue列表有变化,那么就更新processQueueTable // 剔除下线的消费者与超时的broker // 给新上线的broker创建pullRequest准备拉消息 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { // 调用监听器的messageQueueChanged()方法 this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } // 对比当前正在运行的MessageQueue,如果和mqSet一致,说明已经达到平衡了 balanced = mqSet.equals(getWorkingMessageQueue(topic)); } else { this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet()); log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } 复制代码
- 广播模式的话,就直接拿到所有的
Broker
下的MessageQueue
,然后更新processQueueTable
即可;topicSubscribeInfoTable
这个Map中的数据是通过定时任务更新的,可以查看MQClientInstance.startScheduledTask()
方法:
this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); 复制代码
- 集群模式重平衡我们可以发现,其实广播模式下的重平衡主要是检测上线和下线的
Broker
,因为广播模式要求所有订阅该topic
的消费者都要消费消息;但是集群模式下不一样,同一个消费组的消费者只要消费一次即可;我们来看一下集群模式下是如何重平衡的;
- 获取
MessageQueue
集合与ConsumerId
列表
// topic对应的所有MessageQueue Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 同一个消费组下订阅了topic的消费者ConsumerId列表 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 复制代码
- 重平衡就是要让同一个消费组下所有订阅了这个
topic
的消费者来分配所有的MessageQueue
;
- 分配
MessageQueue
- 上面最关键的数据都准备好后,我们来看一下是如何实现分配策略的:
List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); // 排序MessageQueue Collections.sort(mqAll); // 排序ConsumerId Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { // 按照指定策略分配 allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e); return false; } 复制代码
- 在消费者中,默认的分配策略是
AllocateMessageQueueAveragely
:
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!check(consumerGroup, currentCID, mqAll, cidAll)) { return result; } // 获取当前消费者consumerId所在的索引位 int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); // 计算每个消费者平均能分配到多少个MessageQueue int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; } 复制代码
- 假设有8个
MessageQueue
,有3个消费者,那么在AllocateMessageQueueAveragely
分配策略下,最终分配的结果为:
1.第一个消费者分配到的MessageQueue索引值为[0,1,2];
2.第二个消费者分配到的MessageQueue索引值为[3,4,5];
2.第三个消费者分配到的MessageQueue索引值为[6,7];
- 更新
processQueueTable
- 剩下的时候和广播模式下是一样的,需要拿分配到的
MessageQueue
去更新processQueueTable
:
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } balanced = allocateResultSet.equals(getWorkingMessageQueue(topic)); 复制代码
小结
我们从消费者的源码中,非常清楚地知道了它是如何应对Broker
和其他消费者上下线的情况,保证能够及时地感知到Broker
及其他消费者的状况,实现消息消费的高可用;
1.消费者重平衡是通过定时任务的方式实现的,默认是每20秒调用执行一次重平衡;该时间间隔可以通过rocketmq.client.rebalance.waitInterval配置调整;
2.重平衡分为两种,一种是
Broker
上面实现,一种是消费者客户端实现;3.广播模式下的重平衡主要是检测
Broker
的上下线;4.集群模式下有多种重平衡策略,默认是平均分配的策略;