Apache Kafka源码分析 – Broker Server

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
注册配置 MSE Nacos/ZooKeeper,182元/月
简介:

1. Kafka.scala

在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装

   1: val kafkaServerStartble = new KafkaServerStartable(serverConfig)
   2: kafkaServerStartble.startup

 

   1: package kafka.server
   2: class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   3:   private var server : KafkaServer = null
   4:  
   5:   private def init() {
   6:     server = new KafkaServer(serverConfig)
   7:   }
   8:  
   9:   def startup() {
  10:     try {
  11:       server.startup()
  12:     }
  13:     catch {...}
  14:   }
  15: }

2. KafkaServer

KafkaServer代表一个kafka broker, 这是kafka的核心. 
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析吧

   1: package kafka.server
   2: /**
   3:  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
   4:  * to start up and shutdown a single Kafka node.
   5:  */
   6: class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
   7:   var socketServer: SocketServer = null
   8:   var requestHandlerPool: KafkaRequestHandlerPool = null
   9:   var logManager: LogManager = null
  10:   var kafkaHealthcheck: KafkaHealthcheck = null
  11:   var topicConfigManager: TopicConfigManager = null
  12:   var replicaManager: ReplicaManager = null
  13:   var apis: KafkaApis = null
  14:   var kafkaController: KafkaController = null
  15:   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  16:   var zkClient: ZkClient = null
  17:  
  18:   /**
  19:    * Start up API for bringing up a single instance of the Kafka server.
  20:    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
  21:    */
  22:   def startup() {
  23:     /* start scheduler */
  24:     kafkaScheduler.startup()
  25:     
  26:     /* setup zookeeper */
  27:     zkClient = initZk()
  28:  
  29:     /* start log manager */
  30:     logManager = createLogManager(zkClient)
  31:     logManager.startup()
  32:  
  33:     socketServer = new SocketServer(config.brokerId,
  34:                                     config.hostName,
  35:                                     config.port,
  36:                                     config.numNetworkThreads,
  37:                                     config.queuedMaxRequests,
  38:                                     config.socketSendBufferBytes,
  39:                                     config.socketReceiveBufferBytes,
  40:                                     config.socketRequestMaxBytes)
  41:     socketServer.startup()
  42:  
  43:     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
  44:     kafkaController = new KafkaController(config, zkClient)
  45:     
  46:     /* start processing requests */
  47:     apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
  48:     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  49:    
  50:     replicaManager.startup()
  51:  
  52:     kafkaController.startup()
  53:     
  54:     topicConfigManager = new TopicConfigManager(zkClient, logManager)
  55:     topicConfigManager.startup()
  56:     
  57:     /* tell everyone we are alive */
  58:     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
  59:     kafkaHealthcheck.startup()
  60:   }

2.1 KafkaScheduler

KafkaSchduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现

   1: package kafka.utils
   2:  
   3: /**
   4:  * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
   5:  * 
   6:  * It has a pool of kafka-scheduler- threads that do the actual work.
   7:  * 
   8:  * @param threads The number of threads in the thread pool
   9:  * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
  10:  * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
  11:  */
  12: @threadsafe
  13: class KafkaScheduler(val threads: Int, 
  14:                      val threadNamePrefix: String = "kafka-scheduler-", 
  15:                      daemon: Boolean = true) extends Scheduler with Logging {
  16:   @volatile private var executor: ScheduledThreadPoolExecutor = null   
  17:   override def startup() {
  18:     this synchronized {
  19:       executor = new ScheduledThreadPoolExecutor(threads) //创建ScheduledThreadPoolExecutor
  20:       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
  21:       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
  22:       executor.setThreadFactory(new ThreadFactory() {
  23:                                   def newThread(runnable: Runnable): Thread = 
  24:                                     Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
  25:                                 })
  26:     }
  27:   }
  28:  
  29: def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
  30:   val runnable = new Runnable { //将fun封装成Runnable
  31:     def run() = {
  32:       try {
  33:         fun()
  34:       } catch {...} 
  35:       finally {...}
  36:     }
  37:   }
  38:   if(period >= 0) //在pool中进行delay schedule
  39:     executor.scheduleAtFixedRate(runnable, delay, period, unit)
  40:   else
  41:     executor.schedule(runnable, delay, unit)
  42: }

2.2 Zookeeper Client

由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信

2.3 logManager

The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. 
Apache Kafka源码分析 – Log Management

 

2.4 ReplicaManager

在0.8中新加入的replica相关模块

Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka源码分析 – ReplicaManager

 

2.5 Kafka Socket Server

首先broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的

socketServer = new SocketServer(config.brokerId...)

KafkaApis
该类封装了所有request的处理逻辑

KafkaRequestHandler

 

2.6 offsetManager

offsetManager = createOffsetManager()
定期清除过期的offset数据,即compact操作,

