KAFKA 海量吞吐低延迟技术解密:KafkaController

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 1、导读KAFKA 是基于 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用,越来越多的开源分布式处理系统如 Cloudera、Storm、Spark、Flink 等都支持与 KAFKA 集成。本文将基于 KAFKA v1.1.0 版本源码,探讨 KafkaController 的启动流

1、导读

KAFKA 是基于 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用,越来越多的开源分布式处理系统如 Cloudera、Storm、Spark、Flink 等都支持与 KAFKA 集成。本文将基于 KAFKA v1.1.0 版本源码,探讨 KafkaController 的启动流程、选举流程、脑裂问题和事件队列模型。笔者水平有限,若有不当之处,敬请指正。

2、Controller 是什么

很多时候业务为了保证 7x24 小时提供服务,假如一台机器宕机,仍然能有其它机器顶替它继续工作,一般采用 Leader/Follower 模式,即常说的主从模式。正常情况下主机提供服务,备机只负责监听主机状态,当主机故障时,业务可以自动切换到备机继续提供服务,而在切换过程中从备机中选出主机的过程就是 Leader 选举。

KAFKA 集群的多个服务代理节点(节点,Broker)中,有一个 Broker 会被选举为控制器(Controller)。Controller 负责管理整个集群中所有分区和副本的状态变化,并时刻检测集群状态的变化,例如 Broker 故障、分区的 Leader 副本选举和分区重分配。每个 KAFKA 集群只有一个 Controller,以维护集群的单一一致视图。虽然 Controller 成为单点,但 KAFKA 有处理单点故障的机制。

3、Controller 的启动流程

KAFKA Server 启动时会做一系列的初始化,例如:初始化 ZooKeeper、启动 KafkaController、启动 GroupCoordinator、启动 ReplicaManager 等等。

3.1、初始化 ZooKeeper

初始化 ZooKeeper 主要是建立连接,注册监听器,其中值得关注的是在初始化原生ZooKeeper对象的同时会注册一个全局的事件监听器(Watcher),负责监听ZK节点数据变更、ZK的会话状态等等。

// sessionTimeoutMs: server.properties 配置文件中的 zookeeper.session.timeout.ms
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)

ZooKeeperClientWatcher 负责监听ZK节点数据变更、会话状态等等。

private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
    override def process(event: WatchedEvent): Unit = {
      debug(s"Received event: $event")
      Option(event.getPath) match {
        case None =>
          val state = event.getState
          stateToMeterMap.get(state).foreach(_.mark())
          inLock(isConnectedOrExpiredLock) {
            isConnectedOrExpiredCondition.signalAll()
          }
          if (state == KeeperState.AuthFailed) {
            error("Auth failed.")
            stateChangeHandlers.values.foreach(_.onAuthFailure())
          } else if (state == KeeperState.Expired) {
            // 监听Broker与ZK的会话过期事件
            scheduleSessionExpiryHandler()
          }
        case Some(path) =>
          // 监听ZK节点状态,将事件分发至相应的Handler
          (event.getType: @unchecked) match {
            case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
            case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
            case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
            case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
          }
      }
    }
  }

其中 zNodeChangeHandlers、zNodeChildChangeHandlers 均是 ConcurrentHashMap 结构,在后续的 KAFKA 的启动过程中会将相应的 Handler 添加进来,ZK节点监听器监听到节点发生变化后,KafkaController 负责从 ConcurrentHashMap 结构中根据 ZK 节点路径取出对应的处理器(Handler)处理事件。

private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala

3.2、启动内存队列异步事件管理器

所有 Broker 会定期检查 ZooKeeper 以表明自己是健康的。如果 Broker 的响应时间超过 zookeeper.session.timeout.ms(默认18,000 毫秒),则意味着 Broker 与 ZooKeeper 的会话过期,随后将触发 Controller 选举。

eventManager 的对象类型是 ControllerEventManager,用于统一管理监听器触发事件,当ZK监听到节点变化后,其运用单线程方式从内存队列中取出对应的 Handler 处理事件。「Controller 的事件队列模型」章节会展开讲述。

