1.3 网络响应端设计
1.3.1 总体架构分析
总结下:这个图很清晰的说明我们客户发送一个请求,给服务端, KafkaRquestHadnler 销毁 requestQueue 里面的数据,然后处理完后将结果放到 responseQueue 里面,Processor 会遍历响应队列,然后返回给客户端
1.3.2 源码分析
def run() { while(true) { try { var req : RequestChannel.Request = null while (req == null) { // We use a single meter for aggregate idle percentage for the thread pool. // Since meter is calculated as total_recorded_value / time_window and // time_window is independent of the number of threads, each recorded idle // time should be discounted by # threads. val startSelectTime = SystemTime.nanoseconds req = requestChannel.receiveRequest(300) val idleTime = SystemTime.nanoseconds - startSelectTime aggregateIdleMeter.mark(idleTime / totalHandlerThreads) } if(req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format( id, brokerId)) return } req.requestDequeueTimeMs = SystemTime.milliseconds trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) // TODO: 交给另外一个线程 apis.handle(req) } catch { case e: Throwable => error("Exception when handling request", e) } } }
这里就是从队里里面拿取数据,然后交给 api 进行处理,
def receiveRequest(timeout: Long): RequestChannel.Request = requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
这里就是我们的数据就是我们在请求端进行放入,然后在这里将数据获取出来。
def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] val numBytesAppended = produceRequest.sizeInBytes val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition { case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic)) } // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1)) var errorInResponse = false mergedResponseStatus.foreach { case (topicAndPartition, status) => if (status.error != ErrorMapping.NoError) { errorInResponse = true debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( produceRequest.correlationId, produceRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) } } def produceResponseCallback(delayTimeMs: Int) { if (produceRequest.requiredAcks == 0) { // no operation needed if producer request.required.acks = 0; however, if there is any error in handling // the request, since no response is expected by the producer, the server will close socket server so that // the producer client will know that some error has happened and will refresh its metadata if (errorInResponse) { val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) => topicAndPartition -> ErrorMapping.exceptionNameFor(status.error) }.mkString(", ") info( s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " + s"from client id ${produceRequest.clientId} with ack=0\n" + s"Topic and partition to exceptions: $exceptionsSummary" ) requestChannel.closeConnection(request.processor, request) } else { requestChannel.noOperation(request.processor, request) } } else { val response = ProducerResponse(produceRequest.correlationId, mergedResponseStatus, produceRequest.versionId, delayTimeMs) requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } } // When this callback is triggered, the remote API call has completed request.apiRemoteCompleteTimeMs = SystemTime.milliseconds quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback) } if (authorizedRequestInfo.isEmpty) { sendResponseCallback(Map.empty) } else { val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId // call the replica manager to append messages to the replicas replicaManager.appendMessages( produceRequest.ackTimeoutMs.toLong, produceRequest.requiredAcks, internalTopicsAllowed, authorizedRequestInfo, sendResponseCallback) // if the request is put into the purgatory, it will have a held reference // and hence cannot be garbage collected; hence we clear its data here in // order to let GC re-claim its memory since it is already appended to log produceRequest.emptyData() } }
这里通过写副本,副本写成功之后,完成回调。
二、Kafka 副本机制
2.1 总体架构分析
Kafka 提供了数据复制算法保证,如果 leader 发生故障或挂掉,一个新 leader 被选举并被接受客户端的消息成功写入。Kafka 确保从同步副本列表中选举一个副本为 leader,或者说 follower 追赶 leader 数据。leader 负责维护和跟踪 ISR(In-Sync Replicas 的缩写,表示副本同步队列。当 producer 发送一条消息到 broker 后,leader 写入消息并复制到所有 follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的 follower 限制,重要的是快速检测慢副本,如果 follower“落后”太多或者失效,leader 将会把它从 ISR 中删除
2.2 Leader-based 的副本机制
- Kafka 中分成两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
- Kafka 中,追随者副本是不对外提供服务的。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。(因此
目前kafka只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性
) - 领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
2.3 Kafka ISR 机制
每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;
- 在规定的时间内从首领副本那里低延迟地获取过消息。
- 如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。
2.4 LEO (last end offset)、HW (HighWatermark)
2.4.1 高水位原理图
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据。而 Kafka 的这种使用 ISR 的方式则很好的均衡了确保数据不丢失以及吞吐率。Kafka 的 ISR 的管理最终都会反馈到 Zookeeper 节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个 Zookeeper 的节点进行维护:
- Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 Zookeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
- leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR, 如果发现 ISR 变化,则会将新的 ISR 的信息返回到 Zookeeper 的相关节点中。副本不同步的异常情况
- 慢副本:在一定周期时间内 follower 不能追赶上 leader。最常见的原因之一是 I / O 瓶颈导致 follower 追加复制消息速度慢于从 leader 拉取速度。
- 卡住副本:在一定周期时间内 follower 停止从 leader 拉取请求。follower replica 卡住了是由于 GC 暂停或 follower 失效或死亡。
- 新启动副本:当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志。
三、Kafka Segement
3.1 总体架构分析
3.2 稀疏索引
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message 的物理偏移地址。其中以索引文件中元数据 3,497 为例,依次在数据文件中表示第 3 个 message(在全局 partiton 表示第 368772 个 message)、以及该消息的物理偏移地址为 497
3.3 传输协议
message 物理结构
参数说明
例如读取 offset=368776 的 message,需要通过下面 2 个步骤查找。第一步查找 segment file ,其中 00000000000000000000.index 表示最开始的文件,起始偏移量(offset)为 0.第二个文件 00000000000000368769.index 的消息量起始偏移量为 368770 = 368769 + 1.同样,第三个文件 00000000000000737337.index 的起始偏移量为 737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 offset 「二分查找」文件列表,就可以快速定位到具体文件。当 offset=368776 时定位到 00000000000000368769.index|log 第二步通过 segment file 查找 message 通过第一步定位到 segment file,当 offset=368776 时,依次定位到 00000000000000368769.index 的元数据物理位置和 00000000000000368769.log 的物理偏移地址,然后再通过 00000000000000368769.log 顺序查找直到 offset=368776 为止。从上述图 3 可知这样做的优点,segment index file 采取稀疏索引存储方式,它减少索引文件大小,通过 mmap 可以直接内存操作,稀疏索引为数据文件的每个对应 message 设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
3.4 OS cache
页缓存技术 + 磁盘顺序写
你在写磁盘文件的时候,可以直接写入 os cache 中,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入到磁盘中
「顺序写磁盘」
另外还有非常关键的一点,Kafka 在写数据的时候是以磁盘顺序写的方式来落盘的,也就是说,仅仅将数据追加到文件的末尾(append),而不是在文件的随机位置来修改数据。对于普通的机械硬盘如果你要是随机写的话,确实性能极低,这里涉及到磁盘寻址的问题。但是如果只是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能本身是差不多的。
3.5 日志刷盘
熟悉 Linux 操作系统原理的都知道,当我们把数据写入到文件系统之后,数据其实在操 作系统的 page cache 里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。
两种实现方式
一方面,应用程序可以调用 fsync 这个系统调用来强制刷盘;
另一方面,操作系统有后 台线程,定期刷盘。
「性能衡量」
如果应用程序每写入 1 次数据,都调用一次 fsync,那性能损耗就很大,所以一般都会 在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统 不挂,数据就不会丢。另外, kafka 是多副本的,当你配置了同步复制之后。多个副本的数据都在 page cache 里 面,出现多个副本同时挂掉的概率比 1 个副本挂掉,概率就小很多了。
参数设置
对于 kafka 来说,也提供了相关的配置参数,来让你在性能与可靠性之间权衡:
❝log.flush.interval.messages 在将消息刷新到磁盘之前,在日志分区上累积的消息数量 log.flush.interval.ms 在刷新到磁盘之前,任何 topic 中的消息保留在内存中的最长时间(以毫秒为单位)。如果未设置,则使用 log.flush.scheduler.interval.ms 中的值 log.flush.scheduler.interval.ms 日志刷新器检查是否需要将所有日志刷新到磁盘的频率
❞
如果应用程序每写入 1 次数据,都调用一次 fsync,那性能损耗就很大,所以一般都会 在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统 不挂,数据就不会丢。另外, kafka 是多副本的,当你配置了同步复制之后。多个副本的数据都在 page cache 里 面,出现多个副本同时挂掉的概率比 1 个副本挂掉,概率就小很多了。