开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息消费负载和重新分布机制】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12502
消息消费负载和重新分布机制
负载均衡的过程与算法介绍
RocketMQ 消息队列重新分配是由 RebalanceService 线程负责的。线程的启动会随着MQClientinstance的启动而启动。
MQClientinstance 其中会有这么一行代码:
this.rebalanceService.start();
也就是start会启动 rebalanceService 线程。
这个线程在启动之后,就会去调用客户端的 dorebalance 方法,下面是这个方法的代码:
public void doRebalance(){
for (Map.Entry entry : this.consumerTable.entryset()){
MQConsumerInner impl = entry .getValue();
if ( impl !=null){
try { impl.doRebalance();
catch ( Throwable e) {
log.error( "doRebalance exception", e);}
它会进行负载均衡,重新分布的时候,以上代码部分会去遍历每个主题的订阅的队列,然后重新进行一个负载啊,所以整个地方是根据主题的一个订阅信息。然后会去遍历,针对每一个消费方去进行一个负载。
进入 doRebalance 之后,发现有两种方式,一种是推送方式(DefaultMQPushConsumerImpl),一种是拉取方式(DefaultMQPullConsumerImpl),
推送方式。代码如下:
public void doRebalance(final boolean isOrder) {
Map subTable = this.getSubscriptionInner();
if ( subTable != null){
for (final Map.Entry entry : subTable.entrySet()){
final String topic = entry.getKey();
try {
this.rebalancByTopic(topic, isOrder);
}catch (Throwable e) {
if(!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){
Log.warn("rebalanceByTopic Exception", e);}
真正的负载会根据主题(Topic)去进行负载,这个地方传了一个topic, 所以进入 rebalanceByTopic,
负载均衡对于广播模式(messageModel)来讲其实没有太大的意义,因为广播模式的每一个客户端都要去消费当前主题下所有的队列,最多做更新的处理。
case CLUSTERING:{
Set mqSet = this.topicSubscribeInfoTable.get(topic);
List cidAll =
this.mQClientFactory.findConsumerIdList(topic,consumerGroup);
if(null == mqSet){
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){
Log.warn("doRebalance,{},but the topic[{}] not exist.",
consumerGroup,topic);}
}
if(null == cidAll){
log.warn("doRebalance, {} {},get consumer id list failed", consumerGroup,topic);}
而真正的负载均衡是发生在集群模式下。它首先是拿到当前主题的这个队列(topic),再去拿到 cidAll ,也就是当前的所有主题客户端,
如下两个进行排序:
Collections.sort(mqAll) Collections.sort(cidAll);
排完序之后,根据负载均衡的策略去进行一个重新的一个分布。
负载均衡的策略有五种。比较常用的有两种,一种叫做AllocateMessageQueueAveragely
,另外一种就是AllocateMessageQueueAveragelyByCircle。
RocketMQ 默认提供5中负载均衡分配算法,下面是常见的两种:
AllocateMessageQueueAveragely : 平均分配
举例:8个队列q1,q2,q3,q4, q5 ,q6 ,q7,q8 ,消费者3个:c1,c2,c3分配如下:
c1:q1,q2 ,q3; c2:q4, q5 ,q6;c3:q7,q8
AllocateMessageQueueAveragelyByCircle:平均轮询分配
举例:8个队列q1,q2,q3,q4, q5 ,q6,q7 ,q8 ,消费者3个:c1,c2,c3分配如下:
c1:q1,q4 ,q7;c2:q2,q5,q8;c3:q3, q6
这两种的区别是:举一个例子:比如现在有八个队列,三个消费者,如果是第一种方式,真正负载的情况是:就是 C1 客户端负责消费一23,C2 负责消费456,C3 负责消费78;如果是第二种方式,在这种情况下,它的这三个消费者的负载是这样的: C1负责消费 Q1,Q4,Q7。C2 负责消费258, C3 负责消费36,它是一个循环的过程。这是两种比较常用的负载均衡的方式,在 MQ 当中这两种方式都可以使用。
注意事项:消息队列的分配遵循一个消费者可以分配到多个队列,但是同一个消息队列只能分配给一个消费者。所以如果出现消费者的个数大于队列的个数,那有些消费者就无法消费消息。以上就是关于负载均衡的过程以及具体负载均衡的算法。