def startup() = {
    zkClient.registerStateChangeHandler(new StateChangeHandler {
      override val name: String = StateChangeHandlers.ControllerHandler
      override def beforeInitializingSession(): Unit = {
        // 初始化 KafkaController 和 ZK 的会话信息
        // 1、重置内存中的ActiveControllerId
        // 2、重置内存中的依赖对象信息,例如监听器触发的事件、上下文信息
        // 3、重置Kafka Broker状态
        val expireEvent = new Expire
        eventManager.clearAndPut(expireEvent)
        expireEvent.waitUntilProcessed()
      }
      override def afterInitializingSession(): Unit = {
        // 事件管理器添加「注册Broker和选举Controller」事件
        // 1、ZK注册Broker信息
        // 2、选举Controller
        eventManager.put(RegisterBrokerAndReelect)
      }
    })

    // 事件管理器添加KafkaController的启动事件
    eventManager.put(Startup)
    // 单线程启动事件管理器
    eventManager.start()
  }

3.3、选举 Controller

每个 Broker 启动过程中,都会尝试在 ZK 创建 /controller 临时节点,也有可能其它 Broker 同时尝试创建,只有创建成功的那个 Broker 才会成为 Controller,而创建失败的 Broker 选举失败。「Controller 的选举流程」章节会展开讲述。

case object Startup extends ControllerEvent {
		...
    override def process(): Unit = {
      // 1、/controller 节点添加ControllerChangeHandler,监听Controller的变更事件
      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
      // 2、选举Controller
      elect()
    }
  }

3.3、初始化上下文信息

Controller 在选举成功后会读取 ZK 中各节点的数据来完成一系列的初始化工作,包括初始化上下文信息(从ZK读取ControllerEpoch、Topic、分区、副本相关的各种元数据信息)、启动并管理分区和副本的状态机、定义监听器触发事件、定义周期性任务触发事件(如分区自平衡)。不管是监听器触发的事件,还是周期性任务触发事件,都会读取或更新 Controller 中的上下文信息。

private def onControllerFailover() {
    // 1、读取 /controller_epoch 的值到 ControllerContext(上下文信息)
    // 2、递增 ControllerEpoch,并更新到 /controller_epoch
    readControllerEpochFromZooKeeper()
    incrementControllerEpoch()

    // 3、注册 ZK 监听器触发事件(逻辑上基于事件队列模型实现,并非真正在ZK上增加Watcher)
    //    - Broker 相关变化的事件处理器
    //    - Topic 相关变化的事件处理器
    //    - ISR 集合变更的事件处理器
    //    - 优先副本选举的事件处理器
    //    - 分区重分配的事件处理器
    val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
      isrChangeNotificationHandler)
    childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
    val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
    nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

    ...

    // 4、初始化上下文信息,ControllerContext用于维护Topic、分区和副本相关的各种元数据,
    //
    initializeControllerContext()

    ...
  }
private def initializeControllerContext() {
    // 读取 /brokers/ids,初始化可用 Broker 集合
    controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
    // 读取 /broker/topics,初始化集群中所有 Topic 集合
    controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
    // 所有 Topic 对应ZK中 /brokers/topics/<topic> 节点添加PartitionModificationsHandler,用于监听Topic中的分区分配变化
    registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
    // 读取 /broker/topics/<topic>/partitions,初始化每个Topic每个分区的AR集合
    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
    controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
    // 所有可用 Broker 对应ZK中 /brokers/ids/<id> 节点添加BrokerModificationsHandler,用于监听Broker增减的变化
    registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
    // 初始化每个分区的 Leader、ISR 集合等信息
    updateLeaderAndIsrCache()
    // 启动 ControllerChannelManager
    startChannelManager()
    // 读取 /admin/reassign_partitions,初始化需要进行副本重分配的分区
    initializePartitionReassignment()
		...
  }

KafkaController 相关的 ZooKeeper 路径如下:

  • /controller :存放 Controller 的信息。记录 Controller 所在的 brokerid,节点类型是临时节点,与ZK会话失效时自动删除。ControllerChangeHandler 监听 Controller 的变更事件;
  • /brokers/ids :存放集群中所有可用的 Broker。BrokerChangeHandler 负责处理 Broker 的增减事件;
  • /brokers/topics :存放集群中所有的 Topic。TopicChangeHandler 负责处理 Topic 的增减事件;
  • /admin/delete_topics :存放删除的 Topic。TopicDeletionHandler 负责处理 Topic 的删除事件;
  • /brokers/topics/<topic>/partitions :存放 Topic 的分区。PartitionModificationsHandler 负责处理分区分配事件;
  • /brokers/topics/<topic>/partitions/<partition>/state :存放的是分区的 Leader 副本和 ISR 集合信息。PartitionReassignmentIsrChangeHandler 负责在副本分配到分区时,需要等待新副本追上Leader 副本后才能执行后续操作;
  • /admin/reassign_partitions :存放重新分配AR的路径,通过命令修改AR时会写入此路径。PartitionReassignmentHandler 负责执行分区重新分配;
  • /admin/preferred_replica_election :存放需要选举副本 Leader 的分区信息。PreferredReplicaElectionHandler 负责对指定分区进行有限副本选举。

