字节终面:说说Kakfa副本状态机的实现原理?

简介: ReplicaStateMachine是内部组件,一般用户感觉不到存在,但搞懂它,对从根本定位一些数据不一致问题大有裨益。部署3-Broker(A、B和C)Kafka集群,版本2.0.0。在这3个Broker上创建一个单分区、双副本主题。

读这源码有何用?


ReplicaStateMachine是内部组件,一般用户感觉不到存在,但搞懂它,对从根本定位一些数据不一致问题大有裨益。


部署3-Broker(A、B和C)Kafka集群,版本2.0.0。在这3个Broker上创建一个单分区、双副本主题。


若两个副本分别位于A、B,而Controller在C


当关闭A、B后,zk会显示该主题的Leader是-1,ISR为空


但若两个副本依处A、B,而Controller在B


依次关闭A、B,该主题在zk中的Leader和ISR就变成B。和上一case不符


虽非特严重问题,但毕竟是数据不一致,查看源码后,定位导致不一致原因:


在第一种情况下,Controller会调用ReplicaStateMachine,调整该主题副本的状态,进而变更Leader和ISR

第二种情况下,Controller执行Failover,但并未在新Controller组件初始化时进行状态转换,因而出现了不一致

不阅读这部分源码,就无法定位问题根因。


定义与初始化


ReplicaStateMachine:副本状态机抽象类,定义了一些常用方法(如startup、shutdown等),以及handleStateChanges

10.png



ZkReplicaStateMachine:副本状态机具体实现类,重写了handleStateChanges方法,实现了副本状态之间的状态转换。

11.png12.png





ReplicaState:副本状态集合,Kafka目前共定义了7种副本状态。


ReplicaStateMachine只需接收一个ControllerContext对象实例,ControllerContext封装了Controller端保存的所有集群元数据信息。


构造一个ZKReplicaStateMachine实例,除了ControllerContext实例,比较重要的属性还有:


KafkaZkClient对象实例


负责与ZooKeeper进行交互


ControllerBrokerRequestBatch实例


用于给集群Broker发送控制类请求(LeaderAndIsrRequest、StopReplicaRequest和UpdateMetadataRequest)


ControllerBrokerRequestBatch,将给定Request发送给指定Broker,它是如何发送请求的呢(结合ControllerBrokerStateInfo)


在副本状态转换操作的逻辑中,关键是为Broker上的副本更新信息,而这是通过Controller给Broker发送请求实现的,因此,你最好了解下这里的请求发送逻辑。


副本状态机是在何时进行初始化的?


KafkaController对象在构建时,就会初始化一个ZkReplicaStateMachine实例:

13.png



若一个Broker没被选举为Controller,它也会构建KafkaController对象实例吗?


Yes!所有Broker在启动时,都会创建KafkaController实例,也随之创建ZKReplicaStateMachine实例。但只有在Controller所在的Broker,副本状态机才会被启动:

14.png



当Broker被成功推举为Controller后,onControllerFailover方法会被调用,进而启动该Broker早已创建好的副本状态机和分区状态机。


副本状态及状态管理流程


副本状态机一旦被启动,就要管理副本状态的转换。


研究管理状态前,要先明白:


当前都有哪些状态


含义分别是什么


源码中的ReplicaState定义了如下副本状态:


15.png


ReplicaState接口及其实现对象定义了每种状态的序号,以及合法的前置状态。以OnlineReplica为例:


16.png

17.png



其validPreviousStates属性是个集合类型,说明Kafka只允许副本从这4种态变更到OnlineReplica态。


其余副本状态的代码逻辑类似,关注validPreviousStates字段即可知晓每个状态合法的前置状态。


最终完整的状态转换规则:


18.png


单向箭头表示只允许单向状态转换


双向箭头则表示转换方向可以是双向


状态管理流程


当副本对象首次被创建后,置NewReplica态

初始化后,当副本对象能够对外提供服务,状态机将其调整为OnlineReplica,并一直以该状态持续工作

若副本所在Broker关闭或不能正常工作,副本要从OnlineReplica变更为OfflineReplica。

一旦开启如删除主题这样操作,状态机会将副本状态跳转到ReplicaDeletionStarted,表明副本删除已开启:


删除成功,置ReplicaDeletionSuccessful

不满足删除条件(如所在Broker处下线状态),置ReplicaDeletionIneligible,以便重试

当副本对象被删除后,其状态变更为NonExistentReplica,副本状态机将移除该副本数据。


具体实现类:ZkReplicaStateMachine


副本状态机的具体实现类。


状态转换方法


logFailedStateChange

20.png



logInvalidTransition


logSuccessfulTransition


getTopicPartitionStatesFromZk


doRemoveReplicasFromIsr


removeReplicasFromIsr


doHandleStateChanges


handleStateChanges方法

handleStateChange处理状态的变更,对外提供状态转换操作的入口方法:


def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit

21.png



调用doHandleStateChanges执行副本状态转换

给集群中相应Broker批量发送请求

执行第1步时,会将replicas按Broker ID分组。


<主题名,分区号,副本Broker ID>表示副本对象

22.png



假设replicas为集合:


<test, 0, 0>

<test, 0, 1>

<test, 1, 0>

<test, 1, 1>)


则调用doHandleStateChanges前,会将replicas按Broker ID分组成:


