1.KafKa核心组件
KafKa的核心功能模块:
- 延迟操作组件;
- 控制器;
- 协调器;
- 网络通信;
- 日志管理器;
- 副本管理器;
- 动态配置管理器
- 心跳检测;
1.延迟操作组件
1.DelayedOperation
KafKa将一些不立即执行而要等待满足一定条件之后才触发完成的操作称为延迟操作,并将这类操作定义为一个抽象类DelayedOperation,具有延迟操作的类继承DelayedOperation
2.DelayedOperationPurgatory
https://www.jianshu.com/p/bbb1c4f45b4e
3.DelayedProduce
DelayedProduce的作用就是协助副本管理器在acks为-1的场景的时候,延迟回调responseCallback向生产者做出响应。具体表现在当消息追加到分区Leader副本之后,该分区各个Follower完成了与Leader副本消息同步之后再回调responseCallback给生产者;
4.DelayedFetch
DelayedProduce是在ProduceRequest处理中对生产者发送消息的延迟操作,自然DelayedFetch就是在FetchRequest处理的时候进行的延迟操作。在KafKa中只有消费者或者Follower副本会发起FetchRequest请求。
5.DelayedJoin
DelayedJoin是协助组协调器在消费组准备平衡操作的时候进行相应的处理。当消费组的状态转换为PreparingRebalance时候,即准备进行平衡操作,在组协调器prepareRebalance()方法中会创建一个DelayedJoin对象,并交由DelayedOperationPurgatory负责监视管理;
在消费组进行平衡操作时之所以需要使用DelayedJoin处理,是为了让组协调器等待当前消费组下所有的消费者都请求加入到消费组,即发起了JoinGroupRequest请求。每次组协调器处理完JoinGroupRequest的时候都会检测DelayedJoin是否满足了完成执行的条件;
6.DelayedHearbeat
DelayedHearbeat用于协调消费者与组协调器心跳检测相关的延迟操作,DelayedHearbeat相关功能的实现是调用GroupCoordinator的相应方法
7.DelayedCreateTopics
在创建主题的时候,需要为主题的每个分区分配到Leader之后,才调用回调函数将创建主题结果返回给客户端。
DelayedCreateTopics延迟操作是等待该主题的所有分区副本分配到Leader或者等待超时之后调用回调函数返回给客户端;
2.控制器
http://blog.csdn.net/zhanglh046/article/details/72821995
在启动KafKa集群的时候,每一个代理都会实例化并且启动一个KafKaController,并将该代理的brokerId注册到ZooKeeper的相应节点中。KafKa集群中各代理会根据选举机制选出其中一个代理作为Leader,即Leader控制器。当控制器发生宕机之后其他代理再次竞选出新的控制器。
控制器负责主题的创建与删除、分区和副本的管理以及代理故障转移处理等。
当一个代理被选举成为控制器的时候,该代理对应的KafkaController就会注册(Register)控制器相应额操作权限,同时标记自己是Leader。当代理不再成为控制器的时候,就要注销掉DeRegister相应的权限;
实现这些功能的程序入口是KafKa核心core模块下的kafka.controller.KafkaController类。
1.控制器初始化
每个代理在启动的时候都会实例化并启动一个KafkaController,KafKaController实例化的时候主要完成如下的工作;
2.控制器选举过程
每个代理启动的时候就会创建一个KafKaController实例,当KafKaController启动之后就会从所有的代理中选择一个代理作为控制器,控制器是所有代理的Leader,因此这里也称之为Leader选举。
除了在启动的时候会导致选举之外,当控制器所在的代理发生故障或者ZooKeeper通过心跳机制感知控制器与自己的连接Session过期的时候,也会再次从所有代理中选出一个节点作为集群的控制器;
KafKa控制器的选举依赖于ZooKeeper。在集群的整个运行过程中,代理在ZooKeeper不同节点上注册相应的监听器。各监听器各司其职,当所有监听的节点状态发生变化的时候就会触发相关的函数进行处理。
KafKa控制器选举的核心思想就是各代理通过争抢/controller节点请求写入到自身的信息,先成功写入的代理即为Leader。
3.故障转移
触发控制器进行选举有三种情况:
1.在控制器启动的时候;
2.当控制器发生故障转移的时候;
3.当心跳检测超时的时候;
可以说控制器故障转移的本质就是控制权的转移,而控制权的转移也就是重新选出新的控制器。
4.代理上线和下线
1.代理上线
当有新的代理上线的时候,在代理启动的时候会向ZooKeeper的/brokers/ids节点下注册该代理的brokerId,此时会被副本状态机在ZooKeeper所注册的BrokerChangerListener监听器监听到该节点信息的变化
2.代理下线
当代理下线的时候,该代理在ZooKeeper的/brokers/ids节点注册的与该代理对应的节点将被删除,此时BrokerChangerListener的handleChildCHANGE方法将会被触发;
5.主题管理
控制器负责对主题、分区副本的管理操作。
分区和副本是主题的固有属性,因此在讲解控制器对主题管理的时候将同时讲解控制器对分区副本创建以及删除的管理操作。
控制器对分区、副本的管理在逻辑上体现在分区状态机以及副本状态机对ZooKeeper的/brokers/topics节点以及子节点注册的一系列监听器上;
1.创建主题:
当创建一个主题的时候,无论是通过KafKa API还是通过命令行创建主题,同步返回创建主题成功的时候,其实仅仅都是在ZooKeeper的/brokers/topics节点成功创建了该主题对应的子节点。而服务端创建主题的操作是异步交由控制器去完成的。
2.删除主题:
客户端执行删除主题的操作的时候仅仅是在ZooKeeper的/admin/delete_topics路径下创建一个与待删除主题同名的节点,返回该主题被标记为删除,保证本步操作成功执行的前提是配置项delete.topic.enable的值被设置为true。
例如,删除主题“topic-foo”的节点,则客户端执行删除操作的时候会在/admin/delete_topics路径下创建一个名为“topic-foo” 的节点。而实际删除主题的逻辑是异步交由KafKa控制器负责执行的;
6.分区管理
KafKa控制器(Leader控制器)对分区的管理包括对分区的创建以及删除的管理,分区Leader选举的管理,分区自动平衡、分区副本重新分配的管理等等。
分区平衡
在onControllerFailover操作的时候会启动一个分区自动平衡的定时任务,该定时任务会定期的检查集群上面各代理分布是否失去了平衡,该过程是通过调用控制器的checkAndTriggerPartitionRebalance方法完成的;
分区自动平衡需要保证分区的副本分配在不提供的代理节点上
分区充分配
3.协调器
KafKa提供了消费者协调器(ConsumerCoordinator)、组协调器(GroupCoordinator)以及任务管理协调器(WorkCoordinator)3协调器(coordinator)。其中任务管理协调器被KafKa Connect用于对works的管理。
鉴于旧版高级消费者存在的问题,新版消费者进行了重新设计,为了解决消费者依赖ZooKeeper所带来的问题,KafKa在服务端引入了组协调器(GroupCoordinator),每个KafKaServer启动的时候都会创建一个GroupCoordinator实例,用于管理部分消费组和该消费组下每个消费者的消费偏移量。
同时在客户端引入了消费者协调器(ConsumerCoordinator),每个KafKaConsumer实例化的时候会实例化一个ConsumerCoordinator对象,消费者协调器负责同一个消费组下各消费者与服务端组协调器之间的通信;
1.消费者协调器
消费者协调器(ConsumerCoordinator)是KafkaConsumer的一个成员变量,该KafKaConsumer通过消费者协调器与服务端的组协调器进行通信。由于消费者协调器是KafKaConsumer私有的,因此消费者协调器中存储的信息也只有与之对应的消费者可见,不同消费者是看不到彼此的消费者协调器中的信息的。
消费者协调器负责处理更新消费者缓存的Metadata请求,负责向组协调器发起加入消费组的请求,负责对本消费者加入消费组前、后相应的处理,负责请求离开消费组(如当消费者取消订阅的时候),还负责向组协调器发送提交消费偏移量的请求。并通过一个心跳检测定时任务来检测组协调器的运行状况,或者是让组协调器感知自己的运行状况。
同时Leader消费者的消费者协调器还负责执行分区的分配,当消费者协调器向组协调器请求加入消费组之后,组协调器会为同一个组下的消费者选出一个Leader;
通过这种方式,将分区分配的职责交由客户端自己处理,从而减轻了服务端的负担;
2.组协调器
组协调器(GroupCoordinator)负责对其管理的组员提交的相关请求进行处理,这里的组员即消费者。它负责管理与消费者之间建立连接,并从与之连接的消费者中选出一个消费者作为Leader消费者,Leader消费者负责消费者分区的分配,在SyncGroupRequest请求时发送给组协调器,组协调器会在请求处理后返回响应时下发给其管理的所有消费者。同时组协调器还管理与之连接的消费者的消费偏移量的提交,将每个消费者消费偏移量保存到KafKa的内部主题当中,并通过心跳检测来检测消费者与自己的连接状态;
组协调器依赖的组件
每一个KafkaServer启动的时候都会实例化并启动一个组协调器,每个组协调器负责一部分消费组的管理。
消费者入组过程
消费偏移量管理
新版的KafKaConsumer将消费偏移量保存到KafKa的一个内部主题中,当消费者正常运行或者进行平衡操作的时候都需要向组协调器提交当前的消费偏移量。