解析Kafka High Available 二

本文涉及的产品
云原生网关 MSE Higress,422元/月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: Broker Failover过程 一、Controller对Broker failure的处理过程 1、假设只有一台broker宕机 2、Controller在ZooKeeper的/brokers/ids节点上注册Watch。

Broker Failover过程

一、Controller对Broker failure的处理过程

    1、假设只有一台broker宕机
    2、Controller在ZooKeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机,其在ZooKeeper对应的Znode会自动被删除,ZooKeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
    3、Controller决定set_p,该集合包含了宕机的Broker上的所有Partition分区
        3.1、set_p包含了, 所有topic下面分布在该broker上的partition分区leader
    4、对set_p中的每一个Partition:

        4.1、从/brokers/topics/[topic]/partitions/[partition]/state读取某一个Partition的ISR(容灾备份分区列表)。
            a、在这里需要注意的一个问题的是:这里所指的该broker上的partition,代表的就是某一个topic的其中一个partition分区的leader
            b、至于已该broker上的replica(其他partition的备份)是不关心的.
    5、决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)(ISR里面只有一个replica,是被选为leader,因为只有一个了)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1(表示某个topic下面的某一个partition分区已经完全损坏,该分区不能在使用)。
    6、将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。
    7、直接通过RPC(远程网络调用)向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。

二、创建/删除Topic

    1、Controller在ZooKeeper的/brokers/topics节点上注册Watch,一旦某个Topic被创建或删除,则Controller会通过Watch得到新创建/删除的Topic的Partition/Replica分配。
        1.1、这句话写的真的很绕,通俗的说,就是controller在topics节点上注册了一个哨兵,只要topics发生改变其发生改变的topic已经啊topic下面的partition和rpelica都会被controller知道.
    2、对于删除Topic操作,Topic工具会将该Topic名字存于/admin/delete_topics。
        2.1、若delete.topic.enable为true,则Controller注册在/admin/delete_topics上的Watch被fire,Controller通过回调向对应的Broker发送StopReplicaRequest(停止从;leader哪里进行主从同步),
        2.2、若为false则Controller不会在/admin/delete_topics上注册Watch,也就不会对该事件作出反应。
    3、对于创建Topic操作,Controller从/brokers/ids读取当前所有可用的Broker列表,对于set_p中的每一个Partition:(为什么是set_p?)
        3.1、分配给该Partition(分区)的所有Replica(备份)(称为AR)中任选一个可用的Broker作为新的Leader,并将AR(备份列表)设置为新的ISR
            (因为该Topic是新创建的,所以AR中所有的Replica都没有数据,可认为它们都是同步的,也即都在ISR中,任意一个Replica都可作为Leader)
    4、将新的Leader和ISR写入/brokers/topics/[topic]/partitions/[partition]
    5、直接通过RPC向相关的Broker发送LeaderAndISRRequest。
      

三、Broker响应请求流程

      四个角色
        1、acceptor 请求接受这 
                Acceptor的主要职责是监听并接受客户端(请求发起方,包括但不限于Producer,Consumer,Controller,Admin Tool)的连接请求,并建立和客户端的数据传输通道,然后为该客户端指定一个Processor,至此它对该客户端该次请求的任务就结束了,它可以去响应下一个客户端的连接请求了
        2、Processor 请求中转处理者                                    
            Processor主要负责从客户端读取数据并将响应返回给客户端,它本身并不处理具体的业务逻辑,并且其内部维护了一个队列来保存分配给它的所有SocketChannel。Processor的run方法会循环从队列中取出新的SocketChannel并将其SelectionKey.OP_READ注册到selector上,然后循环处理已就绪的读(请求)和写(响应)。Processor读取完数据后,将其封装成Request对象并将其交给RequestChannel。
        3、RequestChannel 中转站
            RequestChannel是Processor和KafkaRequestHandler交换数据的地方,它包含一个队列requestQueue用来存放Processor加入的Request,KafkaRequestHandler会从里面取出Request来处理;同时它还包含一个respondQueue,用来存放KafkaRequestHandler处理完Request后返还给客户端的Response。
        4、KafkaRequestHandler 业务处理器
            循环从RequestChannel中取Request并交给kafka.server.KafkaApis处理具体的业务逻辑。
    Processor会通过processNewResponses方法依次将requestChannel中responseQueue保存的Response取出,并将对应的SelectionKey.OP_WRITE事件注册到selector上。当selector的select方法返回时,对检测到的可写通道,调用write方法将Response返回给客户端
  

