【kafka源码】/log_dir_event_notification的LogDir脱机事件通知

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【kafka源码】/log_dir_event_notification的LogDir脱机事件通知

提示:本文可能已过期,请点击原文查看:【kafka源码】/log_dir_event_notification的LogDir脱机事件通知


前言


我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点

这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现异常时(比如磁盘损坏,文件读写失败,等等异常): 向zk中谢增一个子节点/log_dir_event_notification/log_dir_event_序列号 ;Controller监听到这个节点的变更之后,会向Brokers们发送LeaderAndIsrRequest请求; 然后做一些副本脱机的善后操作


源码分析

这里说的dirLog是 server.properties中配置的log.dir 例如


image.png

image.png

副本异常处理

首先我们找到有使用这个节点的源码;

kafka启动之初有调用

ReplicaManager.startup()

  def startup(): Unit = {
    // 省略...
    //当inter-broker protocol (IBP) < 1.0的时候,如果存在logDir的一些异常则直接让整个Broker启动失败;
    val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
    logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
    logDirFailureHandler.start()
  }
  private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
    override def doWork(): Unit = {
      //从队列 offlineLogDirQueue 取数据
      val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
      if (haltBrokerOnDirFailure) {
        fatal(s"Halting broker because dir $newOfflineLogDir is offline")
        Exit.halt(1)
      }
      handleLogDirFailure(newOfflineLogDir)
    }
  }
// logDir should be an absolute path
  // sendZkNotification is needed for unit test
  def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
       // 省略...
    logManager.handleLogDirFailure(dir)
    if (sendZkNotification)
      zkClient.propagateLogDirEvent(localBrokerId)
    warn(s"Stopped serving replicas in dir $dir")
  }

代码比较长,就直接概况一下好了:

主要是当读取或操作LogDir的时候出现了异常就会执行到这里,有可能是磁盘脱机了,或者文件突然没有读取写入权限等等之类的一些IOException异常;那么 Broker就需要做一些处理;如下


做个判断inter.broker.protocol.version 协议版本 < 1.0 的时候 时候直接退出;那个时候还不支持单Broker上存在多个logDir;

副本停止fetche数据

标记分区下线

可能移除一些监控信息

如果当前的log_dir 都脱机(或者异常了), 那么久可以直接shutdown这台机器了

如果还有其他的log_dir 还有在线的, 那么继续做一些其他的清理操作;

创建持久序列节点/log_dir_event_notification/log_dir_event_+序列号;数据是 BrokerID;例如:

/log_dir_event_notification/log_dir_event_0000000003

{"version":1,"broker":20003,"event":1}

PS: log_dir 是可以在一台Broker配置多个路径的 ,用逗号隔开

LogDir发生异常

比如说在 给文件加锁的时候lockLogDirs,磁盘损坏了就抛出异常IOException

  /**
   * Lock all the given directories
   */
  private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
    dirs.flatMap { dir =>
      try {
        val lock = new FileLock(new File(dir, LockFile))
        if (!lock.tryLock())
          throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
            ". A Kafka instance in another process or thread is using this directory.")
        Some(lock)
      } catch {
        case e: IOException =>
          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)
          None
      }
    }
  }
  def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
    error(msg, e)
    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
      offlineLogDirQueue.add(logDir)
  }

offlineLogDirQueue添加了一个异常队列之后就回到上面的副本异常处理代码了, 上面可是一致在queue.take()

Controller监听zk节点变更

KafkaController.processLogDirEventNotification

  private def processLogDirEventNotification(): Unit = {
    if (!isActive) return
    val sequenceNumbers = zkClient.getAllLogDirEventNotifications
    try {
      val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
      //尝试将这台Broker上的所有副本 走一下状态流转 到 OnlineReplica
      onBrokerLogDirFailure(brokerIds)
    } finally {
      // delete processed children
      zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
    }
  }

主要将从zk节点 /log_dir_event_notification/log_dir_event_序列号 中获取到的数据的Broker上的所有副本进行一个副本状态流转 ->OnlineReplica ;关于状态机的流转请看 【kafka源码】Controller中的状态机


给所有broker 发送LeaderAndIsrRequest请求,让brokers们去查询他们的副本的状态,如果副本logDir已经离线则返回KAFKA_STORAGE_ERROR异常;

完事之后会删除节点


相关文章
|
4月前
|
消息中间件 存储 Kafka
Kafka日志处理:深入了解偏移量查找与切分文件
**摘要:** 本文介绍了如何在Kafka中查找偏移量为23的消息,涉及ConcurrentSkipListMap的查询、索引文件的二分查找及日志分段的物理位置搜索。还探讨了Kafka日志分段的切分策略,包括大小、时间、索引大小和偏移量达到特定阈值时的切分条件。理解这些对于优化Kafka的性能和管理日志至关重要。
152 2
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
31 4
|
1月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
37 1
|
1月前
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
24 1
|
2月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
25 3
|
2月前
|
消息中间件 存储 监控
Kafka的logs目录下的文件都是什么日志?
Kafka的logs目录下的文件都是什么日志?
99 11
|
3月前
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
189 3
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
5月前
spdlog 日志库部分源码说明——让你可以自定义的指定自动切换日志时间
spdlog 日志库部分源码说明——让你可以自定义的指定自动切换日志时间
128 7