【Kafka源码】万字长文详解Kafka网络模型、副本机制(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Kafka源码】万字长文详解Kafka网络模型、副本机制

1.3 网络响应端设计

1.3.1 总体架构分析

640.png

总结下:这个图很清晰的说明我们客户发送一个请求,给服务端, KafkaRquestHadnler 销毁 requestQueue 里面的数据,然后处理完后将结果放到 responseQueue 里面,Processor 会遍历响应队列,然后返回给客户端

1.3.2 源码分析

def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = SystemTime.nanoseconds
          req = requestChannel.receiveRequest(300)
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }
        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        // TODO: 交给另外一个线程
        apis.handle(req)
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

这里就是从队里里面拿取数据,然后交给 api 进行处理,

def receiveRequest(timeout: Long): RequestChannel.Request =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

这里就是我们的数据就是我们在请求端进行放入,然后在这里将数据获取出来。

def handleProducerRequest(request: RequestChannel.Request) {
    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
    val numBytesAppended = produceRequest.sizeInBytes
    val (authorizedRequestInfo, unauthorizedRequestInfo) =  produceRequest.data.partition  {
      case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
    }
    // the callback for sending a produce response
    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
      var errorInResponse = false
      mergedResponseStatus.foreach { case (topicAndPartition, status) =>
        if (status.error != ErrorMapping.NoError) {
          errorInResponse = true
          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
            produceRequest.correlationId,
            produceRequest.clientId,
            topicAndPartition,
            ErrorMapping.exceptionNameFor(status.error)))
        }
      }
      def produceResponseCallback(delayTimeMs: Int) {
        if (produceRequest.requiredAcks == 0) {
          // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
          // the request, since no response is expected by the producer, the server will close socket server so that
          // the producer client will know that some error has happened and will refresh its metadata
          if (errorInResponse) {
            val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
              topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
            }.mkString(", ")
            info(
              s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
                s"from client id ${produceRequest.clientId} with ack=0\n" +
                s"Topic and partition to exceptions: $exceptionsSummary"
            )
            requestChannel.closeConnection(request.processor, request)
          } else {
            requestChannel.noOperation(request.processor, request)
          }
        } else {
          val response = ProducerResponse(produceRequest.correlationId,
                                          mergedResponseStatus,
                                          produceRequest.versionId,
                                          delayTimeMs)
          requestChannel.sendResponse(new RequestChannel.Response(request,
                                                                  new RequestOrResponseSend(request.connectionId,
                                                                                            response)))
        }
      }
      // When this callback is triggered, the remote API call has completed
      request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
      quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
                                                                   numBytesAppended,
                                                                   produceResponseCallback)
    }
    if (authorizedRequestInfo.isEmpty) {
      sendResponseCallback(Map.empty)
    } else {
      val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
      // call the replica manager to append messages to the replicas
      replicaManager.appendMessages(
        produceRequest.ackTimeoutMs.toLong,
        produceRequest.requiredAcks,
        internalTopicsAllowed,
        authorizedRequestInfo,
        sendResponseCallback)
      // if the request is put into the purgatory, it will have a held reference
      // and hence cannot be garbage collected; hence we clear its data here in
      // order to let GC re-claim its memory since it is already appended to log
      produceRequest.emptyData()
    }
  }

这里通过写副本,副本写成功之后,完成回调。

二、Kafka 副本机制

2.1 总体架构分析

640.png

