Apache Kafka源码分析 - kafka controller-阿里云开发者社区

开发者社区> 寒凝雪> 正文

Apache Kafka源码分析 - kafka controller

简介:
+关注继续查看

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis 
剩下的,其实关键就是controller,以及partition和replica的状态机 
这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作

 

最关键的一句,

private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
    onControllerResignation, config.brokerId)

 

kafka.server.ZookeeperLeaderElector

参数的含义比较明显,注意两个callback,是关键
class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)

做两件事, 
1. 建立watcher,去listen “election path” in ZK, "/controller"; 
当controller发生变化是,可以做相应的处理

controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)

LeaderChangeListener

监听这个path的两个行为,
handleDataChange,即controller发生了变化,这个比较简单,只需要把local的leaderid更新一下就好
leaderId = KafkaController.parseControllerId(data.toString)
handleDataDeleted,这种情况即,controller被删除了,一般是挂了
挂了就重新做elect,但是多个判断,如果本来我就是leader,那先调用onResigningAsLeader,即onControllerResignation,做些清理的工作,比如deregister watcher,关闭各种状态机
if(amILeader)
   onResigningAsLeader()
elect

 

2. elect, 试图去创建EphemeralPath,从而成为Controller

用createEphemeralPathExpectConflictHandleZKBug,试图去创建EphemeralNode, 
如果不成功说明已经被别人抢占 
成功就说明我成为了controller,调用onBecomingLeader(),即onControllerFailover

 

KafkaController.onControllerFailover

这个callback就是controller初始化的关键,

复制代码
/**
   * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
   * It does the following things on the become-controller state change -
   * 1. Register controller epoch changed listener
   * 2. Increments the controller epoch
   * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
   *    leaders for all existing partitions.
   * 4. Starts the controller's channel manager
   * 5. Starts the replica state machine
   * 6. Starts the partition state machine
   * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
   * This ensures another controller election will be triggered and there will always be an actively serving controller
   */
  def onControllerFailover() {
    if(isRunning) {
      info("Broker %d starting become controller state transition".format(config.brokerId))  //开始日志
      //read controller epoch from zk
      readControllerEpochFromZookeeper()
      // increment the controller epoch
      incrementControllerEpoch(zkClient)  //增加ControllerEpoch,以区分过期
      // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
      registerReassignedPartitionsListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners() 
      replicaStateMachine.registerListeners() 
      initializeControllerContext() //从zk读出broker,topic的信息以初始化controllerContext
      replicaStateMachine.startup() //启动状态机
       partitionStateMachine.startup()
      // register the partition change listeners for all existing topics on failover
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))  //为每个topic增加partition变化的watcher
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) //完成日志
       brokerState.newState(RunningAsController)
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      /* send partition leadership info to all live brokers */
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) //如注释说,用于成为controller,需要把partition leadership信息发到所有brokers,大家要以我为准
      if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }
复制代码

 

这里最后再讨论一下createEphemeralPathExpectConflictHandleZKBug 
这个函数看着比较诡异,handleZKBug,到底是zk的什么bug? 
注释给出了,https://issues.apache.org/jira/browse/ZOOKEEPER-1740

这个问题在于“The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.” 
即zk的session过期和ephemeral node删除并不是一个原子操作,所以带来的问题的过程如下,

1. client session超时,它会假设在zk,这个ephemeral node已经被删除,但是zk当前由于某种原因hang住了,比如very long fsync operations,所以其实这个ephemeral node并没有被删除

2. 但client是认为ephemeral node已经被删除,所以它会尝试重新创建这个ephemeral node,但得到的结果是NodeExists,因为这个node并没有被删除,那么client想既然有就算了

3. 这个时候,zk从very long fsync operations的hang中恢复,它会继续之前没有完成的操作,把ephemeral node删掉

4. 这样client就会发现ephemeral node不存在了,虽然session并没有超时

所以这个函数就是为了规避这个问题, 
方法就是,当我发现NodeExists时,说明zk当前hang住了,这个时候我需要等待,并反复尝试,直到zk把这个node删除后,我再重新创建

复制代码
def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = {
    while (true) {
      try {
        createEphemeralPathExpectConflict(zkClient, path, data)
        return
      } catch {
        case e: ZkNodeExistsException => {
          // An ephemeral node may still exist even after its corresponding session has expired
          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
          // and hence the write succeeds without ZkNodeExistsException
          ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
            case Some(writtenData) => {
              if (checker(writtenData, expectedCallerData)) {
                info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path)
                  + "hence I will backoff for this node to be deleted by Zookeeper and retry")

                Thread.sleep(backoffTime)
              } else {
                throw e
              }
            }
            case None => // the node disappeared; retry creating the ephemeral node immediately
          }
        }
        case e2: Throwable => throw e2
      }
    }
  }
复制代码

这个方式有个问题就是,如果zk hang住,这里的逻辑是while true,这个函数也会一直hang住


本文章摘自博客园,原文发布日期:2015-11-05

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
1月28日云栖精选夜读 | 终于等到你!阿里正式向 Apache Flink 贡献 Blink 源码
如同我们去年12月在 Flink Forward China 峰会所约,阿里巴巴内部 Flink 版本 Blink 将于 2019 年 1 月底正式开源。今天,我们终于等到了这一刻。
3865 0
从源码分析如何优雅的使用 Kafka 生产者
前言 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
522 0
仿酷狗音乐播放器开发日志二十二 动态调色板控件第二版(性能大幅提升附源码)
转载请说明原出处,谢谢~~         在上次写的博客《仿酷狗音乐播放器开发日志二十一 开发动态调色板控件(附源码)》发布后,我在群里和网友讨论这个控件的性能和优 缺点,发现了他很多不足,还有很多提升空间,之后我简单的修改了代码提升了控件的响应速度。
853 0
LinkedHashMap源码分析(基于JDK1.6)
LinkedHashMap类似于HashMap,但是迭代遍历它时,取得“键值对”的顺序是插入次序,或者是最近最少使用(LRU)的次序。只比HashMap慢一点;而在迭代访问时反而更快,因为它使用链表维护内部次序(HashMap是基于散列表实现的,相关HashMap的内容可以看《Java集合类》和《HashMap源码分析》)。
574 0
Java 读写锁 ReentrantReadWriteLock 源码分析
原文出处:https://javadoop.com/post/reentrant-read-write-lock 本文内容:读写锁 ReentrantReadWriteLock 的源码分析,基于 Java7/Java8。
789 0
TreeMap源码分析——基础分析(基于JDK1.6)
常见的数据结构有数组、链表,还有一种结构也很常见,那就是树。前面介绍的集合类有基于数组的ArrayList,有基于链表的LinkedList,还有链表和数组结合的HashMap,今天介绍基于树的TreeMap。
401 0
Apache Flink流作业提交流程分析
提交流程调用的关键方法链 用户编写的程序逻辑需要提交给Flink才能得到执行。本文来探讨一下客户程序如何提交给Flink。鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行。
3693 0
深入K8S Job(三):cronJob controller源码分析
源码流程图 概述 cronJob controller 的实现比较简单,使用 Cron - Wikipedia 的方法,确定调度规则,底层的调度对象就是依赖了 job,它不会去检查任何 Pod。 该 controller 也没有依赖各种 informer,就简单创建了一个循环运行的协程,每次遍历现有的 jobs & cronJobs,整理它们的关系并进行管理。
1204 0
+关注
5854
文章
223
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载