四、LeaderAndIsrRequest响应过程

    对于收到的LeaderAndIsrRequest,Broker主要通过ReplicaManager的becomeLeaderOrFollower处理,流程如下:
        1、校验求中的controllerEpoch是否小于当前最新的controllerEpoch,小于则表示该条请求已经过期(已经有新的controller被选出) return 错误 否则继续
        2、对于请求中partitionStateInfos中的每一个元素,即((topic, partitionId), partitionStateInfo):
        3、检查每日一个元素的partitionStateInfo中的leader时间戳,是否小于当前broker所有用的该partition的leader时间戳小,小于则已过期,需要吧错误代码存入Response中,并且continue检查下一个元素,否则继续
        4、检查该元素的partitionStatrInfo中的broker_id 是否包含本broker_id,若是不包含,则记录一个error或者wore,然后continue下一个元素 若是包含 ,则继续
            在一开始搭建HA的时候已经确定好了每一个topic的partition(分区)置于那一部分broker之中,也就是有一个Replica list,所以,要是该list中本就不包含本台broker的id则说明这个分区不应该出现在这个broker上
            同样的也会有一份replica list 来决定每一个partition的备份群归属地,所以需要进行校
        5、该partition及partitionStateInfo存入一个名为partitionState的HashMap中
        6、筛选出partitionState中Leader与当前Broker ID相等的所有记录存入partitionsTobeLeader中,其它记录存入partitionsToBeFollower中 区分主从
        7、若partitionsTobeLeader不为空,则对其执行makeLeaders方。
        8、若partitionsToBeFollower不为空,则对其执行makeFollowers方法。
        9、若highwatermak线程还未启动,则将其启动,并将hwThreadInitialized设为true。
        10、关闭所有Idle状态的Fetcher。
        11、LeaderAndIsrRequest处理过程如下图所示
            

五、Broker启动过程

      Broker启动后首先根据其ID在ZooKeeper的/brokers/idszonde下创建临时子节点(Ephemeral node),创建成功后Controller的ReplicaStateMachine注册其上的Broker Change Watch会被fire,从而通过回调KafkaController.onBrokerStartup方法完成以下步骤:
      向所有新启动的Broker发送UpdateMetadataRequest,其定义如下。
      将新启动的Broker上的所有Replica设置为OnlineReplica状态,同时这些Broker会为这些Partition启动high watermark线程。
      通过partitionStateMachine触发OnlinePartitionStateChange。
  

