Apache Kafka Source Code Analysis – Broker Server - Alibaba Cloud Developer Community


1. Kafka.scala

Kafka's main entry point starts up a KafkaServerStartable, which is simply a wrapper around KafkaServer:

    val kafkaServerStartble = new KafkaServerStartable(serverConfig)
    kafkaServerStartble.startup

 

    package kafka.server

    class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
      private var server : KafkaServer = null

      init()   // runs in the constructor: creates the wrapped KafkaServer

      private def init() {
        server = new KafkaServer(serverConfig)
      }

      def startup() {
        try {
          server.startup()
        }
        catch {...}
      }
    }
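Besides hiding construction, the wrapper is the natural place to handle a failed startup. A minimal sketch of that pattern, using a hypothetical `Server` trait and `ServerStartable` class rather than Kafka's own types; because the real `catch {...}` body is elided above, the "report, shut down, signal failure" behaviour here is an assumption:

```scala
// Hypothetical stand-in for KafkaServer: anything with a start/stop lifecycle.
trait Server {
  def startup(): Unit
  def shutdown(): Unit
}

// Sketch of the KafkaServerStartable idea: the wrapper owns the server
// and centralises error handling around startup.
class ServerStartable(create: () => Server) {
  // Built once, in the constructor, like init() in the quoted code.
  private val server: Server = create()

  /** Returns true if startup succeeded; on failure shuts the server down and returns false. */
  def startup(): Boolean =
    try {
      server.startup()
      true
    } catch {
      case e: Exception =>
        // Assumption: the article elides the real catch block.
        System.err.println(s"Fatal error during startup: ${e.getMessage}")
        server.shutdown()
        false
    }
}
```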

2. KafkaServer

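KafkaServer represents a single Kafka broker and is the core of Kafka.
To see what a broker does, it is enough to look at which modules startup() brings up; the sections below analyze them one by one.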

    package kafka.server
    /**
     * Represents the lifecycle of a single Kafka broker. Handles all functionality required
     * to start up and shutdown a single Kafka node.
     */
    class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
      var socketServer: SocketServer = null
      var requestHandlerPool: KafkaRequestHandlerPool = null
      var logManager: LogManager = null
      var kafkaHealthcheck: KafkaHealthcheck = null
      var topicConfigManager: TopicConfigManager = null
      var replicaManager: ReplicaManager = null
      var apis: KafkaApis = null
      var kafkaController: KafkaController = null
      val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      var zkClient: ZkClient = null

      /**
       * Start up API for bringing up a single instance of the Kafka server.
       * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
       */
      def startup() {
        /* start scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper */
        zkClient = initZk()

        /* start log manager */
        logManager = createLogManager(zkClient)
        logManager.startup()

        socketServer = new SocketServer(config.brokerId,
                                        config.hostName,
                                        config.port,
                                        config.numNetworkThreads,
                                        config.queuedMaxRequests,
                                        config.socketSendBufferBytes,
                                        config.socketReceiveBufferBytes,
                                        config.socketRequestMaxBytes)
        socketServer.startup()

        replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
        kafkaController = new KafkaController(config, zkClient)

        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

        replicaManager.startup()

        kafkaController.startup()

        topicConfigManager = new TopicConfigManager(zkClient, logManager)
        topicConfigManager.startup()

        /* tell everyone we are alive */
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
        kafkaHealthcheck.startup()
      }
      // ...
    }
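The order in startup() matters: each module is created only after the modules it receives as constructor arguments (ReplicaManager, for example, takes zkClient, kafkaScheduler and logManager). A minimal sketch of that idea, assuming a hypothetical `Component` trait and `Broker` class: components start in declaration order, and shutdown runs in reverse so that nothing is stopped before the modules that depend on it. Reverse-order shutdown is a common convention assumed here, not a description of KafkaServer.shutdown():

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical lifecycle interface; Kafka's modules have startup()/shutdown()
// methods but do not share this exact trait.
trait Component {
  def name: String
  def startup(): Unit
  def shutdown(): Unit
}

// Records lifecycle events so the ordering can be inspected.
class RecordingComponent(val name: String, log: ListBuffer[String]) extends Component {
  def startup(): Unit = log += s"start $name"
  def shutdown(): Unit = log += s"stop $name"
}

class Broker(components: Seq[Component]) {
  private val started = ListBuffer.empty[Component]

  def startup(): Unit =
    components.foreach { c =>
      c.startup()
      started += c // only components that actually started will be shut down
    }

  def shutdown(): Unit = {
    started.reverseIterator.foreach(_.shutdown()) // dependents stop before their dependencies
    started.clear()
  }
}
```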

2.1 KafkaScheduler

KafkaScheduler runs tasks in the background; it is implemented with a ScheduledThreadPoolExecutor.

    package kafka.utils

    /**
     * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
     *
     * It has a pool of kafka-scheduler- threads that do the actual work.
     *
     * @param threads The number of threads in the thread pool
     * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
     * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
     */
    @threadsafe
    class KafkaScheduler(val threads: Int,
                         val threadNamePrefix: String = "kafka-scheduler-",
                         daemon: Boolean = true) extends Scheduler with Logging {
      @volatile private var executor: ScheduledThreadPoolExecutor = null
      private val schedulerThreadId = new AtomicInteger(0) // numeric suffix for thread names

      override def startup() {
        this synchronized {
          executor = new ScheduledThreadPoolExecutor(threads) // create the ScheduledThreadPoolExecutor
          executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
          executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
          executor.setThreadFactory(new ThreadFactory() {
                                      def newThread(runnable: Runnable): Thread =
                                        Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
                                    })
        }
      }

      def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
        val runnable = new Runnable { // wrap fun in a Runnable
          def run() = {
            try {
              fun()
            } catch {...}
            finally {...}
          }
        }
        if(period >= 0) // periodic task: run at a fixed rate after the initial delay
          executor.scheduleAtFixedRate(runnable, delay, period, unit)
        else            // one-shot task: run once after the delay
          executor.schedule(runnable, delay, unit)
      }
    }
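The same scheduling logic can be exercised with the JDK alone. A minimal sketch, assuming a hypothetical `SimpleScheduler` class in which Kafka's `Utils.newThread` and logging are replaced by a plain `Thread` and `System.err`. One deliberate difference: `ScheduledThreadPoolExecutor.scheduleAtFixedRate` throws `IllegalArgumentException` for a period of 0, so this sketch treats only a strictly positive period as periodic:

```scala
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Sketch of KafkaScheduler's approach using only the JDK (hypothetical class).
class SimpleScheduler(threads: Int, threadNamePrefix: String = "scheduler-", daemon: Boolean = true) {
  private val threadId = new AtomicInteger(0)
  @volatile private var executor: ScheduledThreadPoolExecutor = null

  def startup(): Unit = this.synchronized {
    val ex = new ScheduledThreadPoolExecutor(threads)
    // Cancel pending delayed and periodic tasks once shutdown() is called.
    ex.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    ex.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    ex.setThreadFactory(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, threadNamePrefix + threadId.getAndIncrement())
        t.setDaemon(daemon)
        t
      }
    })
    executor = ex
  }

  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): ScheduledFuture[_] = {
    val runnable = new Runnable {
      def run(): Unit =
        try fun()
        catch {
          // Report and swallow: an exception escaping a periodic task would
          // suppress all of its later runs.
          case t: Throwable => System.err.println(s"Uncaught exception in task '$name': $t")
        }
    }
    if (period > 0) executor.scheduleAtFixedRate(runnable, delay, period, unit)
    else executor.schedule(runnable, delay, unit)
  }

  def shutdown(): Unit = this.synchronized {
    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.SECONDS)
  }
}
```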

2.2 ZooKeeper Client

Because Kafka relies on ZooKeeper for configuration management, the broker creates a zkClient to communicate with the ZooKeeper ensemble.

2.3 logManager

The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. 
See: Apache Kafka Source Code Analysis – Log Management
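Creation and retrieval usually go together: a log is created the first time its partition is needed and returned from then on. A minimal sketch, assuming a hypothetical `LogRegistry` that holds `LogStub` objects keyed by a hypothetical `TopicPartitionKey` (stand-ins, not kafka.log.LogManager or kafka.log.Log); `logsByTopicPartition` and the per-topic grouping mirror what TopicConfigManager.processConfigChanges reads from the log manager in section 2.8. Cleaning is left out:

```scala
import scala.collection.mutable

// Hypothetical key and log types; only the config is modelled.
final case class TopicPartitionKey(topic: String, partition: Int)
final class LogStub(val tp: TopicPartitionKey, var config: Map[String, String])

class LogRegistry(defaultConfig: Map[String, String]) {
  private val logs = mutable.Map.empty[TopicPartitionKey, LogStub]

  /** Retrieval with creation on first use, under a lock so two callers cannot both create. */
  def getOrCreateLog(tp: TopicPartitionKey): LogStub = synchronized {
    logs.getOrElseUpdate(tp, new LogStub(tp, defaultConfig))
  }

  /** Immutable snapshot of all logs. */
  def logsByTopicPartition: Map[TopicPartitionKey, LogStub] = synchronized(logs.toMap)

  /** The grouping TopicConfigManager builds before applying a topic-level change. */
  def logsByTopic: Map[String, Iterable[LogStub]] =
    logsByTopicPartition.groupBy(_._1.topic).map { case (t, m) => t -> m.values }
}
```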

 

2.4 ReplicaManager

The replication module, newly added in 0.8. Related reading:

Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka Source Code Analysis – ReplicaManager

 

2.5 Kafka Socket Server

First of all, the broker is a socket server: every interaction with a broker happens by sending a request to its socket port.

socketServer = new SocketServer(config.brokerId...)

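KafkaApis
This class encapsulates the processing logic for all requests.

KafkaRequestHandler
The request-handling (I/O) threads: KafkaRequestHandlerPool starts config.numIoThreads of them, and each one takes requests from socketServer.requestChannel and hands them to KafkaApis.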
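This division of labour (network threads enqueue requests on socketServer.requestChannel; a pool of config.numIoThreads KafkaRequestHandler threads dequeues them and calls KafkaApis) is a producer/consumer queue. A minimal sketch of that pattern, assuming hypothetical `Request`/`Response` types and a `handle` function standing in for `KafkaApis.handle`; the bounded queue plays the role of config.queuedMaxRequests, and, unlike this sketch, the real RequestChannel routes each response back to the network processor that read the request:

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, TimeUnit}

// Hypothetical request/response types; a real request carries a parsed API call.
final case class Request(id: Int, payload: String)
final case class Response(requestId: Int, body: String, handledBy: String)

class RequestChannelSketch(queueSize: Int) {
  // Bounded: producers block when it is full.
  private val requests = new ArrayBlockingQueue[Request](queueSize)
  val responses = new ConcurrentLinkedQueue[Response]()

  def sendRequest(r: Request): Unit = requests.put(r)
  def receiveRequest(timeoutMs: Long): Option[Request] =
    Option(requests.poll(timeoutMs, TimeUnit.MILLISECONDS))
}

// One I/O thread: loop taking requests and handing them to the API layer.
class RequestHandler(channel: RequestChannelSketch, handle: Request => String) extends Runnable {
  @volatile var running = true
  def run(): Unit =
    while (running) {
      channel.receiveRequest(50).foreach { req =>
        channel.responses.add(Response(req.id, handle(req), Thread.currentThread().getName))
      }
    }
}

class RequestHandlerPoolSketch(channel: RequestChannelSketch, handle: Request => String, numThreads: Int) {
  private val handlers = Vector.fill(numThreads)(new RequestHandler(channel, handle))
  private val threads = handlers.zipWithIndex.map { case (h, i) =>
    val t = new Thread(h, s"request-handler-$i")
    t.setDaemon(true)
    t.start()
    t
  }

  def shutdown(): Unit = {
    handlers.foreach(h => h.running = false)
    threads.foreach(_.join())
  }
}
```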

 

2.6 offsetManager

offsetManager = createOffsetManager()
It periodically purges expired offset data, i.e. the compact operation:

scheduler.schedule(name = "offsets-cache-compactor",
                     fun = compact,
                     period = config.offsetsRetentionCheckIntervalMs,
                     unit = TimeUnit.MILLISECONDS)

It also handles some consumer-related offset operations, which I won't dig into here because we don't use the high-level consumer.
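The compact task itself amounts to a periodic sweep over a cache. A minimal sketch, assuming a hypothetical `OffsetsCache` keyed by (group, topic, partition), a hypothetical `retentionMs` parameter, and an injected clock; OffsetManager's actual code is not shown in the article, so this models only the "drop entries older than the retention period" idea that `compact` would run every config.offsetsRetentionCheckIntervalMs:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical key and value types for a committed offset.
final case class GroupTopicPartition(group: String, topic: String, partition: Int)
final case class OffsetEntry(offset: Long, commitTimestampMs: Long)

class OffsetsCache(retentionMs: Long, clock: () => Long) {
  private val offsets = TrieMap.empty[GroupTopicPartition, OffsetEntry]

  def commit(key: GroupTopicPartition, offset: Long): Unit =
    offsets.put(key, OffsetEntry(offset, clock()))

  def fetch(key: GroupTopicPartition): Option[Long] = offsets.get(key).map(_.offset)

  /** Removes entries whose last commit is older than the retention period; returns how many. */
  def compact(): Int = {
    val cutoff = clock() - retentionMs
    val expired = offsets.collect { case (k, v) if v.commitTimestampMs < cutoff => k }
    expired.foreach(k => offsets.remove(k))
    expired.size
  }
}
```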

 

2.7 KafkaController

kafkaController = new KafkaController(config, zkClient, brokerState)

See: Apache Kafka Source Code Analysis – Controller

Starting with 0.8, to manage replicas, one broker acts as the master, i.e. the controller, which coordinates replica consistency.
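The article does not describe how that one broker is chosen. In Kafka 0.8 each broker tries to create an ephemeral /controller znode, the one that succeeds becomes the controller, and when that znode disappears the surviving brokers try again. A minimal sketch of that idea, with a hypothetical in-memory `ControllerZNode` standing in for ZooKeeper and a hypothetical `BrokerElector`:

```scala
// Hypothetical stand-in for the ephemeral /controller znode:
// creation succeeds only if nobody currently owns it.
class ControllerZNode {
  private var owner: Option[Int] = None

  def tryCreate(brokerId: Int): Boolean = synchronized {
    if (owner.isEmpty) { owner = Some(brokerId); true } else false
  }

  def read: Option[Int] = synchronized(owner)

  // Models the ephemeral node vanishing when its owner's session ends.
  def expireSession(brokerId: Int): Unit = synchronized {
    if (owner.contains(brokerId)) owner = None
  }
}

class BrokerElector(val brokerId: Int, znode: ControllerZNode) {
  /** Try to become controller; either way, return the current controller id. */
  def elect(): Option[Int] = {
    znode.tryCreate(brokerId)
    znode.read
  }

  def isController: Boolean = znode.read.contains(brokerId)
}
```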

2.8 TopicConfigManager


topicConfigManager = new TopicConfigManager(zkClient, logManager)

TopicConfigManager handles changes to topic configs. Besides its global configuration, Kafka also supports topic-level configuration, which you can set like this:

    > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
        --config max.message.bytes=128000

How does a topic config set this way take effect?

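Topic-level config is stored under

/brokers/topics/<topic_name>/config

With many topics, however, watching each of these paths would require too many watchers, so a separate directory

/brokers/config_changes

is used to signal configuration changes. Besides writing the config to the topic's config znode, the command above therefore also adds a notification that tells the watchers which topic's config has changed:

/brokers/config_changes/config_change_13321

Each notification carries a numeric suffix (it is created as a sequential znode), which is used to tell whether it has already been processed.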

      /**
       * Process the given list of config changes
       */
      private def processConfigChanges(notifications: Seq[String]) {
        if (notifications.size > 0) {
          info("Processing config change notification(s)...")
          val now = time.milliseconds
          val logs = logManager.logsByTopicPartition.toBuffer
          val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
          for (notification <- notifications) {
            val changeId = changeNumber(notification)
            if (changeId > lastExecutedChange) {  // not processed yet
              val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
              val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
              if(jsonOpt.isDefined) {
                val json = jsonOpt.get
                val topic = json.substring(1, json.length - 1) // hacky way to dequote; extracts the topic name from the notification
                if (logsByTopic.contains(topic)) {
                  /* combine the default properties with the overrides in zk to create the new LogConfig */
                  val props = new Properties(logManager.defaultConfig.toProps)
                  props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
                  val logConfig = LogConfig.fromProps(props)
                  for (log <- logsByTopic(topic))
                    log.config = logConfig    // this is where the log config is actually updated
                  info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
                  purgeObsoleteNotifications(now, notifications) // delete notifications older than the expiration time (10 minutes)
                }
              }
              lastExecutedChange = changeId
            }
          }
        }
      }
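The bookkeeping in processConfigChanges can be isolated: parse the numeric suffix of each notification, skip anything at or below lastExecutedChange, and build the new config by layering the topic overrides over the broker defaults. A minimal sketch, assuming a hypothetical `ConfigChangeTracker` and `TopicConfig.merge`; the `changeNumber` here is a stand-in for the helper of the same name, whose parsing the article does not show, and unlike the quoted loop it sorts the notifications first so that lastExecutedChange only moves forward. `new Properties(defaults)` is the same JDK mechanism the quoted code uses: getProperty falls back to the defaults when the overrides have no entry.

```scala
import java.util.Properties

// Hypothetical tracker for notifications named <prefix><number>.
class ConfigChangeTracker(prefix: String = "config_change_") {
  // Highest change number already applied; -1 means nothing processed yet.
  private var lastExecutedChange = -1L

  // "config_change_0000000012" -> 12
  def changeNumber(name: String): Long = name.substring(prefix.length).toLong

  /** Returns the notifications not yet processed, in ascending order, and marks them done. */
  def newChanges(notifications: Seq[String]): Seq[String] = {
    val fresh = notifications
      .filter(_.startsWith(prefix))
      .sortBy(n => changeNumber(n))
      .filter(n => changeNumber(n) > lastExecutedChange)
    fresh.lastOption.foreach { n => lastExecutedChange = changeNumber(n) }
    fresh
  }
}

object TopicConfig {
  /** Broker defaults overlaid with per-topic overrides. */
  def merge(defaults: Properties, overrides: Properties): Properties = {
    val props = new Properties(defaults) // lookups fall back to defaults
    props.putAll(overrides)
    props
  }
}
```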
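Failover is not a problem either: applying the same config more than once is harmless, and on every startup the broker processes all notifications that have not yet expired.

Moreover, a restarted broker loads the complete config from ZooKeeper anyway, so nothing is lost while it is down; the notifications mainly serve to update topic configs in real time.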

 

2.9 KafkaHealthcheck

kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)

This one is simple: as the comment in startup() says, it tells everyone "I'm still alive".

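It is implemented by registering an ephemeral znode

 /brokers/ids/[0...N] --> advertisedHost:advertisedPort

and registering it again whenever the ZooKeeper session expires, a typical ZooKeeper pattern.
So to know whether a broker is alive, you only need to watch this path.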
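The register / re-register-on-expiry cycle can be modelled without a ZooKeeper server. A minimal sketch, assuming a hypothetical in-memory `FakeZk` in which ephemeral nodes belong to a session and vanish when it expires, plus a hypothetical `HealthcheckSketch`; in the real broker, KafkaHealthcheck installs a ZkClient state listener and registers again once a new session has been established:

```scala
import scala.collection.mutable

// Hypothetical in-memory model of ZooKeeper ephemeral nodes and sessions.
class FakeZk {
  private val nodes = mutable.Map.empty[String, (String, Long)] // path -> (data, owning session)
  private val expiryListeners = mutable.Map.empty[Long, () => Unit]
  private var nextSession = 0L

  def newSession(onExpire: () => Unit): Long = synchronized {
    nextSession += 1
    expiryListeners(nextSession) = onExpire
    nextSession
  }

  def createEphemeral(path: String, data: String, session: Long): Unit = synchronized {
    require(!nodes.contains(path), s"node exists: $path")
    nodes(path) = (data, session)
  }

  def get(path: String): Option[String] = synchronized(nodes.get(path).map(_._1))

  /** Expiry deletes every ephemeral node of the session, then notifies its owner. */
  def expire(session: Long): Unit = {
    val listener = synchronized {
      nodes --= nodes.collect { case (p, (_, s)) if s == session => p }
      expiryListeners.remove(session)
    }
    listener.foreach(f => f())
  }
}

// Sketch of the KafkaHealthcheck idea: keep /brokers/ids/<id> present across session expiry.
class HealthcheckSketch(zk: FakeZk, brokerId: Int, host: String, port: Int) {
  val path = s"/brokers/ids/$brokerId"
  @volatile var session: Long = -1L

  def startup(): Unit = register()

  private def register(): Unit = {
    session = zk.newSession(() => register()) // on expiry, register again in a new session
    zk.createEphemeral(path, s"$host:$port", session)
  }
}
```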

2.10 ControlledShutdown

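Before 0.8, broker startup and shutdown were simple: initialize the components above, or stop them.

Since 0.8 added replication, however, a broker can no longer just shut itself down. It must first notify the controller, and only after the controller has done its part, such as moving partition leadership to other brokers or taking replicas offline, can the broker shut down.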

private def controlledShutdown()

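The method is fairly long. The logic: locate the controller, send it a ControlledShutdownRequest, and wait for the response; if this fails, the broker falls back to an unclean shutdown.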
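The control flow described above fits in a few lines. A minimal sketch, assuming hypothetical `findController` and `sendShutdownRequest` hooks in place of the ZooKeeper lookup and the ControlledShutdownRequest round trip; the retry count and backoff correspond to Kafka's controlled.shutdown.max.retries and controlled.shutdown.retry.backoff.ms settings, but the loop is an illustration, not the broker's code:

```scala
sealed trait ShutdownOutcome
case object CleanShutdown extends ShutdownOutcome
case object UncleanShutdown extends ShutdownOutcome

object ControlledShutdownSketch {
  /** Asks the controller to move leadership away before stopping; gives up after maxRetries. */
  def run(findController: () => Option[Int],
          sendShutdownRequest: Int => Boolean,
          maxRetries: Int,
          backoffMs: Long): ShutdownOutcome = {
    var attempt = 0
    while (attempt < maxRetries) {
      attempt += 1
      // No controller found, or the request was not acknowledged, counts as a failed attempt.
      if (findController().exists(sendShutdownRequest)) return CleanShutdown
      if (attempt < maxRetries) Thread.sleep(backoffMs)
    }
    UncleanShutdown // proceed without the controller's help
  }
}
```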


This article is excerpted from cnblogs (博客园); original publication date: 2014-02-14.