Map(


- 0 -> Set(<test, 0, 0>, <test, 1, 0>),

- 1 -> Set(<test, 0, 1>, <test, 1, 1>)


之后调用doHandleStateChanges


doHandleStateChanges


25.png

尝试获取给定副本对象在Controller端元数据缓存中的当前状态:若未保存某副本对象的状态,将其初始化为NonExistentReplica态


根据不同ReplicaState中定义的合法前置状态集合及传入的目标状态(targetState),将给定副本对象集合划分成两部分:


能合法转换的副本对象集合


执行非法状态转换的副本对象集合


doHandleStateChanges为该集合类的每个副本对象记录一条错误日志


代码携带能执行合法转换的副本对象集合,进入不同代码分支。当前Kafka为副本定义7类状态,因此,共有7条分支


包括:


副本被创建时被转换到NewReplica态

副本正常工作时被转换到OnlineReplica态

副本停止服务后被转换到OfflineReplica态

分支1:转换到NewReplica

26.png


26.png


尝试从元数据缓存中,获取这些副本对象的分区信息数据,包括分区的Leader副本在哪个Broker,ISR中都有哪些副本等。


若找不到对应分区数据,直接把副本状态更新为NewReplica。否则,代码就要给该副本所在Broker发送请求,让它知道该分区的信息。还要给集群所有运行中的Broker发送请求,让它们感知到新副本加入。


分支2:转换到OnlineReplica态

副本对象正常工作时所处状态:

27.png28.png





遍历副本对象,依次执行:


获取元数据中该副本所属的分区对象及该副本的当前状态

查看当前状态是否是NewReplica

是,获取分区的副本列表,并判断该副本是否在于当前副本列表:不在,就记录错误日志并更新元数据中的副本列表

若状态不是NewReplica,说明这是已存在的副本对象,则源码会获取对应分区的详细数据,然后向该副本对象所在的Broker发送LeaderAndIsrRequest请求,令其同步获知,并保存该分区数据

将该副本对象状态变更为OnlineReplica。至此,该副本处于正常工作状态。


分支3:转换到OfflineReplica状态


30.png31.png



给所有符合状态转换的副本所在Broker,发送StopReplicaRequest,告诉这些Broker停掉对应副本

根据分区是否保存Leader信息,将副本集合划分成:有Leader副本集,无Leader副本集合。有无Leader信息并不仅仅包含Leader,还有ISR和controllerEpoch等数据

遍历有Leader子集合,向这些副本所在Broker发送LeaderAndIsrRequest请求,去更新停止副本操作之后的分区信息,再把这些分区状态置OfflineReplica

遍历无Leader子集合,执行与上步类似操作。只是对无Leader,因未执行任何Leader选举操作,所以给这些副本所在Broker发送的不是LeaderAndIsrRequest请求,而是UpdateMetadataRequest请求,显式告知它们更新对应分区的元数据,再把副本状态置OfflineReplica

把副本状态变更为OfflineReplica=停止对应副本+更新远端Broker元数据


总结

Kafka的副本状态机实现原理及源码:


副本状态机:ReplicaStateMachine是Kafka Broker端源码中控制副本状态流转的实现类。每个Broker启动时都会创建ReplicaStateMachine实例,但只有Controller组件所在的Broker才会启动它。

副本状态:当前,Kafka定义了7类副本状态。同时,它还规定了每类状态合法的前置状态。

handleStateChanges:用于执行状态转换的核心方法。底层调用doHandleStateChanges方法,以7路case分支的形式穷举每类状态的转换逻辑。

32.png

目录
相关文章
|
2月前
|
图形学
计算机辅助设计的基本原理与应用 - 副本
计算机辅助设计的基本原理与应用 - 副本
|
8月前
|
消息中间件 存储 缓存
字节面试官狂问我:kafka 是什么?有什么作用?
目录一览 万字19图带走Kafka 友情提示:内容太多,先码再看 1.什么是消息中间件? 2.kafka 是什么?有什么作用? 3.kafka 的架构是怎么样的?
|
8月前
|
消息中间件 算法 容灾
7年工作经验面试被问:谈谈你对Kafka副本Leader选举原理的理解?
一位7年工作经验的小伙伴,面试被问到这样一道题,说:”谈谈你对Kafka副本Leader选举原理的理解“。当时,他想,这Kafka用的不就是Zookeeper 的选举吗?难道Kafka又自己搞了一套。没错,这回Kafka自己造了一个轮子。 那么今天,我给大家来聊一聊我对Kafka副本Leader选举原理的理解。
63 1
|
10月前
|
存储 XML Java
何为消息持久化?
持久化(Persistence),即把数据(如内存中的对象)保存到可永久保存的存储设备中(如磁盘)。持久化的主要应用是将内存中的对象存储在关系型的数据库中,当然也可以存储在磁盘文件中、XML数据文件中等等。
72 0
|
12月前
|
消息中间件 缓存 算法
腾讯三面:说说Kafka的分区状态机的实现原理?
1 我为何读这源码? PartitionStateMachine,分区状态机负责管理Kafka分区状态的转换,类似ReplicaStateMachine。
84 0
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
382 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
|
存储 缓存 运维
第七节:X-Paxos 三副本与高可用(一)|学习笔记
快速学习第七节:X-Paxos 三副本与高可用(一)
254 0
第七节:X-Paxos 三副本与高可用(一)|学习笔记
|
存储 容灾 关系型数据库
第七节:X-Paxos 三副本与高可用(二)|学习笔记
快速学习第七节:X-Paxos 三副本与高可用(二)
75 0
第七节:X-Paxos 三副本与高可用(二)|学习笔记
|
存储 缓存 AliSQL
第七节:X-Paxos 三副本与高可用(四)|学习笔记
快速学习第七节:X-Paxos 三副本与高可用(四)
90 0
第七节:X-Paxos 三副本与高可用(四)|学习笔记
|
存储 监控 开发者
第七节:X-Paxos 三副本与高可用(三)|学习笔记
快速学习第七节:X-Paxos 三副本与高可用(三)
88 0
第七节:X-Paxos 三副本与高可用(三)|学习笔记