客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢?
RebalancePullImpl 和 RebalancePushImpl 两个重平衡实现类,分别被 DefaultMQPullConsumer 和DefaultMQPushConsumer 使用。下面讲一下 Rebalancelmpl 的核心属性和方法
核心属性
public abstract class RebalanceImpl { protected static final InternalLogger log = ClientLogger.getLog(); //记录MessageQueue和ProcessQueue的关系。MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器 protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); //Topic 路由信息 。保存 Topic 和 MessageQueue的关系。 protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>(); //真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>(); protected String consumerGroup; protected MessageModel messageModel; //消费分配策略的实现 protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; //client实例对象 protected MQClientInstance mQClientFactory; }
核心方法
public abstract class RebalanceImpl { //为MessageQueue加锁 public boolean lock(final MessageQueue mq) {} //执行Rebalance操作 public void doRebalance(final boolean isOrder) {} //通知Message发生变化,这个方法在Push和Pull两个类中被重写 public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided); //去掉不再需要的 MessageQueue public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq); //执行消息拉取请求 public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList); //在Rebalance中更新processQueue private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) }
Rebalancelmpl 、 RebalancePushImpl 、 RebalancePullImpl 是Rebalance的核心实现,主要逻辑都在Rebalancelmpl中,因为Pull消费者和Push消费者对Rebalance的需求不同,在各自的实现中重写了部分方法,以满足自身需求
如果有一个消费者实例下线了,Broker和其他消费者是怎么做Rebalance的呢
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }
目前队列分配策略有以下5种实现方法
- AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
- AllocateMessageQueueAveragelyByCircle:环形分配策略。
- AllocateMessageQueueByConfig:手动配置。
- AllocateMessageQueueConsistentHash:一致性Hash分配。
- AllocateMessageQueueByMachineRoom:机房分配策略