4.网络通信
在KafKaServer启动的时候,初始化并启动了一个SocketServer服务,用于接收客户端的连接、处理客户端请求、发送响应等。
同时创建了一个KafkaRequestHandlerPool用于管理KafkaRequestHandler。
SocketServer是基于java NIO实现的网络通信组件;
其线程模型为:
一个Acceptor线程负责接收客户端所有的连接;
N个Processor线程,每个Processor有多个Selector,负责从每个连接中读取请求;
M个Handler线程处理请求,并将产生的结果返回给Processor线程。而Handler是由KafkaRequestHanderPool管理,在Processor和Handler之间通过RequestChannel来缓冲请求,每个Handler从RequestChannel.requestQueue接受RequestChannel.Request,并把Request交由KafkaApis的Handler()方法处理,经过处理之后把对应的Response存进RequestChannel.responseQueues队列。
1.Acceptor
Acceptor是一个线程类,主要职责是监听并接受客户端的请求,建立和客户端的数据传输通道ServerSocketChannel,然后为客户端制定一个Processor;
2.Processor
也是一个线程类,主要用于从客户端读取请求数据和将相应的响应结果返回给客户端。
3.RequestChannel
RequestChannel是为了给Processor线程与Handler线程之间通信提供数据缓冲,是通信过程中Request和Response缓存的通道,是ProCSSor线程与Handler线程交换数据的地方;
4.SocketServer启动过程
在启动一个KafKa代理的时候会实例化并启动一个SocketServer服务;
SocketServer启动之后就可以通过Acceptor接收客户端的请求;
5.日志管理器
1. KafKa日志结构
KafKa消息是以主题为基本单位进行组织的,各个主题之间相互独立。
每个主题在逻辑结构上又可以由一个或者多个分区构成,分区数可以在创建主题的时候指定,也可以在主题创建之后再进行修改。
可以通过Kafka自带的用于主题管理操作的脚本kafka-topics.sh来修改某个主题的分区数,但是只能增加一个主题的分区数目,而不能减少其分区数目。
每个分区可以有一个或者多个副本,从副本中选出一个副本作为Leader,Leader负责与客户端进行读写操作,其他副本作为Follower。
生产者将消息发送到Leader副本的代理节点,而Follower副本从Leader副本同步数据;
从存储结构上,分区的每个副本在逻辑上对应一个Log对象,每个Log对象又划分为多个LogSegment,每个LogSegment包括一个日志文件和两个索引文件,其中两个索引文件分别为偏移量索引文件和时间戳索引文件。Log负责对 LogSegment的管理,在Log对象中维护了一个ConcurrentSkipListMap,保存该主题所有分区对应的所有LogSegment;
KafKa将日志文件封装为一个FileMessageSet对象,两个索引文件封装为OffsetIndex和TimeIndex对象;
数据文件用于存储消息,每条消息有一个固定长度的消息头和一个可变长度(N字节)的净荷(payload)组成,
2.日志管理器启动过程
3.日志加载以及恢复
4.日志清理
KafKa将一个主题的每个分区副本分成多个日志段文件,这样通过定时日志清理操作,将旧的日志文件及时的清理并释放出空间,以避免磁盘上的日志段文件过大而导致新的日志无法写入。同时分成多个日志段文件而不是一个文件也便于清理操作。
我们可以通过日志段的更新时间或者是日志段的大小控制进行日志的清理;
KafKa提供了日志删除(delete)和日志压缩(compact)两种清理日志的策略,通过参数cleanup.policy来指定日志清理的策略。
【日志删除】
在日志管理器启动的时候会启动一个后台定时任务用于定时删除日志段文件
【日志压缩】
另外一种日志清理的策略是日志压缩,这种策略是一种更细粒度的清理策略,它基于消息的Key,通过压缩每个Key对应的消息只保留最后一个版本的数据,该Key对应的其他版本在压缩的时候会被清除,类似数据库的更新操作;
6.副本管理器
引入副本机制使得KafKa能够在整个集群中只要保证至少有一个代理存活就不会影响整个集群的工作,从而大大提高了KafKa集群的可靠性和稳定性。
KafKa对代理的存活必须满足两个条件:
(1)一个存活的节点必须与ZooKeeper保持连接,维护与ZooKeeper的Session(通过ZooKeeper的心跳机制来实现)
(2)如果一个节点作为Follower副本,该节点必须即时的与分区的Leader副本保持消息绒布,不能落后太久;
1.分区
KafKa将一个主题在逻辑上分为一个或者多个分区,每个分区在物理存储上对应一个目录,
分区目录下存储的是该分区的日志段,包括日志数据文件和两个索引文件。每个分区又对应一个或者多个副本。
需要注意的是:分区的个数可以大于节点数,但是副本数不能大于节点数,因为副本需要分布在不同的节点上,这样才能达到备份的目的;
每个主题的某一个分区只能被同一个消费组下的其中一个消费者消费,因此我们说分区是消费并行度的基本单位。同时,对于上层应用而言,也是最小的存储单元。
尽管每个分区是有一系列有序的顺序端组成,从消费者角度来讲,我们订阅消费一个主题,也就是订阅了该主题的所有分区,当然也可以指定订阅主题的某个分区。
从生产者的角度来讲,我们可以通过指定消息的Key以及分区分配策略将消息发送到主题相应的分区当中;
KafKa将分区抽象为一个Partition对象,Partition定义了一个assignedReplicaMap引用用于保存该分区的所有副本,assignedReplicaMap是一个Pool类型的对象,并维护了该分区铜鼓的副本集合inSyncReplicas,同时Partition对象定义了分区对副本操作的方法,包括创建副本、副本角色切换、ISR列表维护以及调用日志管理器(LogManager)追加消息等。
2.副本
一个分区可以有一个或者多个副本,根据是否接受读写请求,又分为Leader副本和Follower副本,一个分区有1个Leader副本,有0个或者多个Follower副本;
Leader副本处理分区的所有的读写请求并维护自身以及Follower副本的状态信息,如LEO、HW等。Follower副本作为消费者从Leader副本拉取消息进行同步,当Leader失效的时候,通过分区Leader选举器从副本列表中选出一个副本作为新的Leader;
KafKa将副本抽象为一个Replica对象;
3.副本管理器启动过程
每个代理启动的时候都会启动一个副本管理器;
4.副本过期检查
副本管理器启动的时候启动了一个对副本过期检查的定时任务,该定时任务调用副本管理器的maybeShrinkIsr方法定期进行副本过期检查,其功能就是检查分区ISR是否需要进行收缩,即从ISR踢出与Leader数据不同步的副本;
5.追加消息
当生产者发送消息(ProduceRequest)或者消费者提交偏移量到内部主题的时候,由副本管理器的appendMessage将消息追加到相应分区的Leader副本中。
6.拉取消息
副本管理器除了负责将消息写入到Leader副本之外,同时还负责处理KafkaApis的FetchRequest请求,从分区Leader副本获取消息;
从KafKa中拉取消息的角色有两个,一个是KafKa的普通消费者,另一个就是Follower副本;
副本管理器通过FetchRequest请求的replicaId来区分拉取请求的角色;
因为每个副本有replicaId属性,即副本的replicaId总是非负数,而消费者的replicaId的值为-1;
7.副本同步过程
8.副本角色转换
当分区ISR发生变化的时候,控制器会向分区各副本对应的代理发出LeaderAndISRRequest请求,各代理的副本管理器接收到请求之后调用becomeLeaderOrFollower()方法进行处理;
9.关闭副本
关闭副本操作通常有两种方式:
第一种:将副本下线;
第二种:将副本下线并删除;
7.Handler
Handler其实是KafkaRequestHandler的简称,KafkaRequestHandler是一个线程类,负责从RequestChannel中读取请求然后交由KafkaApis处理;
8.动态配置管理器
动态配置管理器(DynamicConfiManager)主要用来对相关配置的变化进行处理,KafKa将可以通过ZooKeeper进行管理的配置划分为4个类型,称为配置类型(ConfigType)或者配置级别,每个配置类称为一个实体,这4个类型分别为Topic(主题级别)、Client(客户端级别)、User(用户级别)、和Broker(代理级别);
Topic(主题级别):监听器会调用主题级别配置处理器TopicConfigHandler进行处理
Client(客户端级别):ClientIdConfigHandler
User(用户级别):UserConfigHandler进行处理
和Broker(代理级别):通知处理器会调用代理级别的配置处理器BrokerConfigHandler对配置进行处理
9.代理健康检测
KafKa集群依赖于ZooKeeper进行管理,每个代理启动的时候都向ZooKeeper进行一系列元数据的注册,即在ZooKeeper相应目录下创建一个临时节点,当代理与ZooKeeper连接断开之后,相应的临时节点也会被删除;
KafKa健康检测机制实现类是KafkaHealthcheck,该类实例化的时候创建一个SessionExpireListener监听器,该监听器实现了IZKStateListener接口,
10. KafKa内部监控
KafKa使用Yammer Metrics进行内部状态的监控,用来收集报告KafkaServer端和客户端的metrics信息。Yammer Metrics是Yammer提供的一个Java库,用于检测JVM上相关服务运行的状态。