Kafka 提供了数据复制算法保证,如果 leader 发生故障或挂掉,一个新 leader 被选举并被接受客户端的消息成功写入。Kafka 确保从同步副本列表中选举一个副本为 leader,或者说 follower 追赶 leader 数据。leader 负责维护和跟踪 ISR(In-Sync Replicas 的缩写,表示副本同步队列。当 producer 发送一条消息到 broker 后,leader 写入消息并复制到所有 follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的 follower 限制,重要的是快速检测慢副本,如果 follower“落后”太多或者失效,leader 将会把它从 ISR 中删除

2.2 Leader-based 的副本机制

640.png

  1. Kafka 中分成两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
  2. Kafka 中,追随者副本是不对外提供服务的。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。(因此目前kafka只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性
  3. 领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。

2.3 Kafka ISR 机制


每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;

  1. 在规定的时间内从首领副本那里低延迟地获取过消息。
  2. 如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。

2.4 LEO (last end offset)、HW (HighWatermark)

2.4.1 高水位原理图

640.png

由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据。而 Kafka 的这种使用 ISR 的方式则很好的均衡了确保数据不丢失以及吞吐率。Kafka 的 ISR 的管理最终都会反馈到 Zookeeper 节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个 Zookeeper 的节点进行维护:

  • Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 Zookeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
  • leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR, 如果发现 ISR 变化,则会将新的 ISR 的信息返回到 Zookeeper 的相关节点中。副本不同步的异常情况
  • 慢副本:在一定周期时间内 follower 不能追赶上 leader。最常见的原因之一是 I / O 瓶颈导致 follower 追加复制消息速度慢于从 leader 拉取速度。
  • 卡住副本:在一定周期时间内 follower 停止从 leader 拉取请求。follower replica 卡住了是由于 GC 暂停或 follower 失效或死亡。
  • 新启动副本:当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志。

三、Kafka Segement

3.1 总体架构分析

640.png

3.2 稀疏索引

640.png

索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message 的物理偏移地址。其中以索引文件中元数据 3,497 为例,依次在数据文件中表示第 3 个 message(在全局 partiton 表示第 368772 个 message)、以及该消息的物理偏移地址为 497

3.3 传输协议

message 物理结构

640.png

参数说明

640.png

例如读取 offset=368776 的 message,需要通过下面 2 个步骤查找。第一步查找 segment file ,其中 00000000000000000000.index 表示最开始的文件,起始偏移量(offset)为 0.第二个文件 00000000000000368769.index 的消息量起始偏移量为 368770 = 368769 + 1.同样,第三个文件 00000000000000737337.index 的起始偏移量为 737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 offset 「二分查找」文件列表,就可以快速定位到具体文件。当 offset=368776 时定位到 00000000000000368769.index|log 第二步通过 segment file 查找 message 通过第一步定位到 segment file,当 offset=368776 时,依次定位到 00000000000000368769.index 的元数据物理位置和 00000000000000368769.log 的物理偏移地址,然后再通过 00000000000000368769.log 顺序查找直到 offset=368776 为止。从上述图 3 可知这样做的优点,segment index file 采取稀疏索引存储方式,它减少索引文件大小,通过 mmap 可以直接内存操作,稀疏索引为数据文件的每个对应 message 设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。


3.4 OS cache

页缓存技术 + 磁盘顺序写

你在写磁盘文件的时候,可以直接写入 os cache 中,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入到磁盘中

640.png

「顺序写磁盘」

另外还有非常关键的一点,Kafka 在写数据的时候是以磁盘顺序写的方式来落盘的,也就是说,仅仅将数据追加到文件的末尾(append),而不是在文件的随机位置来修改数据。对于普通的机械硬盘如果你要是随机写的话,确实性能极低,这里涉及到磁盘寻址的问题。但是如果只是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能本身是差不多的。

3.5 日志刷盘

熟悉 Linux 操作系统原理的都知道,当我们把数据写入到文件系统之后,数据其实在操 作系统的 page cache 里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。

两种实现方式

一方面,应用程序可以调用 fsync 这个系统调用来强制刷盘;

另一方面,操作系统有后 台线程,定期刷盘。

「性能衡量」

如果应用程序每写入 1 次数据,都调用一次 fsync,那性能损耗就很大,所以一般都会 在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统 不挂,数据就不会丢。另外, kafka 是多副本的,当你配置了同步复制之后。多个副本的数据都在 page cache 里 面,出现多个副本同时挂掉的概率比 1 个副本挂掉,概率就小很多了。

参数设置

对于 kafka 来说,也提供了相关的配置参数,来让你在性能与可靠性之间权衡:

log.flush.interval.messages 在将消息刷新到磁盘之前,在日志分区上累积的消息数量 log.flush.interval.ms 在刷新到磁盘之前,任何 topic 中的消息保留在内存中的最长时间(以毫秒为单位)。如果未设置,则使用 log.flush.scheduler.interval.ms 中的值 log.flush.scheduler.interval.ms 日志刷新器检查是否需要将所有日志刷新到磁盘的频率

如果应用程序每写入 1 次数据,都调用一次 fsync,那性能损耗就很大,所以一般都会 在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统 不挂,数据就不会丢。另外, kafka 是多副本的,当你配置了同步复制之后。多个副本的数据都在 page cache 里 面,出现多个副本同时挂掉的概率比 1 个副本挂掉,概率就小很多了。

相关文章
|
1月前
|
安全 网络安全 数据安全/隐私保护
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限。它通过设置一系列规则,控制谁可以访问特定资源、在什么条件下访问以及可以执行哪些操作。ACL 可以应用于路由器、防火墙等设备,分为标准、扩展、基于时间和基于用户等多种类型,广泛用于企业网络和互联网中,以增强安全性和精细管理。
183 7
|
1天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
13 1
|
6天前
|
监控 安全 BI
什么是零信任模型?如何实施以保证网络安全?
随着数字化转型,网络边界不断变化,组织需采用新的安全方法。零信任基于“永不信任,永远验证”原则,强调无论内外部,任何用户、设备或网络都不可信任。该模型包括微分段、多因素身份验证、单点登录、最小特权原则、持续监控和审核用户活动、监控设备等核心准则,以实现强大的网络安全态势。
|
2月前
|
机器学习/深度学习 算法 数据安全/隐私保护
基于BP神经网络的苦瓜生长含水量预测模型matlab仿真
本项目展示了基于BP神经网络的苦瓜生长含水量预测模型,通过温度(T)、风速(v)、模型厚度(h)等输入特征,预测苦瓜的含水量。采用Matlab2022a开发,核心代码附带中文注释及操作视频。模型利用BP神经网络的非线性映射能力,对试验数据进行训练,实现对未知样本含水量变化规律的预测,为干燥过程的理论研究提供支持。
|
1月前
|
存储 网络协议 安全
30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场
本文精选了 30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场。
86 2
|
1月前
|
运维 网络协议 算法
7 层 OSI 参考模型:详解网络通信的层次结构
7 层 OSI 参考模型:详解网络通信的层次结构
146 1
|
2月前
|
网络协议 前端开发 Java
网络协议与IO模型
网络协议与IO模型
129 4
网络协议与IO模型
|
2月前
|
机器学习/深度学习 网络架构 计算机视觉
目标检测笔记(一):不同模型的网络架构介绍和代码
这篇文章介绍了ShuffleNetV2网络架构及其代码实现,包括模型结构、代码细节和不同版本的模型。ShuffleNetV2是一个高效的卷积神经网络,适用于深度学习中的目标检测任务。
103 1
目标检测笔记(一):不同模型的网络架构介绍和代码
|
1月前
|
网络协议 算法 网络性能优化
计算机网络常见面试题(一):TCP/IP五层模型、TCP三次握手、四次挥手,TCP传输可靠性保障、ARQ协议
计算机网络常见面试题(一):TCP/IP五层模型、应用层常见的协议、TCP与UDP的区别,TCP三次握手、四次挥手,TCP传输可靠性保障、ARQ协议、ARP协议
|
1月前
|
机器学习/深度学习 人工智能 算法
【车辆车型识别】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+算法模型
车辆车型识别,使用Python作为主要编程语言,通过收集多种车辆车型图像数据集,然后基于TensorFlow搭建卷积网络算法模型,并对数据集进行训练,最后得到一个识别精度较高的模型文件。再基于Django搭建web网页端操作界面,实现用户上传一张车辆图片识别其类型。
85 0
【车辆车型识别】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+算法模型