Apache Kafka源码分析 - autoLeaderRebalanceEnable

简介:

在broker的配置中,auto.leader.rebalance.enable (false)

那么这个leader是如何进行rebalance的?

首先在controller启动的时候会打开一个scheduler,

复制代码
if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
      }
复制代码

定期去做,

checkAndTriggerPartitionRebalance

这个函数逻辑,就是找出所有发生过迁移的replica,即

topicsNotInPreferredReplica

并且判断如果满足imbalance比率,即自动触发leader rebalance,将leader迁回perfer replica

关键要理解什么是preferred replicas?

preferredReplicasForTopicsByBrokers =
          controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
            case(topicAndPartition, assignedReplicas) => assignedReplicas.head
          }
 partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] 
TopicAndPartition可以通过topic name和partition id来唯一标识一个partition,Seq[int],表示brokerids,表明这个partition的replicas在哪些brokers上面

从partition的ReplicaAssignment里面过滤掉delete的topic,然后按照assignedReplicas.head进行groupby,就是按照Seq中的第一个brokerid 
意思就是说,默认每个partition的preferred replica就是第一个被assign的replica

groupby的结果就是,每个broker,和应该以该broker作为leader的所有partition,即

case(leaderBroker, topicAndPartitionsForBroker)

那么找出里面当前leader不是preferred的,即发生过迁移的, 
很简单,直接和leaderAndIsr里面的leader进行比较,如果不相等就说明发生过迁徙

复制代码
topicsNotInPreferredReplica =
              topicAndPartitionsForBroker.filter {
                case(topicPartition, replicas) => {
                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                }
              }
复制代码

并且只有当某个broker上的imbalanceRatio大于10%的时候,才会触发rebalance

imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker

对每个partition的迁移过程, 
首先preferred的broker要是活着的,并且当前是没有partition正在进行reassign或replica election的,说明这个过程是不能并行的,同时做reassign很容易冲突

复制代码
// do this check only if the broker is live and there are no partitions being reassigned currently
                  // and preferred replica election is not in progress
                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                      controllerContext.partitionsBeingReassigned.size == 0 &&
                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                      controllerContext.allTopics.contains(topicPartition.topic)) {
                    onPreferredReplicaElection(Set(topicPartition), true)
复制代码

onPreferredReplicaElection

还是通过partitionStateMachine,来改变partition的状态

partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)

partitionStateMachine会另外分析,这里只需要知道,当前partition的状态是,OnlinePartition –> OnlinePartition 
并且是以preferredReplicaPartitionLeaderSelector,作为leaderSelector的策略

 

PreferredReplicaPartitionLeaderSelector

策略很简单,就是把leader换成preferred replica

复制代码
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val preferredReplica = assignedReplicas.head  //取AR第一个replica作为preferred
    // check if preferred replica is the current leader
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) { //如果当前leader就是preferred就不需要做了
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s" .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + " Trigerring preferred replica leader election")
      // check if preferred replica is not the current leader and is alive and in the isr
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) { //判断当前preferred replica所在broker是否活,是否在isr中
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderAndIsr.zkVersion + 1), assignedReplicas) //产生新的leaderAndIsr
      } else {
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}

2015-10-27
目录
相关文章
|
1月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
3月前
|
消息中间件 Kafka Linux
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
65 0
|
9天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
3月前
|
消息中间件 Java Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
31 0
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
438 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
44 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
243 0
|
3月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
151 0
|
4月前
|
消息中间件 存储 算法
Kafka Raft集群搭建
Kafka Raft集群搭建
74 0
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
43 1

推荐镜像

更多