【kafka源码】Topic的创建源码分析(附视频)

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
全局流量管理 GTM,标准版 1个月
简介: 【kafka源码】Topic的创建源码分析(附视频)

提示:本文可能已过期,请点击原文查看:创建Topic的源码解析


文章目录

脚本参数

创建Topic脚本

创建Topic 源码分析

1. 源码入口

2. 创建AdminClientTopicService 对象

2.1 先创建 Admin

3. AdminClientTopicService.createTopic 创建Topic

3.1 KafkaAdminClient.createTopics(NewTopic) 创建Topic

4. 发起网络请求

5. Controller角色的服务端接受请求处理逻辑

5.1 KafkaApis.handle(request) 根据请求传递Api调用不同接口

5.2 KafkaApis.handleCreateTopicsRequest 处理创建Topic的请求

5.3 adminManager.createTopics()

5.4 写入zookeeper数据

6. Controller监听 `/brokers/topics/Topic名称`, 通知Broker将分区写入磁盘

6.1 onNewPartitionCreation 状态流转

7. Broker收到LeaderAndIsrRequest 创建本地Log

源码总结

Q&A

创建Topic的时候 在Zk上创建了哪些节点

创建Topic的时候 什么时候在Broker磁盘上创建的日志文件

如果我没有指定分区数或者副本数,那么会如何创建

如果我手动删除了`/brokers/topics/`下的某个节点会怎么样?

如果我手动在zk中添加`/brokers/topics/{TopicName}`节点会怎么样

如果写入`/brokers/topics/{TopicName}`节点之后Controller挂掉了会怎么样

附件

--config 可生效参数


image.png

image.png

Topic创建流程分析+常见问题


全套视频请关注公众号:石臻臻的杂货铺(首发)


脚本参数

sh bin/kafka-topic -help 查看更具体参数


下面只是列出了跟--create 相关的参数


参数 描述 例子

--bootstrap-server 指定kafka服务 指定连接到的kafka服务; 如果有这个参数,则 --zookeeper可以不需要 –bootstrap-server localhost:9092

--zookeeper 弃用, 通过zk的连接方式连接到kafka集群; –zookeeper localhost:2181 或者localhost:2181/kafka

--replication-factor 副本数量,注意不能大于broker数量;如果不提供,则会用集群中默认配置 –replication-factor 3

--partitions 分区数量 当创建或者修改topic的时候,用这个来指定分区数;如果创建的时候没有提供参数,则用集群中默认值; 注意如果是修改的时候,分区比之前小会有问题

--replica-assignment 副本分区分配方式;创建topic的时候可以自己指定副本分配情况; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本

--config 用来设置topic级别的配置以覆盖默认配置;只在–create 和–bootstrap-server 同时使用时候生效; 可以配置的参数列表请看文末附件 例如覆盖两个配置 --config retention.bytes=123455 --config retention.ms=600001

--command-config 用来配置客户端Admin Client启动配置,只在–bootstrap-server 同时使用时候生效; 例如:设置请求的超时时间 --command-config config/producer.proterties; 然后在文件中配置 request.timeout.ms=300000

--create 命令方式; 表示当前请求是创建Topic --create

创建Topic脚本

zk方式(不推荐)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test

需要注意的是–zookeeper后面接的是kafka的zk配置, 假如你配置的是localhost:2181/kafka 带命名空间的这种,不要漏掉了

kafka版本 >= 2.2 支持下面方式(推荐)

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

当前分析的kafka源码版本为 kafka-2.5


创建Topic 源码分析

温馨提示: 如果阅读源码略显枯燥,你可以直接看源码总结以及后面部分


首先我们找到源码入口处, 查看一下 kafka-topic.sh脚本的内容

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

最终是执行了kafka.admin.TopicCommand这个类,找到这个地方之后就可以断点调试源码了,用IDEA启动


image.png

image.png

记得配置一下入参

比如: --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic test_create_topic3

image.png

1. 源码入口


image.png

上面的源码主要作用是


根据是否有传入参数--zookeeper 来判断创建哪一种 对象topicService

如果传入了--zookeeper 则创建 类 ZookeeperTopicService的对象

否则创建类AdminClientTopicService的对象(我们主要分析这个对象)

根据传入的参数类型判断是创建topic还是删除等等其他 判断依据是 是否在参数里传入了--create

2. 创建AdminClientTopicService 对象

val topicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))


