前言
上文分析了RocketMQ消费者消费消息核心原理,其中在消费者端,有一个重量级的组件:负载均衡组件
, 他负责相对均匀的给消费者分配需要拉取的队列信息。
我们此时可能会有以下问题:
1、一个Topic下可能会有很多逻辑队列,而消费者又有多个,这样不同的消费者到底消费哪个队列
呢?
2、如果消费者或者队列扩缩容,Topic下的队列
又该分配给谁呢?
这些时候负载均衡策略就有他的用武之地了。RocketMQ在处理上面的问题是统一处理
的,也就是逻辑是一致
的,它都是通过RebalanceService
这个类来完成负载均衡的工作,看完本文我们就可以明白RocketMQ消费者负载均衡的核心逻辑。
本文将会从源码层面来剖析上面两个的问题。
RocketMQ负载均衡服务启动时机
从类定义可知,RocketMQ的负载均衡服务RebalanceService
,他是一个线程任务。消费者客户端启动时候,会调用start()函数启动RebalanceService
线程,从而触发其run方法运行。
RocketMQ负载均衡服务执行流程
负载均衡服务执行逻辑在doRebalance
函数,里面会对每个消费者组执行负载均衡操作。 也就是一个负载均衡服务是对一个消费者组负责的,那么我们可以想到对不同的消费者组使用不同负载均衡策略
。consumerTable
这个map对象里存储了消费者组对应的的消费者实例
。
ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
public void doRebalance() {
//每个消费者组都有负载均衡
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
由于每个消费者组可能会消费很多topic,每个topic都有自己的不同队列,所以最终是按topic的维度进行负载均衡。
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//按topic维度执行负载均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
最终负载均衡逻辑处理的实现在org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
,其中分为广播消息和集群消息模型两种情况处理。由于广播消息是每个消费者实例都需要消费到,因此逻辑会简单点(不需要分配哪个队列给哪个消费者),我们主要关注集群消息模式。
由于广播消息模型是topic下的每条消息都需要被每个消费者消费到,因此少了给不同的消费者分配消费队列这个流程。
集群消息模型是topic下的每条消息,一个消费者组下只能有一个消费者实例消费到,因此有给不同消费者实例分配消费队列的流程。
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
//广播模型
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
}
}
break;
}
//集群模型
case CLUSTERING: {
//查topic下的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//查询topic下的所有消费者
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
//负载均衡组件
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
//负载均衡结果
List<MessageQueue> allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//负载均衡执行结束后,判断是否有新的消费策略变化,更新拉取策略
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
//发送更新通知
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
代码逻辑可以看出负载均衡核心功能的主流程,主要做了4件事情:
其中比较重要的是具体的负载均衡策略,他关系着哪些队列是当前消费者需要消费的。下面我们看下负载均衡策略的具体实现。
负载均衡策略原理
看负载均衡策略的具体实现前,我们看下RocketMQ中的负载均衡策略顶层接口
/**
* Strategy Algorithm for message allocating between consumers
*/
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
* 给消费者id分配消费队列
*/
List<MessageQueue> allocate(
final String consumerGroup, //消费者组
final String currentCID, //当前消费者id
final List<MessageQueue> mqAll, //所有的队列
final List<String> cidAll //所有的消费者
);
}
他默认共有6种负载均衡策略实现。
Push消息客户端使用的队列平均分配
策略。我们本文主要分析队列平均分配
负载均衡策略的实现。
为了说明这种分配算法的分配规则,现在对 16 个队列,进行编号,用 q0~q15 表示, 消费者用 c0~c2 表示。 AllocateMessageQueueAveragely分配算法的队列负载机制如下:
c0:q0 q1 q2 q3 q4 q5
c1: q6 q7 q8 q9 q10
c2: q11 q12 q13 q14 q15
其算法的特点是用总数除以消费者个数,余数按消费者顺序分配给消费者,故 c0 会多 分配一个队列,而且队列分配是连续的。
List<String> result = new ArrayList<>();
// 消息队列%消费者 是否能够正好整数倍分配完整
int mod = mqAll.size() % cidAll.size();
//平均每个消费者消费的队列大小
int averageSize = 0;
//计算当前消费者需要消费的队列大小
//如果需要消费的队列数 小于 消费者数量 则每个(编号小于队列编号的)消费者需要消费1个队列
if (mqAll.size() <= cidAll.size()) {
averageSize = 1;
} else {
//如果队列不能被正好整数被分配完,并且当前消费者需要比整数个消费多一个
if (mod > 0 && index < mod) {
averageSize = mqAll.size() / cidAll.size() + 1;
} else {
////如果队列不能被正好整数被分配完,并且当前消费者不需要比整数个消费多一个(当前消费者消费队列数不加1),刚好消费整数个
averageSize = mqAll.size() / cidAll.size();
}
}
//计算消费者需要开始消费的队列下标。
int startIndex;
//消费者不能正好整数倍消费完成,并且需要多消费一个队列的情况下 比如是第3个消费者 平均大小是1 则开始位置是2*1=2
if (mod > 0 && index < mod) {
//计算当前消费者的 需要消费队列大小
startIndex = index * averageSize;
} else {
// 总共3个队列 2个消费者 mod = 1 则第2个消费者的开始位置为 1*1 + 1 = 2
// 总共5个队列 3个消费者 mod = 2 则第2个消费者的开始位置为 2*1 + 1 = 3
startIndex = index * averageSize + mod;
}
// startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
//消费队列的范围 总共3个队列 2个消费者 mode = 1 则第2个消费者的开始位置为 1*1 + 1 = 2
//如果消费者需要消费的数量不会加1 则消费范围为averageSize, 但是也可能存在一个消费者
System.out.println(mqAll.size() - startIndex);
System.out.println(averageSize);
//范围比较 存在一种情况 消费者数量比队列数量多的情况 则存在部分消费者消费不到队列情况,
// 则会使得 averageSize=1 但是 (mqAll.size() - startIndex) =0的情况 这样就范围就是0了。
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
//按范围获取队列,保证连续性质
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
上面是平均分配法具体实现,它是按范围分配,一个范围内的消费队列分配给一个消费者。队列分配好之后,会更新到本地注册表,这时候就是当前消费者最新需要消费的队列。
更新本地注册表后,主要是移除老的拉取消息任务,新增新的拉取消息任务
。
上面的逻辑可以回答上文第1,2个问题了,当前消费者需要拉取消息消费的队列以及新的消费者加入或者移除都会走上面的流程进行负载均衡重加载,达到更新当前消费者实例需要拉取最新的队列目的。
消费者负载均衡能力有哪些扩展应用?
有个疑问?如果我们需要对消息有灰度
消费能力的需求,我们要怎么对负载均衡策略
进行定制化改造
来支持这个能力呢?
总结
看完负载均衡策略的实现,我们发现他完全异步执行
的,负载均衡和拉取消息完全解耦
,这样做既兼顾了性能
,也可以达到最终一致性
的目的。
在RocketMQ中比较重要,在很多中间件都需要使用,比如Dubbo,kafka等等,看完就会发现思想都是相通的。