4、Controller 的选举流程

KafkaController 触发选举的时机有三个:

  • KafkaController 启动时;
  • KafkaController 与 ZooKeeper 的会话过期;
  • ZooKeeper 中 /controller 临时节点不存在时;

每个 Broker 启动过程中,都会尝试在 ZK 创建 /controller 临时节点,同时也有可能其它 Broker 尝试创建,但只有创建成功的那个 Broker 才会成为 Controller,而创建失败的 Broker 选举失败。图 4-1 展示了 Broker0 成功创建临时节点并写入 Controller 信息;图 4-2 根据展示了当 Broker0 竞选成为 Controller,而 Broker1 和 Broker 2 竞选失败后作为 Follower 后不工作;

图 4-1

图 4-2

每个 Broker 都会在内存中保存当前 Controller 的 brokerid 的值,标识为 activeControllerId。


private def elect(): Unit = {
		...
		// 步骤1、获取ZK的/controller临时节点中brokerid的值,若节点不存在则取-1
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    }

    try {
			// 步骤2:尝试在ZK中创建/controller临时节点,若创建成功则竞选成为Controller
      zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
			info(s"${config.brokerId} successfully elected as the controller")
      activeControllerId = config.brokerId
			// 步骤3:更新/controller_epoch等信息
			onControllerFailover()
    } catch {
      ...
    }
}

private def onControllerFailover() {
    // 步骤4、读取 /controller_epoch 的值到 ControllerContext(上下文信息)
    // 步骤5、递增 ControllerEpoch,并更新到 /controller_epoch
    info("Reading controller epoch from ZooKeeper")
    readControllerEpochFromZooKeeper()
    info("Incrementing controller epoch in ZooKeeper")
    incrementControllerEpoch()
    info("Registering handlers")

		...
}

4.1、旧版的脑裂问题

早期 KAFKA 版本(例如 0.8.x)的脑裂问题 ISSUE:KAFKA-Split-Brain

上文介绍过,KAFKA 是依赖 ZooKeeper 来做 Leader/Follower HA:每个节点都尝试注册一个象征  Leader 的临时节点,其他未注册成功的则成为 Follower,并且通过监听器机制监听着 Leader 所创建的临时节点,ZooKeeper 通过内部心跳机制来确定 Leader 的状态,一旦 Leader 出现故障,ZooKeeper 能很快获悉并且通知其他的 Follower,其他 Follower 尝试竞选,这样就完成了一个新的 Leader 切换。但值得注意的是,短暂的时间内可能由于 Leader 的 GC 时间过长或者 ZooKeeper 节点间网络抖动导致心跳超时(Leader 假死),ZooKeeper 与 Leader 的会话超时随后 ZooKeeper 通知剩余的 Follower 重新竞选,Follower 中就有一个成为了“新Leader”,但“旧Leader”并未故障,此时可能有一部分的客户端已收到 ZooKeeper 的通知并连接到“新Leader”,有一部分客户端仍然连接在“旧Leader”,如果同时两个客户端分别对新旧 Leader 的同一个数据进行更新,就会出现很严重问题。

ZooKeeper 会话过期后通知 Follower 重新竞选的代码入口:kafka.zookeeper.ZooKeeperClient#scheduleSessionExpiryHandler

图 4-3 展示了一种可能发生脑裂过程,Broker 0 是 Controller,但由于负载过高发生长时间 FGC,耗时超过 zookeeper.session.timeout.ms 导致与 ZooKeeper 的心跳超时,ZooKeeper 监听到会话过期事件并通知 Broker 1 和 Broker 2,Broker 1 竞选成功成为新的 Leader(Controller),Broker 2 竞选失败继续成为 Follower,随后 Broker 0 GC 结束仍然以 Leader(Controller) 身份运行,此时集群中就有多个 Leader(Controller);

 

图 4-3