scheduler.schedule(name = "offsets-cache-compactor",
                     fun = compact,
                     period = config.offsetsRetentionCheckIntervalMs,
                     unit = TimeUnit.MILLISECONDS)

以及consumer相关的一些offset操作,不细究了,因为我们不用highlevel consumer

 

2.7 KafkaController

kafkaController = new KafkaController(config, zkClient, brokerState)

Apache Kafka源码分析 – Controller

0.8后,为了处理replica,会用一个broker作为master,即controller,用于协调replica的一致性

2.8 TopicConfigManager

 
topicConfigManager = new TopicConfigManager(zkClient, logManager)

TopicConfigManager用于处理topic config的change,kafka除了全局的配置,还有一种叫Topic-level configuration

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic 
    --config max.message.bytes=128000

比如你可以这样设置,那么这些topic config如何生效的?

topic-level config默认是被存储在,

/brokers/topics/<topic_name>/config
但是topic很多的情况下,为了避免创建太多的watcher,

所以单独创建一个目录

/brokers/config_changes

来触发配置的变化
所以上面的命令除了,把配置写入topic/config,还有增加一个通知,告诉watcher哪个topic的config发生了变化

/brokers/config_changes/config_change_13321

并且这个通知有个suffix,用于区别是否已处理过

复制代码
/**
   * Process the given list of config changes
   */
  private def processConfigChanges(notifications: Seq[String]) {
    if (notifications.size > 0) {
      info("Processing config change notification(s)...")
      val now = time.milliseconds
      val logs = logManager.logsByTopicPartition.toBuffer
      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
      for (notification <- notifications) {
        val changeId = changeNumber(notification)
        if (changeId > lastExecutedChange) {  //未处理过
          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
          if(jsonOpt.isDefined) {
            val json = jsonOpt.get
            val topic = json.substring(1, json.length - 1) // hacky way to dequote,从通知中获取topic name
            if (logsByTopic.contains(topic)) {
              /* combine the default properties with the overrides in zk to create the new LogConfig */
              val props = new Properties(logManager.defaultConfig.toProps)
              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
              val logConfig = LogConfig.fromProps(props)
              for (log <- logsByTopic(topic))
                log.config = logConfig    //真正的更新log配置
              info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
              purgeObsoleteNotifications(now, notifications) //删除过期的notification,10分钟
            }
          }
          lastExecutedChange = changeId
        }
      }
    }
  }
复制代码
这个failover也没问题,反正配置设置多次也是无害的,每次启动都会把所有没过期的notification处理一遍

并且broker重启后是会从zk中, loading完整的配置的,所以也ok的,这个主要用于实时更新topic的配置

 

2.8 KafkaHealthcheck

kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)

这个很简单,就像注释的,告诉所有人我还活着。。。

实现就是在,

 /brokers/[0...N] --> advertisedHost:advertisedPort

register一个ephemeral znode,当SessionExpired时,再去register,典型zk应用
所以只需要watch这个路径就是知道broker是否还活着

2.9 ContolledShutdown

对于0.8之前,broker的startup和shutdown都很简单,把上面这些组件初始化,或stop就可以了

但是0.8后,增加replica,所以broker不能自己直接shutdown,需要先通知controller,controller做完处理后,比如partition leader的迁移,或replica offline,然后才能shutdown

private def controlledShutdown()

挺长的,逻辑就是找到controller,发送ControlledShutdownRequest,然后等待返回,如果失败,就是unclean shutdown


本文章摘自博客园,原文发布日期: 2014-02-14

目录
相关文章
|
2月前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
336 4
|
4月前
|
消息中间件 存储 监控
Apache Kafka 3.0与KRaft模式的革新解读
在该架构中,Kafka集群依旧包含多个broker节点,但已不再依赖ZooKeeper集群。被选中的Kafka集群Controller将从KRaft Quorum中加载其状态,并在必要时通知其他Broker节点关于元数据的变更。这种设计支持更多分区与快速Controller切换,并有效避免了因数据不一致导致的问题。
|
12月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
615 5
|
12月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
448 1
|
1月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
321 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
284 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
3月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
488 9
Apache Flink:从实时数据分析到实时AI
|
3月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
439 0
|
2月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1094 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
3月前
|
存储 人工智能 数据处理
对话王峰:Apache Flink 在 AI 时代的“剑锋”所向
Flink 2.0 架构升级实现存算分离,迈向彻底云原生化,支持更大规模状态管理、提升资源效率、增强容灾能力。通过流批一体与 AI 场景融合,推动实时计算向智能化演进。生态项目如 Paimon、Fluss 和 Flink CDC 构建湖流一体架构,实现分钟级时效性与低成本平衡。未来,Flink 将深化 AI Agents 框架,引领事件驱动的智能数据处理新方向。
400 6

推荐镜像

更多
下一篇
oss云网关配置