RocketMQ源码(三)简单探索Producer和Consumer与Queue之间的负载均衡策略
在RocketMQ架构中,我们都知道一个topic下可以创建多个queue,生产者通过负载均衡策略可以将消息均匀的分发在各个queue中,而这些queue
可以通过负载均衡给多个消费者订阅从而提升消费效率,本文将从以下两个方面从源码角度分析producer和consumer的负载均衡原理:
- Producer如何将消息负载均衡发送给queue?
- Consumer如何通过负载均衡并发消费queue的消息?
Tip
:以下是本人经过多年的工作经验集成的JavaWeb脚手架,封装了各种通用的starter可开箱即用,同时列举了互联网各种高性能场景的使用示例。
// Git代码
https://gitee.com/yeeevip/yeee-memo
https://github.com/yeeevip/yeee-memo
1 Producer下的负载均衡
在Producer下主要指的是通过负载均衡算法去选择一个queue去发送消息,这里默认使用的策略是轮询的方式按顺序选择Queue。
1.1 源码分析
1
在没有指定Queue发送消息时,会调用到MQFaultStrategy.selectOneMessageQueue方法
2
selectOneMessageQueue中调用tpInfo.selectOneMessageQueue()
3
进而使用轮询算法从该Topic下的所有Queue里选择一个queue去发送消息
1.1.1 核心代码
public class MQFaultStrategy {
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
...
...
return tpInfo.selectOneMessageQueue();
}
}
/*
轮询算法:维护一个全局的index,每次都+1再与Queue的size求余
*/
public class TopicPublishInfo {
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = index % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
}
2 Consumer下的负载均衡
由于Consumer在广播模式下会把消息发给所有消费者,所以这里我们讨论的是集群模式下queue与消费者的负载均衡;
Rocketmq中规定一个Queue只能被一个消费者订阅,而一个消费者可以订阅多个Queue,这样自然会涉及到消费者与Queue分配协调策略。
当Broker扩容或缩容、Queue扩容等场景都可能导致消费者所订阅Topic的队列数量发生变化,也会再次重新分配。
2.1 分配策略算法
Rocketmq提供了多种Queue分配策略算法,如下
- AllocateMessageQueueAveragely
平均算法: 算出平均值,将连续的队列按平均值分配给每个消费者。
如果能够整除,则按顺序将平均值个Queue分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配
- AllocateMessageQueueAveragelyByCircle
环形平均算法:将消费者按顺序形成一个环形,然后按照这个环形顺序逐个给消费者分配一个Queue
- AllocateMessageQueueConsistentHash
一致性hash算法:先将消费端的hash值放于环上,同时计算队列的hash值,以顺时针方向,分配给离队列hash值最近的一个消费者节点
2.2 源码分析
1
创建DefaultMQPushConsumer时默认使用AllocateMessageQueueAveragely分配策略,即平均分配算法
2
consumer调用start启动时会调用到RebalanceService.start()开启一个任务
3
RebalanceService会调用到RebalanceImpl.rebalanceByTopic执行具体的平衡策略
4
进而拿到所有的Consumer和Queue使用设置的AllocateMessageQueueStrategy算法去分配订阅关系
2.2.1 核心代码
/*
1.初始化:默认使用AllocateMessageQueueAveragely算法分配Queue
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
}
/*
2.开启一个RebalanceService任务执行分配策略
*/
public class MQClientInstance {
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
...
// Start rebalance service
this.rebalanceService.start();
...
default:
break;
}
}
}
}
/*
3.RebalanceImpl.rebalanceByTopic执行具体的分配逻辑
*/
public abstract class RebalanceImpl {
private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
boolean balanced = true;
switch (messageModel) {
...
case CLUSTERING: {
// 拿到所有的Queue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 拿到所有的消费者ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
...
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
// 初始化设置的分配算法:即AllocateMessageQueueAveragely平均分配算法
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 调用分配算法的具体实现
allocateResult = strategy.allocate(...mqAll, cidAll);
} catch (Throwable e) {
log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
return false;
}
...
}
break;
}
default:
break;
}
return balanced;
}
}
/*
4.执行AllocateMessageQueueAveragely平均分配算法的具体实现
*/
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
List<MessageQueue> result = new ArrayList<>();
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
}
最后
至此,通过源码探索我们就大致了解了Producer及Consumer的负载均衡策略,其中Producer主要是通过轮询方式依次把消息发送到Queue,而Consumer默认使用的
平均分配算法去解决消费者节点与Queue之间的分配订阅关系。
Tip
:以下是本人经过多年的工作经验集成的JavaWeb脚手架,封装了各种通用的starter可开箱即用,同时列举了互联网各种高性能场景的使用示例。
// Git代码
https://gitee.com/yeeevip/yeee-memo
https://github.com/yeeevip/yeee-memo