Apache Kafka源码分析 - kafka controller

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis 
剩下的,其实关键就是controller,以及partition和replica的状态机 
这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作

 

最关键的一句,

private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
    onControllerResignation, config.brokerId)

 

kafka.server.ZookeeperLeaderElector

参数的含义比较明显,注意两个callback,是关键
class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)

做两件事, 
1. 建立watcher,去listen “election path” in ZK, "/controller"; 
当controller发生变化是,可以做相应的处理

controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)

LeaderChangeListener

监听这个path的两个行为,
handleDataChange,即controller发生了变化,这个比较简单,只需要把local的leaderid更新一下就好
leaderId = KafkaController.parseControllerId(data.toString)
handleDataDeleted,这种情况即,controller被删除了,一般是挂了
挂了就重新做elect,但是多个判断,如果本来我就是leader,那先调用onResigningAsLeader,即onControllerResignation,做些清理的工作,比如deregister watcher,关闭各种状态机
if(amILeader)
   onResigningAsLeader()
elect

 

2. elect, 试图去创建EphemeralPath,从而成为Controller

用createEphemeralPathExpectConflictHandleZKBug,试图去创建EphemeralNode, 
如果不成功说明已经被别人抢占 
成功就说明我成为了controller,调用onBecomingLeader(),即onControllerFailover

 

KafkaController.onControllerFailover

这个callback就是controller初始化的关键,

复制代码
/**
   * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
   * It does the following things on the become-controller state change -
   * 1. Register controller epoch changed listener
   * 2. Increments the controller epoch
   * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
   *    leaders for all existing partitions.
   * 4. Starts the controller's channel manager
   * 5. Starts the replica state machine
   * 6. Starts the partition state machine
   * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
   * This ensures another controller election will be triggered and there will always be an actively serving controller
   */
  def onControllerFailover() {
    if(isRunning) {
      info("Broker %d starting become controller state transition".format(config.brokerId))  //开始日志
      //read controller epoch from zk
      readControllerEpochFromZookeeper()
      // increment the controller epoch
      incrementControllerEpoch(zkClient)  //增加ControllerEpoch,以区分过期
      // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
      registerReassignedPartitionsListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners() 
      replicaStateMachine.registerListeners() 
      initializeControllerContext() //从zk读出broker,topic的信息以初始化controllerContext
      replicaStateMachine.startup() //启动状态机
       partitionStateMachine.startup()
      // register the partition change listeners for all existing topics on failover
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))  //为每个topic增加partition变化的watcher
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) //完成日志
       brokerState.newState(RunningAsController)
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      /* send partition leadership info to all live brokers */
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) //如注释说,用于成为controller,需要把partition leadership信息发到所有brokers,大家要以我为准
      if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }
复制代码

 

这里最后再讨论一下createEphemeralPathExpectConflictHandleZKBug 
这个函数看着比较诡异,handleZKBug,到底是zk的什么bug? 
注释给出了,https://issues.apache.org/jira/browse/ZOOKEEPER-1740

这个问题在于“The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.” 
即zk的session过期和ephemeral node删除并不是一个原子操作,所以带来的问题的过程如下,

1. client session超时,它会假设在zk,这个ephemeral node已经被删除,但是zk当前由于某种原因hang住了,比如very long fsync operations,所以其实这个ephemeral node并没有被删除

2. 但client是认为ephemeral node已经被删除,所以它会尝试重新创建这个ephemeral node,但得到的结果是NodeExists,因为这个node并没有被删除,那么client想既然有就算了

3. 这个时候,zk从very long fsync operations的hang中恢复,它会继续之前没有完成的操作,把ephemeral node删掉

4. 这样client就会发现ephemeral node不存在了,虽然session并没有超时

所以这个函数就是为了规避这个问题, 
方法就是,当我发现NodeExists时,说明zk当前hang住了,这个时候我需要等待,并反复尝试,直到zk把这个node删除后,我再重新创建

复制代码
def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = {
    while (true) {
      try {
        createEphemeralPathExpectConflict(zkClient, path, data)
        return
      } catch {
        case e: ZkNodeExistsException => {
          // An ephemeral node may still exist even after its corresponding session has expired
          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
          // and hence the write succeeds without ZkNodeExistsException
          ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
            case Some(writtenData) => {
              if (checker(writtenData, expectedCallerData)) {
                info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path)
                  + "hence I will backoff for this node to be deleted by Zookeeper and retry")

                Thread.sleep(backoffTime)
              } else {
                throw e
              }
            }
            case None => // the node disappeared; retry creating the ephemeral node immediately
          }
        }
        case e2: Throwable => throw e2
      }
    }
  }
复制代码

这个方式有个问题就是,如果zk hang住,这里的逻辑是while true,这个函数也会一直hang住


本文章摘自博客园,原文发布日期:2015-11-05

目录
相关文章
|
12天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
40 7
|
12天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
53 5
|
9天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
10天前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
34 0
|
3月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
44 1
|
22天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
517 13
Apache Flink 2.0-preview released
|
26天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
60 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
206 2
|
3月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
47 3

推荐镜像

更多
下一篇
无影云桌面