Kafka
Kafka是一个分布式、支持分区的、多副本的, 基于ZooKeeper 协调的分布式消息系统。
它最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,Web/Nginx日志、访问日志,消息服务等等,用 Scala语言编写 。属于Apache基金会的顶级开源项目。
先看一下Kafka的架构图 :
Kafka的核心概念
在Kafka中有几个核心概念:
- Broker :消息中间件处理节点,一个Kafka节点就是一个Broker,一个或者多个Broker可以组成一个Kafka集群
- Topic :Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
- Producer :消息生产者,向Broker发送消息的客户端
- Consumer :消息消费者,从Broker读取消息的客户端
- ConsumerGroup :每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的ConsumerGroup消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
- Partition :物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的
- Leader :每个Partition有多个副本,其中有且仅有一个作为Leader,Leader是负责数据读写的Partition。
- Follower :Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,Leader会把这个Follower从 ISR列表 中删除,重新创建一个Follower。
- Offset :偏移量。Kafka的存储文件都是按照offset.kafka来命名,用Offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。
可以这么来理解Topic,Partition和Broker:
一个Topic,代表逻辑上的一个业务数据集,比如订单相关操作消息放入订单Topic,用户相关操作消息放入用户Topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在Topic内部划分多个Partition来分片存储数据,不同的Partition可以位于不同的机器上,相当于 分布式存储 。每台机器上都运行一个Kafka的进程Broker。
Kafka核心总控制器Controller
在Kafka集群中会有一个或者多个Broker,其中有一个Broker会被选举为控制器(Kafka Controller),可以理解为 Broker-Leader ,它负责管理整个 集群中所有分区和副本的状态。
Partition-Leader
Controller选举机制
在Kafka集群启动的时候,选举的过程是集群中每个Broker都会尝试在ZooKeeper上创建一个 /controller
临时节点,ZooKeeper会保证有且仅有一个Broker能创建成功,这个Broker就会成为集群的总控器Controller。
当这个Controller角色的Broker宕机了,此时ZooKeeper临时节点会消失,集群里其他Broker会一直监听这个临时节 点,发现临时节点消失了,就竞争再次创建临时节点,就是我们上面说的选举机制,ZooKeeper又会保证有一个Broker成为新的Controller。具备控制器身份的Broker需要比其他普通的Broker多一份职责,具体细节如下:
- 监听Broker相关的变化 。为ZooKeeper中的
/brokers/ids/
节点添加BrokerChangeListener
,用来处理Broker增减的变化。 - 监听Topic相关的变化 。为ZooKeeper中的
/brokers/topics
节点添加TopicChangeListener
,用来处理Topic增减的变化;为ZooKeeper中的/admin/delete_topics
节点添加TopicDeletionListener
,用来处理删除Topic的动作。 - 从ZooKeeper中读取获取当前所有与Topic、Partition以及Broker有关的信息并进行相应的管理 。对于所有Topic所对应的ZooKeeper中的
/brokers/topics/
节点添加PartitionModificationsListener
,用来监听Topic中的分区分配变化。 - 更新集群的元数据信息,同步到其他普通的Broker节点中
Partition副本选举Leader机制
Controller感知到分区Leader所在的Broker挂了,Controller会从ISR列表(参数 unclean.leader.election.enable=false
的前提下)里挑第一个Broker作为Leader(第一个Broker最先放进ISR列表,可能是同步数据最多的副本),如果参数unclean.leader.election.enable
为true,代表在ISR列表里所有副本都挂了的时候可以在ISR列表以外的副本中选Leader,这种设置,可以提高可用性,但是选出的新Leader有可能数据少很多。副本进入ISR列表有两个条件:
- 副本节点不能产生分区,必须能与ZooKeeper保持会话以及跟Leader副本网络连通
- 副本能复制Leader上的所有写操作,并且不能落后太多。(与Leader副本同步滞后的副本,是由
replica.lag.time.max.ms
配置决定的,超过这个时间都没有跟Leader同步过的一次的副本会被移出ISR列表)
消费者消费消息的Offset记录机制
每个Consumer会定期将自己消费分区的Offset提交给Kafka内部Topic:consumer_offsets,提交过去的时候,key是consumerGroupId+topic+分区号
,value就是当前Offset的值,Kafka会定期清理Topic里的消息,最后就保留最新的那条数据。
因为_consumer\_offsets
可能会接收高并发的请求,Kafka默认给其分配50个分区(可以通过 offsets.topic.num.partitions
设置),这样可以通过加机器的方式抗大并发。
消费者Rebalance机制
Rebalance就是说 如果消费组里的消费者数量有变化或消费的分区数有变化,Kafka会重新分配消费者与消费分区的关系 。比如consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他。
注意:Rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,Kafka不会进行Rebalance。
如下情况可能会触发消费者Rebalance:
- 消费组里的Consumer增加或减少了
- 动态给Topic增加了分区
- 消费组订阅了更多的Topic
Rebalance过程中,消费者无法从Kafka消费消息,这对Kafka的TPS会有影响,如果Kafka集群内节点较多,比如数百 个,那重平衡可能会耗时极多,所以应尽量避免在系统高峰期的重平衡发生。
Rebalance过程如下
当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段:
第一阶段:选择组协调器
组协调器GroupCoordinator:每个consumer group都会选择一个Broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者Rebalance。consumer group
中的每个consumer启动时会向Kafka集群中的某个节点发送FindCoordinatorRequest
请求来查找对应的组协调器GroupCoordinator
,并跟其建立网络连接。
组协调器选择方式:通过如下公式可以选出consumer消费的Offset要提交到_consumer\_offsets
的哪个分区,这个分区Leader对应的Broker就是这个consumer group
的coordinator公式:
hash(consumer group id) % 对应主题的分区数
第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator
之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator
发送JoinGroupRequest
请求,并处理响应。然后GroupCoordinator
从一个consumer group
中选择第一个加入group的consumer作为Leader(消费组协调器),把consumer group
情况发送给这个Leader,接着这个Leader会负责制定分区方案。
第三阶段(SYNC GROUP)
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的Leader Broker进行网络连接以及消息消费。
消费者Rebalance分区分配策略
主要有三种Rebalance的策略:range 、 round-robin 、 sticky 。默认情况为range分配策略 。
假设一个主题有10个分区(0-9),现在有三个consumer消费:
range策略: 按照分区序号排序分配 ,假设n=分区数/消费者数量 = 3
, m=分区数%消费者数量 = 1
,那么前 m 个消 费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。比如分区0~ 3给一个consumer,分区4~ 6给一个consumer,分区7~9给一个consumer。
round-robin策略: 轮询分配 ,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、 8给一个consumer
sticky策略: 初始时分配策略与round-robin
类似,但是在rebalance的时候,需要保证如下两个原则:
- 分区的分配要尽可能均匀 。
- 分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标 。这样可以最大程度维持原来的分区分配的策略。比如对于第一种range情况的分配,如果第三个consumer挂了,那么重新用sticky策略分配的结果如下:consumer1除了原有的0~ 3,会再分配一个7 consumer2除了原有的4~ 6,会再分配8和9。
Producer发布消息机制剖析
1、写入方式
producer采用push模式将消息发布到broker,每条消息都被append到patition中,属于顺序写磁盘( 顺序写磁盘 比 随机写 效率要高,保障 kafka 吞吐率 )。
2、消息路由
producer发送消息到broker时,会根据分区算法选择将其存储到哪一个partition。其路由机制为:
hash(key)%分区数
3、写入流程
- producer先从ZooKeeper的 "
/brokers/…/state
" 节点找到该partition的leader - producer将消息发送给该leader
- leader将消息写入本地log
- followers从leader pull消息,写入本地log后向leader发送ACK
- leader收到所有ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset)并向producer发送ACK
HW与LEO
HW俗称高水位 ,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW, consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状 态。
对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW, 此时消息才能被consumer消费。
这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。
日志分段存储
Kafka一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段存储的, 每个段的消息都存储在不一样的log文件里,Kafka规定了一个段位的log文件最大为1G,做这个限制目的是为了方便把log文件加载到内存去操作:
1 # 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件, 2 # 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息 3 00000000000000000000.index 4 # 消息存储文件,主要存offset和消息体 5 00000000000000000000.log 6 # 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件, 7 # 如果需要按照时间来定位消息的offset,会先在这个文件里查找 8 00000000000000000000.timeindex 9 10 00000000000005367851.index 11 00000000000005367851.log 12 00000000000005367851.timeindex 13 14 00000000000009936472.index 15 00000000000009936472.log 16 00000000000009936472.timeindex
这个9936472之类的数字,就是代表了这个日志段文件里包含的起始 Offset,也就说明这个分区里至少都写入了接近1000万条数据了。Kafka Broker有一个参数,log.segment.bytes
,限定了每个日志段文件的大小,最大就是1GB。
一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做log rolling
,正在被写入的那个日志段文件,叫做active log segment
。
最后附一张ZooKeeper节点数据图