Kafka的心跳处理机制竟然用到了时间轮算法?

简介: Kafka的心跳处理机制竟然用到了时间轮算法?

Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重平衡。在2.4.x以下其版本中,消费组一旦进入重平衡状态,该消费组内所有消费者全部暂停消费,直到重平衡完成。


本文将来探讨Kafka的心跳机制的具体实现。本文的组织结构如下:


  • 源码解读Kafka心跳机制
  • Kafka心跳架构设计亮点(时间轮调度算法实现原理图)


温馨提示:如果大家对源码阅读不感兴趣,可以直接跳到本文的第二部分,用流程图、数据结构图阐述心跳的实现机制。

1、源码分析Kafka心跳机制


在介绍源码分析之前介绍笔直的一条源码分析经验:找准入口,了解调用链路。故笔者会先寻找归纳出Kafka心跳处理的所有入口。


1.1Kafka心跳入口总结


Kafka心跳包的处理流程如下图所示:

c95795dd26c3605cd898ed9bf1b7f26c.png

图的右边是kafka心跳在服务端的核心处理流程,而左边主要展示kafka中所有的心跳请求,根据上图得知Kafka触发心跳处理的主要请求分别如下:


  1. KafkaConsume主动发送心跳包 消费者会以3s的频率向服务端发送心跳包,服务端对应的入口为 KafkaApis的handleHeartbeatRequest方法。
  2. 消费者加入消费组 在消费端重平衡过程中,客户端主动向其组协调器发起Join_Group(加入消费组)时,组协调器会认为收到一个有效的心跳包,服务端对应的处理入口:KafkaApis的handleJoinGroup方法。
  3. 消费者获取队列负载结果 在重平衡的第二个阶段,消费组的Leader在计算出分区负载结果后会发给组协调器,消费组中的其他成员需要发生Sync_Group请求获取负载结果,组协调器同样认为收到了一个有效的心跳包。服务端对应的处理入口:KafkaApis的handleSyncGroupRequest。
  4. 消费者提交位点 消费者组协调器收到消费者提交位点请求,同样可以认定消费者是存活的。位点提交的处理入口:KafkaApis的handlerCommitOffsets方法。
  5. __consumers_offsets主题的ISR的Leader发生变化
    如果__consumers_offsets主题中的各个分区Leader发生变化,与特定分区的组协调器需要重新选举,与此组协调器相关的消费者将触发重平衡。


上述任何一种请求,都能表明消费端是存活的,故能有效阻止服务端将客户端端心跳设置为过期,进入下一个心跳检测周期。


上述各个入口,特别是__consumers_offsets的ISR对消费组的影响,后续会专门展开研究,现在我们将重心转移到服务端是如何处理一个心跳包的。


1.2 源码分析Kafka心跳处理机制


从上面的流程图可以得出,Kafka收到一个心跳包后的处理入口为GroupCoordinator的completeAndScheduleNextExpiration方法,核心代码如下图所示:

3f95084817417a0612b30f9f03dd08b2.png

在介绍该方法之前首先介绍一个该方法的入参含义:


  • GroupMetadata group 消费组的元信息。
  • MemberMetadata member 消费者的元信息。
  • long timeoutMs 心跳超时时间,默认为10s,这个参数是由消费端的session.timeout.ms参数设置,默认为10s。

Step1:为消费组设置唯一标识:groupId + "-" + memberId构成。

Step2:将hearbeatSatisfied设置为true,表示该消费者收到一个有效的心跳包。

Step3:收到一个有效的心跳包,通知定时调度器停止本次的心跳过期检测。

Step4:构建一个DelayedHearbeat,进入下一个心跳检测周期。


接下来将分别对Step3、Step4展开详细介绍。


1.2.1 心跳检测正常处理逻辑


在收到一个心跳包时,尝试将本次检测设置成功,具体的实现由DelayedOperation的checkAndComplete方法,代码如下:


953f77c0d280dcb3357738701d652c32.png

Kafka使用一个数据结构来存储需要跟踪的所有消费者,在这里成为Watch机制。