2.1 先创建 Admin

object AdminClientTopicService {
    def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
      bootstrapServer match {
        case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
        case None =>
      }
      Admin.create(commandConfig)
    }
    def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService =
      new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
  }

如果有入参--command-config ,则将这个文件里面的参数都放到map commandConfig里面, 并且也加入bootstrap.servers的参数;假如配置文件里面已经有了bootstrap.servers配置,那么会将其覆盖

将上面的commandConfig 作为入参调用Admin.create(commandConfig)创建 Admin; 这个时候调用的Client模块的代码了, 从这里我们就可以看出,我们调用kafka-topic.sh脚本实际上是kafka模拟了一个客户端Client来创建Topic的过程;


image.png

image.png

3. AdminClientTopicService.createTopic 创建Topic

topicService.createTopic(opts)

  case class AdminClientTopicService private (adminClient: Admin) extends TopicService {
    override def createTopic(topic: CommandTopicPartition): Unit = {
      //如果配置了副本副本数--replication-factor 一定要大于0
      if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
        throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
       //如果配置了--partitions 分区数 必须大于0
      if (topic.partitions.exists(partitions => partitions < 1))
        throw new IllegalArgumentException(s"The partitions must be greater than 0")
    //查询是否已经存在该Topic
      if (!adminClient.listTopics().names().get().contains(topic.name)) {
        val newTopic = if (topic.hasReplicaAssignment)
          //如果指定了--replica-assignment参数;则按照指定的来分配副本
          new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
        else {
          new NewTopic(
            topic.name,
            topic.partitions.asJava,
            topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
        }
        // 将配置--config 解析成一个配置map
        val configsMap = topic.configsToAdd.stringPropertyNames()
          .asScala
          .map(name => name -> topic.configsToAdd.getProperty(name))
          .toMap.asJava
        newTopic.configs(configsMap)
        //调用adminClient创建Topic
        val createResult = adminClient.createTopics(Collections.singleton(newTopic))
        createResult.all().get()
        println(s"Created topic ${topic.name}.")
      } else {
        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
      }
    }

检查各项入参是否有问题

adminClient.listTopics(),然后比较是否已经存在待创建的Topic;如果存在抛出异常;

判断是否配置了参数--replica-assignment ; 如果配置了,那么Topic就会按照指定的方式来配置副本情况

解析配置--config 配置放到configsMap中; configsMap给到NewTopic对象

调用adminClient.createTopics创建Topic; 它是如何创建Topic的呢?往下分析源码

3.1 KafkaAdminClient.createTopics(NewTopic) 创建Topic

    @Override
    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                           final CreateTopicsOptions options) {
       //省略部分源码...
        Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
            new ControllerNodeProvider()) {
            @Override
            public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
                return new CreateTopicsRequest.Builder(
                    new CreateTopicsRequestData().
                        setTopics(topics).
                        setTimeoutMs(timeoutMs).
                        setValidateOnly(options.shouldValidateOnly()));
            }
            @Override
            public void handleResponse(AbstractResponse abstractResponse) {
                //省略
            }
            @Override
            void handleFailure(Throwable throwable) {
                completeAllExceptionally(topicFutures.values(), throwable);
            }
        };
    }

这个代码里面主要看下Call里面的接口; 先不管Kafka如何跟服务端进行通信的细节; 我们主要关注创建Topic的逻辑;

  1. createRequest会构造一个请求参数CreateTopicsRequest 例如下图

image.png

选择ControllerNodeProvider这个节点发起网络请求

image.png

可以清楚的看到, 创建Topic这个操作是需要Controller来执行的;

image.png

4. 发起网络请求

==>服务端客户端网络模型


5. Controller角色的服务端接受请求处理逻辑

首先找到服务端处理客户端请求的 源码入口 ⇒ KafkaRequestHandler.run()


主要看里面的 apis.handle(request) 方法; 可以看到客户端的请求都在request.bodyAndSize()里面

image.png

5.1 KafkaApis.handle(request) 根据请求传递Api调用不同接口

进入方法可以看到根据request.header.apiKey 调用对应的方法,客户端传过来的是CreateTopics

image.png

5.2 KafkaApis.handleCreateTopicsRequest 处理创建Topic的请求

