kafka分为客户端和服务端,通常我们知道broker是服务端,而生产者和消费者作为客户端。因此在服务端就必定需要解决并发和网络IO的问题。因此不可避免需要用到SocketChannel和ServerSocketChannel,可以看到kafka就使用了ServerSocketChannel,采用Netty来解决这个问题,这里socketServer采用了1个Acceptor,多个Processor。同时将请求发送到请求通道RequestChannel中。而我们知道RequestChannel中有一个请求队列和多个响应队列,通常响应队列是3个,这个参数是在kafka的配置中配置的。通过kafka请求处理池进行请求转发,将请求转发到KafkaApis中,然后kafkaApis根据请求进行具体处理。而kafkaApis的请求处理依赖偏移量管理OffsetManager、LogManager、ReplicaManager等。
而我们知道在KafkaServer中除了上面的操作,还有Controller的操作,首先会创建KafkaController对象,这个过程中会创建一个ControllerContext,用于缓存控制器中的各种处理数据,包括选举次数、代理列表、主题列表、各个主题分区等以及状态机。然后进行KafkaController的初始化操作,在这个过程中需要首先经过ZookeeperLeaderElector选举,分为两种情况:当前控制器选举成功和选举失败,如果当前控制器选举成功则调用onControllerFailOver方法,如果当前控制器选举失败,则调用onControllerResignation方法。
选举过程中,是以leaderId是否为-1进行判断的,如果是-1,这说明没有成功选举出Leader,如果不为-1,则说明已经选举出Leader,此时会调用onBecomingLeader方法。
Kafka的健康检测是通过KafkaHealthCheck中在zookeeper上创建瞬时节点来实现的,如果存在瞬时节点,则说明健康,否者说明不健康。而在RocketMQ中,则是通过发送心跳包来进行判断是否健康的。