Apache Kafka源码分析 – Broker Server

简介:

1. Kafka.scala

在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装

   1: val kafkaServerStartble = new KafkaServerStartable(serverConfig)
   2: kafkaServerStartble.startup

 

   1: package kafka.server
   2: class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   3:   private var server : KafkaServer = null
   4:  
   5:   private def init() {
   6:     server = new KafkaServer(serverConfig)
   7:   }
   8:  
   9:   def startup() {
  10:     try {
  11:       server.startup()
  12:     }
  13:     catch {...}
  14:   }
  15: }

2. KafkaServer

KafkaServer代表一个kafka broker, 这是kafka的核心. 
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析吧

   1: package kafka.server
   2: /**
   3:  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
   4:  * to start up and shutdown a single Kafka node.
   5:  */
   6: class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
   7:   var socketServer: SocketServer = null
   8:   var requestHandlerPool: KafkaRequestHandlerPool = null
   9:   var logManager: LogManager = null
  10:   var kafkaHealthcheck: KafkaHealthcheck = null
  11:   var topicConfigManager: TopicConfigManager = null
  12:   var replicaManager: ReplicaManager = null
  13:   var apis: KafkaApis = null
  14:   var kafkaController: KafkaController = null
  15:   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  16:   var zkClient: ZkClient = null
  17:  
  18:   /**
  19:    * Start up API for bringing up a single instance of the Kafka server.
  20:    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
  21:    */
  22:   def startup() {
  23:     /* start scheduler */
  24:     kafkaScheduler.startup()
  25:     
  26:     /* setup zookeeper */
  27:     zkClient = initZk()
  28:  
  29:     /* start log manager */
  30:     logManager = createLogManager(zkClient)
  31:     logManager.startup()
  32:  
  33:     socketServer = new SocketServer(config.brokerId,
  34:                                     config.hostName,
  35:                                     config.port,
  36:                                     config.numNetworkThreads,
  37:                                     config.queuedMaxRequests,
  38:                                     config.socketSendBufferBytes,
  39:                                     config.socketReceiveBufferBytes,
  40:                                     config.socketRequestMaxBytes)
  41:     socketServer.startup()
  42:  
  43:     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
  44:     kafkaController = new KafkaController(config, zkClient)
  45:     
  46:     /* start processing requests */
  47:     apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
  48:     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  49:    
  50:     replicaManager.startup()
  51:  
  52:     kafkaController.startup()
  53:     
  54:     topicConfigManager = new TopicConfigManager(zkClient, logManager)
  55:     topicConfigManager.startup()
  56:     
  57:     /* tell everyone we are alive */
  58:     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
  59:     kafkaHealthcheck.startup()
  60:   }

2.1 KafkaScheduler

KafkaSchduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现

   1: package kafka.utils
   2:  
   3: /**
   4:  * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
   5:  * 
   6:  * It has a pool of kafka-scheduler- threads that do the actual work.
   7:  * 
   8:  * @param threads The number of threads in the thread pool
   9:  * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
  10:  * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
  11:  */
  12: @threadsafe
  13: class KafkaScheduler(val threads: Int, 
  14:                      val threadNamePrefix: String = "kafka-scheduler-", 
  15:                      daemon: Boolean = true) extends Scheduler with Logging {
  16:   @volatile private var executor: ScheduledThreadPoolExecutor = null   
  17:   override def startup() {
  18:     this synchronized {
  19:       executor = new ScheduledThreadPoolExecutor(threads) //创建ScheduledThreadPoolExecutor
  20:       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
  21:       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
  22:       executor.setThreadFactory(new ThreadFactory() {
  23:                                   def newThread(runnable: Runnable): Thread = 
  24:                                     Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
  25:                                 })
  26:     }
  27:   }
  28:  
  29: def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
  30:   val runnable = new Runnable { //将fun封装成Runnable
  31:     def run() = {
  32:       try {
  33:         fun()
  34:       } catch {...} 
  35:       finally {...}
  36:     }
  37:   }
  38:   if(period >= 0) //在pool中进行delay schedule
  39:     executor.scheduleAtFixedRate(runnable, delay, period, unit)
  40:   else
  41:     executor.schedule(runnable, delay, unit)
  42: }

2.2 Zookeeper Client

由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信

2.3 logManager

The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. 
Apache Kafka源码分析 – Log Management

 

2.4 ReplicaManager

在0.8中新加入的replica相关模块

Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka源码分析 – ReplicaManager

 

2.5 Kafka Socket Server

首先broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的