六、Controller Failover

      Controller也需要Failover。每个Broker都会在Controller Path (/controller)上注册一个Watch。当前Controller失败时,对应的Controller Path会自动消失(因为它是Ephemeral Node),此时该Watch被fire,所有“活”着的Broker都会去竞选成为新的Controller(创建新的Controller Path),但是只会有一个竞选成功(这点由ZooKeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为ZooKeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注册。
      Broker成功竞选为新Controller后会触发KafkaController.onControllerFailover方法,并在该方法中完成如下操作:
        1、读取并增加Controller Epoch。
        2、在ReassignedPartitions Patch(/admin/reassign_partitions)上注册Watch。
        3、在PreferredReplicaElection Path(/admin/preferred_replica_election)上注册Watch。
        4、通过partitionStateMachine在Broker Topics Patch(/brokers/topics)上注册Watch。
        5、若delete.topic.enable设置为true(默认值是false),则partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注册Watch。
        6、通过replicaStateMachine在Broker Ids Patch(/brokers/ids)上注册Watch。
        7、初始化ControllerContext对象,设置当前所有Topic,“活”着的Broker列表,所有Partition的Leader及ISR等。
        8、启动replicaStateMachine和partitionStateMachine。
        9、将brokerState状态设置为RunningAsController。
        10、将每个Partition的Leadership信息发送给所有“活”着的Broker。
        11、若auto.leader.rebalance.enable配置为true(默认值是true),则启动partition-rebalance线程。
          12、若delete.topic.enable设置为true且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
  

七、Partition重新分配

      管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成以下操作:
        1、将ZooKeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
        2、强制更新ZooKeeper中的leader epoch,向AR中的每个Replica发送LeaderAndIsrRequest。
        3、将RAR - OAR中的Replica设置为NewReplica状态。
        4、等待直到RAR中所有的Replica都与其Leader同步。
        5、将RAR中所有的Replica都设置为OnlineReplica状态。
        6、将Cache中的AR设置为RAR。
        7、若Leader不在RAR中,则从RAR中重新选举出一个新的Leader并发送LeaderAndIsrRequest。若新的Leader不是从RAR中选举而出,则还要增加ZooKeeper中的leader epoch。
        8、将OAR - RAR中的所有Replica设置为OfflineReplica状态,该过程包含两部分。第一,将ZooKeeper上ISR中的OAR - RAR移除并向Leader发送LeaderAndIsrRequest从而通知这些Replica已经从ISR中移除;第二,向OAR -                         RAR中的Replica发送StopReplicaRequest从而停止不再分配给该Partition的Replica。
        9、将OAR - RAR中的所有Replica设置为NonExistentReplica状态从而将其从磁盘上删除。
        10、将ZooKeeper中的AR设置为RAR。
        11、删除/admin/reassign_partition。
        12、注意:最后一步才将ZooKeeper中的AR更新,因为这是唯一一个持久存储AR的地方,如果Controller在这一步之前crash,新的Controller仍然能够继续完成该过程。

        

八、Follower从Leader Fetch数据

    Follower通过向Leader发送FetchRequest获取消息              
        每个Fetch请求都要指定最大等待时间和最小获取字节数,以及由TopicAndPartition和PartitionFetchInfo构成的Map。实际上,Follower从Leader数据和Consumer从Broker Fetch数据,都是通过FetchRequest请求完成,所以在FetchRequest结构中,其中一个字段是clientID,并且其默认值是ConsumerConfig.DefaultClientId
    Leader收到Fetch请求后,Kafka通过KafkaApis.handleFetchRequest响应该请求,响应过程如下
        1、replicaManager根据请求读出数据存入dataRead中。
        2、如果该请求来自Follower则更新其相应的LEO(log end offset)以及相应Partition的High Watermark
        3、根据dataRead算出可读消息长度(单位为字节)并存入bytesReadable中。
        4、满足下面4个条件中的1个,则立即将相应的数据返回
            4.1、Fetch请求不希望等待,即fetchRequest.macWait <= 0
            4.2、Fetch请求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo为空
            4.3、有足够的数据可供返回,即bytesReadable >= fetchRequest.minBytes
            4.4、读取数据时发生异常
        5、若不满足以上4个条件,FetchRequest将不会立即返回,并将该请求封装成DelayedFetch。检查该DeplayedFetch是否满足,若满足则返回请求,否则将该请求加入Watch列表
                  

这是我对篇博客的理解,大部分是一样的.但是我加了自己学习的批注(括号或者小点)

目录
相关文章
|
18天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
49 2
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
130 58
|
2月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
168 0
|
3月前
|
消息中间件 域名解析 网络协议
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
|
5月前
|
消息中间件 Kafka 程序员
Kafka面试必备:深度解析Replica副本的作用与机制
**Kafka的Replica副本是保证数据可靠性的关键机制。每个Partition有Leader和Follower副本,Leader处理读写请求及管理同步,Follower被动同步并准备成为新Leader。从Kafka 2.4开始,Follower在完全同步时也可提供读服务,提升性能。数据一致性通过高水位机制和Leader Epoch机制保证,后者更精确地判断和恢复数据一致性,增强系统容错能力。**
194 1
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
268 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
134 0

推荐镜像

更多