def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
    // 部分代码省略
  //如果当前Broker不是属于Controller的话,就抛出异常
    if (!controller.isActive) {
      createTopicsRequest.data.topics.asScala.foreach { topic =>
        results.add(new CreatableTopicResult().setName(topic.name).
          setErrorCode(Errors.NOT_CONTROLLER.code))
      }
      sendResponseCallback(results)
    } else {
     // 部分代码省略
    }
      adminManager.createTopics(createTopicsRequest.data.timeoutMs,
          createTopicsRequest.data.validateOnly,
          toCreate,
          authorizedForDescribeConfigs,
          handleCreateTopicsResults)
    }
  }

判断当前处理的broker是不是Controller,如果不是Controller的话直接抛出异常,从这里可以看出,CreateTopic这个操作必须是Controller来进行, 出现这种情况有可能是客户端发起请求的时候Controller已经变更;

鉴权 【Kafka源码】kafka鉴权机制

调用adminManager.createTopics()

5.3 adminManager.createTopics()

创建主题并等等主题完全创建,回调函数将会在超时、错误、或者主题创建完成时触发


该方法过长,省略部分代码

def createTopics(timeout: Int,
                   validateOnly: Boolean,
                   toCreate: Map[String, CreatableTopic],
                   includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
                   responseCallback: Map[String, ApiError] => Unit): Unit = {
    // 1. map over topics creating assignment and calling zookeeper
    val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
    val metadata = toCreate.values.map(topic =>
      try {
          //省略部分代码
         //检查Topic是否存在
         //检查 --replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用
         // 如果(--partitions || --replication-factor ) 没有设置,则使用 Broker的配置(这个Broker肯定是Controller)
    // 计算分区副本分配方式
        createTopicPolicy match {
          case Some(policy) =>
          //省略部分代码
            adminZkClient.validateTopicCreate(topic.name(), assignments, configs)
            if (!validateOnly)
              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
          case None =>
            if (validateOnly)
             //校验创建topic的参数准确性
              adminZkClient.validateTopicCreate(topic.name, assignments, configs)
            else
              //把topic相关数据写入到zk中
              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
        }
  }

做一些校验检查

①.检查Topic是否存在

②. 检查--replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用

③.如果(--partitions || --replication-factor ) 没有设置,则使用 Broker的配置(这个Broker肯定是Controller)

④.计算分区副本分配方式


createTopicPolicy 根据Broker是否配置了创建Topic的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服务器配置 create.topic.policy.class.name=自定义类; 比如我就想所有创建Topic的请求分区数都要大于10; 那么这里就可以实现你的需求了


createTopicWithAssignment把topic相关数据写入到zk中; 进去分析一下


5.4 写入zookeeper数据

我们进入到adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)看看有哪些数据写入到了zk中;

  def createTopicWithAssignment(topic: String,
                                config: Properties,
                                partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
    validateTopicCreate(topic, partitionReplicaAssignment, config)
    // 将topic单独的配置写入到zk中
    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
    // 将topic分区相关信息写入zk中
    writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false)
  }

源码就不再深入了,这里直接详细说明一下


写入Topic配置信息


先调用SetDataRequest请求往节点/config/topics/Topic名称 写入数据; 这里

一般这个时候都会返回 NONODE (NoNode);节点不存在; 假如zk已经存在节点就直接覆盖掉

节点不存在的话,就发起CreateRequest请求,写入数据; 并且节点类型是持久节点

这里写入的数据,是我们入参时候传的topic配置--config; 这里的配置会覆盖默认配置


写入Topic分区副本信息


将已经分配好的副本分配策略写入到 /brokers/topics/Topic名称 中; 节点类型 持久节点


image.png

image.png

具体跟zk交互的地方在

ZookeeperClient.send() 这里包装了很多跟zk的交互;

image.png

6. Controller监听 /brokers/topics/Topic名称, 通知Broker将分区写入磁盘

Controller 有监听zk上的一些节点; 在上面的流程中已经在zk中写入了 /brokers/topics/Topic名称 ; 这个时候Controller就监听到了这个变化并相应;


KafkaController.processTopicChange

  private def processTopicChange(): Unit = {
    //如果处理的不是Controller角色就返回
    if (!isActive) return
    //从zk中获取 `/brokers/topics 所有Topic
    val topics = zkClient.getAllTopicsInCluster
    //找出哪些是新增的
    val newTopics = topics -- controllerContext.allTopics
    //找出哪些Topic在zk上被删除了
    val deletedTopics = controllerContext.allTopics -- topics
    controllerContext.allTopics = topics
    registerPartitionModificationsHandlers(newTopics.toSeq)
    val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
    deletedTopics.foreach(controllerContext.removeTopic)
    addedPartitionReplicaAssignment.foreach {
      case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
    }
    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
      s"[$addedPartitionReplicaAssignment]")
    if (addedPartitionReplicaAssignment.nonEmpty)
      onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
  }

