分区(Parition)
介绍副本之前,我们首先需要介绍分区,因为副本依赖于分区,是分区的更深一步的划分。
分区是什么
Parition是物理上的概念,每个Topic包含一个或多个Partition,每个分区可以分布到不同的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
对Kafka集群来说,分区可以实现高伸缩性,以及负载均衡,动态调节的能力。
分区写入策略
所谓分区写入策略,即是生产者将数据写入到kafka主题后,kafka如何将数据分配到不同分区中的策略。
常见的有三种策略,轮询策略,随机策略,和按键保存策略。其中轮询策略是默认的分区策略,而随机策略则是较老版本的分区策略,不过由于其分配的均衡性不如轮询策略,故而后来改成了轮询策略为默认策略。
轮询策略
所谓轮询策略,即按顺序轮流将每条数据分配到每个分区中。
举个例子,假设主题test有三个分区,分别是分区A,分区B和分区C。那么主题对接收到的第一条消息写入A分区,第二条消息写入B分区,第三条消息写入C分区,第四条消息则又写入A分区,依此类推。
轮询策略是默认的策略,故而也是使用最频繁的策略,它能最大限度保证所有消息都平均分配到每一个分区。除非有特殊的业务需求,否则使用这种方式即可。
随机策略
随机策略,也就是每次都随机地将消息分配到每个分区。其实大概就是先得出分区的数量,然后每次获取一个随机数,用该随机数确定消息发送到哪个分区。
在比较早的版本,默认的分区策略就是随机策略,但其实使用随机策略也是为了更好得将消息均衡写入每个分区。但后来发现对这一需求而言,轮询策略的表现更优,所以社区后来的默认策略就是轮询策略了。
按key保存策略
按键保存策略,就是当生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储。
至于要如何实现,那也简单,只要让生产者发送的时候指定key就行。欸刚刚不是说默认的是轮询策略吗?其实啊,kafka默认是实现了两个策略,没指定key的时候就是轮询策略,有的话那激素按键保存策略了。
上面有说到一个场景,那就是要顺序发送消息到kafka。前面提到的方案是让所有数据存储到一个分区中,但其实更好的做法,就是使用这种按键保存策略。
让需要顺序存储的数据都指定相同的键,而不需要顺序存储的数据指定不同的键,这样一来,即实现了顺序存储的需求,又能够享受到kafka多分区的优势,岂不美哉。
切记分区是实现负载均衡以及高吞吐量的关键,所以一定要在生产者这一端就要考虑好合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,从而导致下游数据消费的性能下降的问题。
副本(Parition)
副本是什么
在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。多个副本中仅有一个作为“leader”节点,其他副本作为“followers”。leader负责处理消息的读和写,followers则去同步leader中的消息。如果leader节点宕机了,followers中的一台则会进行选举后自动成为leader。
副本数量不能超过kafka集群节点数量,每个节点最多放置一个分区的一个副本。通过这样的机制实现了高可用,当leader节点挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。
在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。
AR、ISR、OSR
- AR:Assigned Replicas 指当前分区中的所有副本。
- ISR:In-Sync Replicas 副本同步队列。ISR中包括Leader和Foller。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.message(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务器是否可以加入ISR副本队列,在0.10版本之后移除了replica.lag.max.message(延迟条数)参数,防治服务频繁的进出队列。任意一个维度超过阈值都会把Follower踢出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
- OSR:(Out-of-Sync Replicas)非同步副本队列。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改unclean.leader.election.enable参数配置来改变)。unclean.leader.election.enable 为true的话,意味着非ISR集合的broker 也可以参与选举,这样就有可能发生数据丢失和数据不一致的情况,Kafka的可靠性就会降低;而如果unclean.leader.election.enable参数设置为false,Kafka的可用性就会降低。
leader副本选举
发生leader副本选举的情况:
- 创建主题
- 增加分区
- 分区下线(分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对外提供服务)
- 分区重分配
分区leader副本的选举由Kafka控制器负责具体实施。主要过程如下:
- 从Zookeeper中读取当前分区的所有ISR(in-sync replicas)集合。
- 调用配置的分区选择算法选择分区的leader。
分区副本分为ISR(同步副本)和OSR(非同步副本),当leader发生故障时,只有“同步副本”才可以被选举为leader。选举时按照集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。
同时kafka支持OSR(非同步副本)也参加选举,Kafka broker端提供了一个参数unclean.leader.election.enable,用于控制是否允许非同步副本参与leader选举;如果开启,则当 ISR为空时就会从这些副本中选举新的leader,这个过程称为 Unclean leader选举。
可以根据实际的业务场景选择是否开启Unclean leader选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。一般建议是关闭Unclean leader选举,因为通常数据的一致性要比可用性重要。
OSR、ISR的区分标准
ISR: 与leader副本保持同步状态的副本集合
OSR: 与leader副本同步滞后过多的副本(不包括leader副本)组成OSR
其中是否同步的区分标准主要有以下两个参数:
replica.lag.max.message(延迟条数)
此参数在0.10版本之后移除
表示当前follwer副本落后leader副本的消息数量超过了这个参数的值,具体follower副本的结束位移(以下皆称LEO, log end offset)与该leader副本LEO的差值判定。
如果差值超过参数值,那么leader就会把follower从ISR中删除。假设设置replica.lag.max.messages=4,那么如果producer一次传送至broker的消息数量都小于4条时,因为在leader接受到producer发送的消息之后而follower副本开始拉取这些消息之前,follower落后leader的消息数不会超过4条消息,故此没有follower移出ISR,所以这时候replica.lag.max.message的设置似乎是合理的。
但是producer发起瞬时高峰流量,producer一次发送的消息超过4条时,也就是超过replica.lag.max.messages,此时follower都会被认为是与leader副本不同步了,从而被踢出了ISR。但实际上这些follower都是存活状态的且没有性能问题。那么在之后追上leader,并被重新加入了ISR。于是就会出现它们不断地剔出ISR然后重新回归ISR,这无疑增加了无谓的性能损耗。而且这个参数是broker全局的,即所有topic都受到这个参数的影响。假设集群中有两个topic: t1和t2。假设它们的流量差异非常巨大,如果阈值设置太大,则影响真正“落后”follower的移除;设置的太小了,则会导致follower的频繁进出。无法给定一个合适的replica.lag.max.messages的值,故此参数在0.10版本之后移除。
replica.lag.time.max.ms(延迟时间)
只要follower副本每隔指定时间都能发送FetchRequest请求给leader,那么该副本就不会被标记成dead从而被踢出ISR。
kafka副本数据的同步过程
生产者在发布消息到某个partition时,先通过ZooKeeper找到该partition的leader副本,然后将该消息发送到leader副本节点。leader会将该消息写入本地Log。每个follower都从leader节点pull数据。这种方式上,follower存储的数据顺序与leader保持一致。follower在收到该消息并写入其Log后,向leader发送ACK。一旦Leader收到了ISR中的所有节点的ACK,该消息就被认为已经commit了,Leader将增加HW(HighWatermark)并且向Producer发送ACK。
LEO和HW
Kafka副本同步过程中,如何确定副本的同步进度以及同步状态,就需要用到HW(High Watermark)机制。
- LEO (Log End Offset),标识当前日志文件中下一条待写入的消息的offset。上图中offset为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的offset值加1.分区 ISR 集合中的每个副本都会维护自身的 LEO ,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
Kafka有两套follower副本LEO(明白这个是搞懂后面内容的关键,因此请多花一点时间来思考):1. 一套LEO保存在follower副本所在broker的副本管理机中;2. 另一套LEO保存在leader副本所在broker的副本管理机中——换句话说,leader副本机器上保存了所有的follower副本的LEO。
- HW:副本中最新一条已提交消息的位移。每个replica都有HW值,但仅仅leader中的HW才能作为标示信息,标示着consumer可以消费的消息区间,对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。
leader epoch
Kafka 0.11引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息。所谓leader epoch实际上是一对值:(epoch,offset)。epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的位移。因此假设有两对值:
(0, 0)
(1, 120)
则表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。
leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。
当leader写底层log时它会尝试更新整个缓存——如果这个leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发生数据不一致和丢失的情况。