实现要点:根据key获取WatchList,然后从获取的WatchList中内部的ConcurrentMap中再按照Key获取对应与当前消费者对应的Watch。


  • 如果没有找到对应消费者的Watch,则直接返回,无需检测,说明已经成功检测。
  • 如果找到了对应消费者的Watch,则执行被watch的tryCompleteWatched方法。


Watch的数据结构如下:


dc0c10f1dc1e07b016c00510beeb3940.png

接下来重点关注Watches的tryCompleteWatched方法,该方法的详细调用代码如下图所示:

73c4693b5249074697ba4249108c511e.png

这边先重点介绍一下组协调器判断一次成功的心跳检测的三个标准中满足一个即可(GroupCoordinator的tryCompleteHeartbeat方法):


  • 如果消费组的状态处于Dead
  • 如果消费组的状态为Pending(消费组在重平衡中)
  • hearbeatSatisfied为true,即收到了一个有效的心跳包。


上述代码的实现比较简单,这里就不一一罗列,其核心关键点如下:


  • 删除对应的Watch,表示一次心跳检测成功。
  • Watchs中存储的对象是DelayedOperation(Kafka延迟类型的父类)的子类,在心跳检测中具体为DelayedHeartbeat。
  • 最终执行DelayedOperation的是TimeTask的cancel方法(取消延迟任务),就是从延迟调度中移除自己,表示没有超时,结束本轮的超时检测,具体的存储结构,将在下文详介绍如果开启新一轮心跳检测时再详细讲解。


为了方便大家阅读源码,其主要的调用时序图如下:

3a3a996a7fb3ce83a5ebc78aba70c097.png


1.2.2 开启下一轮心跳检测


1.2.2.1将延迟任务放入时间轮


在接受到一个新的心跳包首先用于清除上一轮设置的延迟任务,然后需要开启一个新的延迟任务,接下来我们将来具体看看Kafka如何开启新一轮心跳检测机制,**其本质上是Kafka的延迟(定时)实现原理。**代码入口如下图所示:

492d5cb615832cc7768c361002339e77.png


开启下一轮调度时首先将Member的heartbeatSatisfied设置为false。


其核心思想是创建一个心跳延迟任务DelayedHeartbeat,并对其检测是否完成或者添加Watch,启动心跳延迟或者等待下一个心跳包的到来。


其实看到这里,我们应该能得到一个关于Kafka心跳检测机制的实现思路:


  • 开启一个延迟任务,延迟检查时间为心跳过期时间,一旦延迟任务执行,则意外着心跳超时。
  • 当收到一个心跳包时,需要取消上一次设置的延迟任务。
  • 使用循环使用延迟任务,从而实现类似定时任务的效果。


接下来我们详细探讨一下DelayedOperationPurgatory的tryCompleteElseWatch方法,其代码如下图所示:


d81d26116d61d47183d0deef113b8fce.png

Step1:尝试调用DelayedHeartbeat的tryComplete方法,判断是否可以判断完成,这里主要是消费组是否为重平衡或者状态为Dead,如果上述情况不满足,则会返回false,因为在发起下一轮心跳包时已将heartbeatSatisfied设置为false。


Step2:为该消费者添加到Watch中,表示kafka需要跟踪该消费者的心跳。


Step3:再次调用maybeTryComplete方法,再尝试判断是否该心跳检测完成。


Step4:如果没有完成,则该任务延迟任务(DelayedHeartbeat)添加到定时调度中。


接下来将进入到Kafka心跳的核心机制,即延迟任务的实现机制

2881e1e094f72645363df2268311878a.png

每一个待执行的延迟任务被封装在TimeTaskEntry中,这个一个典型的双链表,数据结构说明说明如下:

f6e633ddc48c6c34d35e21040eb88f44.png

并持有一个关键字段:该定时任务的过期时间,等于系统当前时间+过期时间,在心跳检测场景中默认为10s。


继续跟踪SystemTimer的addTimerTaskEntry,其代码如下:

4184ecb435cc408fc4c788ceec9b7aa2.png