从zk中获取 /brokers/topics 所有Topic跟当前Broker内存中所有BrokercontrollerContext.allTopics的差异; 就可以找到我们新增的Topic; 还有在zk中被删除了的Broker(该Topic会在当前内存中remove掉)


从zk中获取/brokers/topics/{TopicName} 给定主题的副本分配。并保存在内存中

image.png

  1. 执行onNewPartitionCreation;分区状态开始流转

6.1 onNewPartitionCreation 状态流转

关于Controller的状态机 详情请看: 【kafka源码】Controller中的状态机

  /**
   * This callback is invoked by the topic change callback with the list of failed brokers as input.
   * It does the following -
   * 1. Move the newly created partitions to the NewPartition state
   * 2. Move the newly created partitions from NewPartition->OnlinePartition state
   */
  private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
    info(s"New partition creation callback for ${newPartitions.mkString(",")}")
    partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
    partitionStateMachine.handleStateChanges(
      newPartitions.toSeq,
      OnlinePartition,
      Some(OfflinePartitionLeaderElectionStrategy(false))
    )
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
  }

将待创建的分区状态流转为NewPartition;

image.png

将待创建的副本 状态流转为NewReplica;

image.png

将分区状态从刚刚的NewPartition流转为OnlinePartition

0. 获取leaderIsrAndControllerEpochs; Leader为副本的第一个;

1. 向zk中写入/brokers/topics/{topicName}/partitions/ 持久节点; 无数据

2. 向zk中写入/brokers/topics/{topicName}/partitions/{分区号} 持久节点; 无数据

3. 向zk中写入/brokers/topics/{topicName}/partitions/{分区号}/state 持久节点; 数据为leaderIsrAndControllerEpoch

image.png

向副本所属Broker发送leaderAndIsrRequest请求

向所有Broker发送UPDATE_METADATA 请求

将副本状态从刚刚的NewReplica流转为OnlineReplica ,更新下内存

关于分区状态机和副本状态机详情请看【kafka源码】Controller中的状态机


7. Broker收到LeaderAndIsrRequest 创建本地Log

上面步骤中有说到向副本所属Broker发送leaderAndIsrRequest请求,那么这里做了什么呢

其实主要做的是 创建本地Log


代码太多,这里我们直接定位到只跟创建Topic相关的关键代码来分析

KafkaApis.handleLeaderAndIsrRequest->replicaManager.becomeLeaderOrFollower->ReplicaManager.makeLeaders...LogManager.getOrCreateLog

  /**
   * 如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出 KafkaStorageException
   */
  def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
    logCreationOrDeletionLock synchronized {
      getLog(topicPartition, isFuture).getOrElse {
        // create the log if it has not already been created in another thread
        if (!isNew && offlineLogDirs.nonEmpty)
          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
        val logDirs: List[File] = {
          val preferredLogDir = preferredLogDirs.get(topicPartition)
          if (isFuture) {
            if (preferredLogDir == null)
              throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
            else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
              throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
          }
          if (preferredLogDir != null)
            List(new File(preferredLogDir))
          else
            nextLogDirs()
        }
        val logDirName = {
          if (isFuture)
            Log.logFutureDirName(topicPartition)
          else
            Log.logDirName(topicPartition)
        }
        val logDir = logDirs
          .toStream // to prevent actually mapping the whole list, lazy map
          .map(createLogDirectory(_, logDirName))
          .find(_.isSuccess)
          .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
          .get // If Failure, will throw
        val log = Log(
          dir = logDir,
          config = config,
          logStartOffset = 0L,
          recoveryPoint = 0L,
          maxProducerIdExpirationMs = maxPidExpirationMs,
          producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
          scheduler = scheduler,
          time = time,
          brokerTopicStats = brokerTopicStats,
          logDirFailureChannel = logDirFailureChannel)
        if (isFuture)
          futureLogs.put(topicPartition, log)
        else
          currentLogs.put(topicPartition, log)
        info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.")
        // Remove the preferred log dir since it has already been satisfied
        preferredLogDirs.remove(topicPartition)
        log
      }
    }
  }

