UpdateMetadataRequest
更新Broker
上的元数据。
Controller事件处理线程
会把事件封装成对应的请求,然后将请求写入对应的Broker
的请求阻塞队列,然后RequestSendThread
不断从阻塞队列中获取待发送的请求。
先解释下controllerBrokerStateInfo
,它就是个 POJO类,可以理解为集群每个broker
对应一个controllerBrokerStateInfo
.
然后再看下ControllerChannelManager
,从名字可以看出它管理Controller
和集群Broker
之间的连接,并为每个Broker
创建一个RequestSendThread
线程。
再小结一下
接着上个小结,事件处理线程将事件队列里面的事件处理之后再进行对应的请求封装,塞入需要通知的集群Broker
对应的阻塞队列中,然后由每个Broker
专属的requestSendThread
发送请求至对应的Broker
。
总的步骤如下图:
现在应该已经清楚Controller
大概是如何运作的,整体看起来还是生产者-消费者模型。
接下来就进入源码环节。
Controller选举流程源码分析
事件处理的流程都是一样的,只是具体处理的事件逻辑不同,我们从Controller选举
入手,来走一遍处理流程。
ControllerChangeHandler
选举会触发此handler
,可以看到直接往ControllerEventManager
的事件队列里塞。
这个QueueEvent
和ControllerEventManager
,我们先来看看是啥。不过在此之前先了解下ControllerEvent
和ControllerEventProcessor
。
ControllerEvent:事件
ControllerEventProcessor : 事件处理接口
此接口的唯一实现类是 KafkaController
。
ControllerEventManager:事件处理器
此类主要用来管理事件处理线程和事件队列。
QueuedEvent:封装了ControllerEvent的类
主要是记录了下入队时间,并且提供了事件需要调用的方法。
ControllerEventThread:事件处理线程
整体而言还是很简单的,从队列拿事件,然后处理。
KafkaController#process
就是个switch,根据事件调用对应的processXXXX
方法。
最后我们来看下元数据到底有啥和KafkaController
的一些字段。
ControllerContext:元数据
主要有运行中的Broker
、所有主题信息、主题分区副本信息等。
KafkaController
基本上关键的字段都解释了,关于状态机那一块篇幅有限,之后再说。
最后
整体的流程就是将Controller
相关操作都封装成一个个事件,然后将事件入队,由一个事件处理线程来处理,保证数据的安全(从这也可以看出,不是多线程就是好,有利有弊最终还是看场景)。
最后在通知集群中Broker
的过程是每个Broker
配备一个发送线程,因为发送是同步的,因此每个Broker
线程隔离可以防止某个Broker
阻塞而导致整体都阻塞的情况。
前面有说到Kafka Controller
强依赖 ZooKeeper
。但是现在社区打算移除 ZooKeeper
,因为ZooKeeper
不适合频繁写,并且是CP的。而且用Kafka
还需要维护ZooKeeper
集群,提升了系统的复杂度和运维难度,降低了系统的稳定性。
像位移信息,已经通过内部主题的方式保存,绕开了ZooKeeper
。
社区打算通过类 Raft 共识算法来选举Controller
,并且把元数据存储在 Log 中的方式来做。
我是 yes,从一点点到亿点点,我们下篇见。