为了解决脑裂问题,新版本的 KAFKA 中新增了“控制器纪元”(ZooKeeper 中是持久节点,路径是 /controller_epoch),Integer 类型,用于跟踪并记录 Controller 发生变更的次数,表示当前的 Controller 是第几代控制器。controller_epoch 的初始值为 1,当 Controller 发生变更,每选出一个新的 Controller 就将该字段自增。每个和 Controller 交互的请求都会携带 controller_epoch 字段:

  • 若请求的 controller_epoch 值小于内存中的 epoch 值,则认为这个请求是发送给已过期的 Controller,那么这个请求会被认为是无效;
  • 若请求的 controller_epoch 值大于内存中的 epoch 值,则说明已经有新的 Controller 当选;

由此可见,KAFKA 通过 controller_epoch 来保证 Controller 的唯一性;

5、Controller 的事件队列模型

从上文可知,Controller 在选举成功后会读取 ZK 中各节点的数据来完成一系列的初始化工作,不管是监听器触发的事件,还是周期性任务触发事件,都会读取或更新 Controller 中的上下文信息,这样就涉及到了多线程同步,如果单纯使用锁机制来控制,那么整体性能会大打折扣,实际上,在 0.10 版本之前,都是通过锁机制的方式来处理上下文的多线程同步,在 0.11+ 版本以后改用单线程基于事件队列的模型,将每个事件都做一层封装,然后按事件发生的先后顺序暂存到 LinkedBlockingQueue 中,最后使用 ControllerEventThread 线程按照 FIFO 的原则来处理各个事件,这样不需要锁机制就可以在多线程间维护线程安全。

class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                             eventProcessedListener: ControllerEvent => Unit) {
	...
  private val putLock = new ReentrantLock()
  private val queue = new LinkedBlockingQueue[ControllerEvent]
  private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)

  def start(): Unit = thread.start()

  def put(event: ControllerEvent): Unit = inLock(putLock) {
    queue.put(event)
  }

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
    override def doWork(): Unit = {
      queue.take() match {
				...
        case controllerEvent =>
          try {
            rateAndTimeMetrics(state).time {
              controllerEvent.process()
            }
          } catch {
            case e: Throwable => error(s"Error processing event $controllerEvent", e)
          }
				...
      }
    }
  }
}

在 KAFKA 早期版本中,并没有采用 KafkaController 这样一个概念来统一管理分区和副本的状态,而是依赖ZK,每个 Broker 都会在 ZK 上为分区和副本注册大量的 Watcher,当分区或副本状态变化时,会唤醒很多不必要的 Watcher,这种严重依赖 ZK 的设计会有脑裂、羊群效应,以及造成 ZK 过载的隐患;在 0.11+ 版本以后,只有 KafkaController 在 ZK 上注册相应的 Watcher,其它的 Broker 极少需要再监听 ZK 中的数据变化,不过每个 Broker 还是会对 /controller 节点添加监听器,用于监听 Controller 是否存活;

图 5-1

目录
相关文章
|
2月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
58 0
|
2月前
|
消息中间件 监控 Kafka
不愧是Alibaba技术官,Kafka的精髓全写这本“限量笔记”里,服了
分布式,是程序员必备技能之一,在面试过程中属于必备类的,在工作中更是会经常用到。而Kafka是一个分布式的基于发布订阅的消息队列,目前它的魅力是无穷的,对于Kafka的奥秘,还需要我们细细去探寻。
|
8月前
|
消息中间件 Java Kafka
微服务技术系列教程(36) - SpringCloud-使用Kafka实现消息驱动
微服务技术系列教程(36) - SpringCloud-使用Kafka实现消息驱动
55 0
|
2月前
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
149 0
|
24天前
|
消息中间件 存储 缓存
高性能、高可靠性!Kafka的技术优势与应用场景全解析
**Kafka** 是一款高吞吐、高性能的消息系统,擅长日志收集、消息传递和用户活动跟踪。其优点包括:零拷贝技术提高传输效率,顺序读写优化磁盘性能,持久化保障数据安全,分布式架构支持扩展,以及客户端状态维护确保可靠性。在实际应用中,Kafka常用于日志聚合、解耦生产者与消费者,以及实时用户行为分析。
55 3
|
28天前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
2月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之使用DTS从RDSMySQL数据库同步数据到云Kafka,增量同步数据延迟时间超过1秒。如何诊断问题并降低延迟
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 监控 Java
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
84 0
|
2月前
|
消息中间件 存储 物联网
|
11月前
|
消息中间件 存储 Kafka
kafka是如何实现高性能高吞吐的?
以下是某网站上对该问题的总结,一共分为了以下六点,但这上面说的很浅显,我在后面加了一些自己的理解,做为解释,如有遗漏或者不对的地方欢迎大家指点,我会即时的修改,辛苦诸位老铁!
124 0

热门文章

最新文章