addTimerTaskEntry的核心实现如下:


  • 尝试将延迟任务添加到时间轮,如果已经过期,则提交到线程池,触发心跳过期的逻辑,提交到线程后,DelayedOperation的run方法会被调用,最终onExpiration方法被调用。
    3db31f7fa044b1e0491c617eb28875a1.png

接下来重点谈一下往时间轮中添加任务的具体实现,核心代码见下图所示:


ec62a86cadd67f6e3cdf65f501d595f3.png

核心实现要点:


Step1:如果任务已经被取消或者已过期,返回false。如果返回false,则会触发定时任务过期。


Step2根据过期时间,放入到时间轮中指定的位置,时间轮的数据结构如下:


611f202f0cb2e7d3f283c915bed45a04.png

每一个格代表一个时间间隔,例如200ms,当前指针指向的格子,代表该格子中的所有任务过期,例如现在要要插入一个700ms过期,从当前指针的下一格开始算起,放入第4格中。


另外时间轮的总格子有限,则该时间轮能计算的最大时间是有限的,例如一个8格的时间轮,每一格代表200ms,则如果要在2s后过期,显然这个时间轮无法存储,通常的解决方案是采用多级时间轮,另外一级的时间轮,其时间精度会更粗。


结合上述关于时间轮的原理,再去看上述代码,就显得容易看懂了。


Step3:就是处理第一级时间轮无法满足过期时间,则放入到第二级时间轮中。


1.2.2.2 驱动时间轮


基于时间轮算法,除了数据按找时间轮到方向、触发时间存储在合适的刻度量,还需要驱动时间轮指针。Kafka中的驱动时间轮入口为:

19bca76f40a0190c44b2ac3aac635e37.png

具体实现代码如下:

390144dfd1c0a0893abdadb68c615ea5.png

具体就是将指针处的所有任务全部拉取出来,执行addTimeTaskEntry,其中过期的任务将提交到线程池触发延迟任务的执行。


上述代码看起来比较简单,就不一一介绍,为了方便大家读懂上面的代码,我们只需要了解一下kafka采用时间轮的实际存储数据结构,即能很容易理解上述代码:

6621498e051b75be2220f2dd0b51ced9.png

其核心特点:环形队列就是一个数组,每一个元素在Kafka中对应一个桶,每一个桶存储一个TimerTaskList(链表),每次指针指向的TimerTaskList,将该链表中的元素代表的任务全部执行。


2、图解Kafka心跳架构设计


读起源码来说或许比较枯燥,接下来给出Kafka心跳处理的图解,重点是阐述Kafka时间轮算法的核心数据结构。

6b55ad45d44be45de9738f947f8fcede.png


相关文章
|
5月前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
308 0
|
2月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
40 3
|
2月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
64 3
|
2月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
38 2
|
2月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
97 4
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3
|
3月前
|
消息中间件 存储 监控
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
65 1
|
2月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
30 0
|
2月前
|
消息中间件 安全 Kafka
Flink与Kafka的终极联盟:揭秘如何在一瞬间切换SASL机制,保护您的数据不受黑客侵袭!
【8月更文挑战第7天】Apache Flink作为高性能流处理框架,在与Kafka集成时确保数据安全至关重要。通过配置`KafkaConsumer`使用SASL机制如SCRAM-SHA-256或PLAIN,可有效防止未授权访问。SCRAM-SHA-256采用强化的身份验证流程提高安全性,而PLAIN机制则相对简单。配置涉及设置`properties`参数,包括指定`sasl.mechanism`、`security.protocol`及JAAS认证信息。合理选择和配置这些参数对于保护Flink应用与Kafka间的数据通信安全至关重要。
39 0
|
4月前
|
消息中间件 Kafka 程序员
Kafka面试必备:深度解析Replica副本的作用与机制
**Kafka的Replica副本是保证数据可靠性的关键机制。每个Partition有Leader和Follower副本,Leader处理读写请求及管理同步,Follower被动同步并准备成为新Leader。从Kafka 2.4开始,Follower在完全同步时也可提供读服务,提升性能。数据一致性通过高水位机制和Leader Epoch机制保证,后者更精确地判断和恢复数据一致性,增强系统容错能力。**
108 1
下一篇
无影云桌面