socketServer = new SocketServer(config.brokerId...)

KafkaApis
该类封装了所有request的处理逻辑

KafkaRequestHandler

 

2.6 offsetManager

offsetManager = createOffsetManager()
定期清除过期的offset数据,即compact操作,

scheduler.schedule(name = "offsets-cache-compactor",
                     fun = compact,
                     period = config.offsetsRetentionCheckIntervalMs,
                     unit = TimeUnit.MILLISECONDS)

以及consumer相关的一些offset操作,不细究了,因为我们不用highlevel consumer

 

2.7 KafkaController

kafkaController = new KafkaController(config, zkClient, brokerState)

Apache Kafka源码分析 – Controller

0.8后,为了处理replica,会用一个broker作为master,即controller,用于协调replica的一致性

2.8 TopicConfigManager

 
topicConfigManager = new TopicConfigManager(zkClient, logManager)

TopicConfigManager用于处理topic config的change,kafka除了全局的配置,还有一种叫Topic-level configuration

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic 
    --config max.message.bytes=128000

比如你可以这样设置,那么这些topic config如何生效的?

topic-level config默认是被存储在,

/brokers/topics/<topic_name>/config
但是topic很多的情况下,为了避免创建太多的watcher,

所以单独创建一个目录

/brokers/config_changes

来触发配置的变化
所以上面的命令除了,把配置写入topic/config,还有增加一个通知,告诉watcher哪个topic的config发生了变化

/brokers/config_changes/config_change_13321

并且这个通知有个suffix,用于区别是否已处理过

复制代码
/**
   * Process the given list of config changes
   */
  private def processConfigChanges(notifications: Seq[String]) {
    if (notifications.size > 0) {
      info("Processing config change notification(s)...")
      val now = time.milliseconds
      val logs = logManager.logsByTopicPartition.toBuffer
      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
      for (notification <- notifications) {
        val changeId = changeNumber(notification)
        if (changeId > lastExecutedChange) {  //未处理过
          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
          if(jsonOpt.isDefined) {
            val json = jsonOpt.get
            val topic = json.substring(1, json.length - 1) // hacky way to dequote,从通知中获取topic name
            if (logsByTopic.contains(topic)) {
              /* combine the default properties with the overrides in zk to create the new LogConfig */
              val props = new Properties(logManager.defaultConfig.toProps)
              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
              val logConfig = LogConfig.fromProps(props)
              for (log <- logsByTopic(topic))
                log.config = logConfig    //真正的更新log配置
              info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
              purgeObsoleteNotifications(now, notifications) //删除过期的notification,10分钟
            }
          }
          lastExecutedChange = changeId
        }
      }
    }
  }
复制代码
这个failover也没问题,反正配置设置多次也是无害的,每次启动都会把所有没过期的notification处理一遍

并且broker重启后是会从zk中, loading完整的配置的,所以也ok的,这个主要用于实时更新topic的配置

 

2.8 KafkaHealthcheck

kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)

这个很简单,就像注释的,告诉所有人我还活着。。。

实现就是在,

 /brokers/[0...N] --> advertisedHost:advertisedPort

register一个ephemeral znode,当SessionExpired时,再去register,典型zk应用
所以只需要watch这个路径就是知道broker是否还活着

2.9 ContolledShutdown

对于0.8之前,broker的startup和shutdown都很简单,把上面这些组件初始化,或stop就可以了

但是0.8后,增加replica,所以broker不能自己直接shutdown,需要先通知controller,controller做完处理后,比如partition leader的迁移,或replica offline,然后才能shutdown

private def controlledShutdown()

挺长的,逻辑就是找到controller,发送ControlledShutdownRequest,然后等待返回,如果失败,就是unclean shutdown


本文章摘自博客园,原文发布日期: 2014-02-14

目录
相关文章
|
7月前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
675 4
|
9月前
|
消息中间件 存储 监控
Apache Kafka 3.0与KRaft模式的革新解读
在该架构中,Kafka集群依旧包含多个broker节点,但已不再依赖ZooKeeper集群。被选中的Kafka集群Controller将从KRaft Quorum中加载其状态,并在必要时通知其他Broker节点关于元数据的变更。这种设计支持更多分区与快速Controller切换,并有效避免了因数据不一致导致的问题。
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
788 5
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
713 1
|
6月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1116 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
550 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
8月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
940 9
Apache Flink:从实时数据分析到实时AI
|
8月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
809 0
|
7月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
2519 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
1054 33
The Past, Present and Future of Apache Flink

推荐镜像

更多
下一篇
开通oss服务