如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出KafkaStorageException

详细请看 【kafka源码】LeaderAndIsrRequest请求


源码总结

如果上面的源码分析,你不想看,那么你可以直接看这里的简洁叙述


根据是否有传入参数--zookeeper 来判断创建哪一种 对象topicService

如果传入了--zookeeper 则创建 类 ZookeeperTopicService的对象

否则创建类AdminClientTopicService的对象(我们主要分析这个对象)

如果有入参--command-config ,则将这个文件里面的参数都放到mapl类型 commandConfig里面, 并且也加入bootstrap.servers的参数;假如配置文件里面已经有了bootstrap.servers配置,那么会将其覆盖

将上面的commandConfig作为入参调用Admin.create(commandConfig)创建 Admin; 这个时候调用的Client模块的代码了, 从这里我们就可以猜测,我们调用kafka-topic.sh脚本实际上是kafka模拟了一个客户端Client来创建Topic的过程;

一些异常检查

①.如果配置了副本副本数–replication-factor 一定要大于0

②.如果配置了–partitions 分区数 必须大于0

③.去zk查询是否已经存在该Topic

判断是否配置了参数--replica-assignment ; 如果配置了,那么Topic就会按照指定的方式来配置副本情况

解析配置--config 配置放到configsMap中; configsMap给到NewTopic对象

将上面所有的参数包装成一个请求参数CreateTopicsRequest ;然后找到是Controller的节点发起请求(ControllerNodeProvider)

服务端收到请求之后,开始根据CreateTopicsRequest来调用创建Topic的方法; 不过首先要判断一下自己这个时候是不是Controller; 有可能这个时候Controller重新选举了; 这个时候要抛出异常

服务端进行一下请求参数检查

①.检查Topic是否存在

②.检查 --replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用

如果(--partitions || --replication-factor ) 没有设置,则使用 Broker的默认配置(这个Broker肯定是Controller)

计算分区副本分配方式;如果是传入了 --replica-assignment;则会安装自定义参数进行组装;否则的话系统会自动计算分配方式; 具体详情请看 【kafka源码】创建Topic的时候是如何分区和副本的分配规则

createTopicPolicy根据Broker是否配置了创建Topic的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服务器配置 create.topic.policy.class.name=自定义类; 比如我就想所有创建Topic的请求分区数都要大于10; 那么这里就可以实现你的需求了

zk中写入Topic配置信息 发起CreateRequest请求,这里写入的数据,是我们入参时候传的topic配置--config; 这里的配置会覆盖默认配置;并且节点类型是持久节点;path = /config/topics/Topic名称

zk中写入Topic分区副本信息 发起CreateRequest请求 ,将已经分配好的副本分配策略 写入到 /brokers/topics/Topic名称中; 节点类型 持久节点

Controller监听zk上面的topic信息; 根据zk上变更的topic信息;计算出新增/删除了哪些Topic; 然后拿到新增Topic的 副本分配信息; 并做一些状态流转

向新增Topic所在Broker发送leaderAndIsrRequest请求,

Broker收到发送leaderAndIsrRequest请求; 创建副本Log文件;


image.png

image.png

Q&A

创建Topic的时候 在Zk上创建了哪些节点

接受客户端请求阶段:


topic的配置信息 /config/topics/Topic名称 持久节点

topic的分区信息/brokers/topics/Topic名称 持久节点

Controller监听zk节点/brokers/topics变更阶段


/brokers/topics/{topicName}/partitions/持久节点; 无数据

向zk中写入/brokers/topics/{topicName}/partitions/{分区号} 持久节点; 无数据

向zk中写入/brokers/topics/{topicName}/partitions/{分区号}/state 持久节点;

创建Topic的时候 什么时候在Broker磁盘上创建的日志文件

当Controller监听zk节点/brokers/topics变更之后,将新增的Topic 解析好的分区状态流转

NonExistentPartition->NewPartition->OnlinePartition 当流转到OnlinePartition的时候会像分区分配到的Broker发送一个leaderAndIsrRequest请求,当Broker们收到这个请求之后,根据请求参数做一些处理,其中就包括检查自身有没有这个分区副本的本地Log;如果没有的话就重新创建;


如果我没有指定分区数或者副本数,那么会如何创建

我们都知道,如果我们没有指定分区数或者副本数, 则默认使用Broker的配置, 那么这么多Broker,假如不小心默认值配置不一样,那究竟使用哪一个呢? 那肯定是哪台机器执行创建topic的过程,就是使用谁的配置;

所以是谁执行的? 那肯定是Controller啊! 上面的源码我们分析到了,创建的过程,会指定Controller这台机器去进行;


如果我手动删除了/brokers/topics/下的某个节点会怎么样?

详情请看 【kafka实战】一不小心删除了/brokers/topics/下的某个Topic


如果我手动在zk中添加/brokers/topics/{TopicName}节点会怎么样

先说结论: 根据上面分析过的源码画出的时序图可以指定; 客户端发起创建Topic的请求,本质上是去zk里面写两个数据


topic的配置信息 /config/topics/Topic名称 持久节点

topic的分区信息/brokers/topics/Topic名称 持久节点

所以我们绕过这一步骤直接去写入数据,可以达到一样的效果;不过我们的数据需要保证准确

因为在这一步已经没有了一些基本的校验了; 假如这一步我们写入的副本Brokerid不存在会怎样,从时序图中可以看到,leaderAndIsrRequest请求; 就不会正确的发送的不存在的BrokerId上,那么那台机器就不会创建Log文件;

下面不妨让我们来验证一下;

创建一个节点/brokers/topics/create_topic_byhand_zk 节点数据为下面数据;

{"version":2,"partitions":{"2":[3],"1":[3],"0":[3]},"adding_replicas":{},"removing_replicas":{}}

image.png

这里我用的工具PRETTYZOO手动创建的,你也可以用命令行创建;

创建完成之后我们再看看本地有没有生成一个Log文件

image.png

在我们写入zk数据的时候,就已经确定好了哪个每个分区的Leader是谁了,那就是第一个副本默认为Leader


如果写入/brokers/topics/{TopicName}节点之后Controller挂掉了会怎么样

先说结论:Controller 重新选举的时候,会有一些初始化的操作; 会把创建过程继续下去


然后我们来模拟这么一个过程,先停止集群,然后再zk中写入/brokers/topics/{TopicName}节点数据; 然后再启动一台Broker;

源码分析: 我们之前分析过Controller的启动过程与选举 有提到过,这里再提一下Controller当选之后有一个地方处理这个事情

replicaStateMachine.startup()
partitionStateMachine.startup()

启动状态机的过程是不是跟上面的6.1 onNewPartitionCreation 状态流转 的过程很像; 最终都把状态流转到了OnlinePartition; 伴随着是不发起了leaderAndIsrRequest请求; 是不是Broker收到请求之后,创建本地Log文件了


附件

–config 可生效参数

请以sh bin/kafka-topic -help 为准

configurations:                      
                  cleanup.policy                        
                  compression.type                      
                  delete.retention.ms                   
                  file.delete.delay.ms                  
                  flush.messages                        
                  flush.ms                              
                  follower.replication.throttled.       
replicas                             
                index.interval.bytes                  
                leader.replication.throttled.replicas 
                max.compaction.lag.ms                 
                max.message.bytes                     
                message.downconversion.enable         
                message.format.version                
                message.timestamp.difference.max.ms   
                message.timestamp.type                
                min.cleanable.dirty.ratio             
                min.compaction.lag.ms                 
                min.insync.replicas                   
                preallocate                           
                retention.bytes                       
                retention.ms                          
                segment.bytes                         
                segment.index.bytes                   
                segment.jitter.ms                     
                segment.ms                            
                unclean.leader.election.enable


相关文章
|
6月前
|
消息中间件 存储 负载均衡
深入了解Kafka中Topic的神奇之处
深入了解Kafka中Topic的神奇之处
515 0
|
6月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
133 1
|
6月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
488 4
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
67 4
|
2月前
|
消息中间件 Kafka Apache
kafka: invalid configuration (That topic/partition is already being consumed)
kafka: invalid configuration (That topic/partition is already being consumed)
|
4月前
|
消息中间件 监控 Kafka
查询Kafka集群中消费组(group)信息和对应topic的消费情况
查询Kafka集群中消费组(group)信息和对应topic的消费情况
2503 0
|
6月前
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
6月前
|
消息中间件 存储 缓存
【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
【4月更文挑战第13天】【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
|
6月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
126 1
|
6月前
|
消息中间件 存储 负载均衡
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解
下一篇
无影云桌面