生产者分区策略
Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。
分区的原因
为什么要分区呢?对于分布式系统的三高我们已经很熟悉了,我们再来强调一下:
- 高可扩展:方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic,又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了
- 高并发:可以提高并发,因为可以以 Partition 为单位读写了,可以并发的往一个Topic的多个Partion中发送消息
- 高可用:当然有了高可用和高可扩展了,我们还希望整个集群稳定,并发的情况下消息不会有丢失现象,为了保证数据的可靠性,我们每个分区都有多个副本来保证不丢消息,如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求
如上图所示展示的,我们分布式集群的特性才能体现出来,其实不光是Kafka,所有的分布式中间件,都需要满足以上的特性。
分区的原则
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。producer 发送消息到 broker 时,既然分区了,我们怎么知道生产者的消息该发往哪个分区呢?producer 会根据分区算法选择将其存储到哪一个 partition。
从代码结构里我们可以看到实际上可以归纳为三种方法,也就是三种路由机制,决定消息被发往哪个分区,分别是:
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值
- 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法【轮询算法】。
可以说分区策略对于Kafka来说是三高机制的基础,有了分区才能实现Kafka的高可扩展,在这样的构建模型之上我们来看看基于分区机制,Kafka如何实现数据可靠性【高并发】和故障转移【高可用】。
ACK应答机制
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
那么基于此,因为在Kafka集群中一个分区会存在多个副本,leader负责读写,但同时需要把消息可靠的同步,防止消息丢失和遗漏或者重复。基于性能考虑,同时存在多种ACK应答机制。
Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
- request.required.acks = 0,producer 不停向leader发送数据,而不需要 leader 反馈成功消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。可能在发送过程中丢失数据,可能在 leader 宕机时丢失数据。【传输效率最高,可靠性最低】
- request.required.acks = 1,这是默认情况,即:producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。【传输效率中,可靠性中】
- request.required.acks = -1(all),producer 发送数据给 leader,leader 收到数据后要等到 ISR 列表中的所有副本都同步数据完成后(强一致性),才向生产者返回成功消息,如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。【传输效率低,可靠性高】,同时如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复
当 request.required.acks = -1时需要注意,如果要提高数据的可靠性,在设置 request.required.acks=-1 的同时,还需参数 min.insync.replicas
配合,如此才能发挥最大的功效。min.insync.replicas 这个参数用于设定 ISR 中的最小副本数,默认值为1,当且仅当 request.required.acks 参数设置为-1时,此参数才生效。当 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required
。通过将参数 min.insync.replicas 设置为 2,当 ISR 中实际副本数为 1 时(只有leader),将无法保证可靠性,因为如果发送ack后leader宕机,那么此时该条消息就会被丢失,所以应该拒绝客户端的写请求以防止消息丢失。在-1策略下有三个问题单独讨论一下:
副本同步策略
我们知道当 request.required.acks=-1时需要ISR中的全部副本都同步完成,才返回ACK,但是其实在最终确定这个方案之前还有一些别的方案,讨论核心是那么到底多少foller副本同步完成,才发送ack呢?
现有的两种方案我们选择了第二种,第一种占用的机器资源过多,造成了大量的数据冗余,而网络延迟对于Kafka的影响并不大。
ISR选举策略
采用全量副本同步方案后,我们发送ack的时机确定如下:leader 收到数据,所有 follower 都开始同步数据,但是设想如下情况:有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?我们引入ISR的概念
- 所有的副本(replicas)统称为 Assigned Replicas,即 AR
- ISR 是 AR 中的一个子集,由 leader 维护 ISR列表,follower 从 leader 同步数据有一些延迟(由参数 replica.lag.time.max.ms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中
- AR=ISR+OSR,也就是所有副本=可用副本+备用副本。
- ISR列表包括:leader + 与leader保持同步的followers,Leader 发生故障之后,会从 ISR 中选举新的 leader
在这种机制下,ISR始终是动态保持稳定的集群,消息来了之后,leader先读取,然后推送到各个follwer里,保证ISR中各个副本处于同步状态,只要所有ISR中的follwer都同步完成即可发布ACK,leader挂掉后,立即能从ISR中选举新的leader来处理消息。
Exactly Once语义
对于一些非常重要的信息,消费者要求数据既不重复也不丢失,即 Exactly Once 语义。其实以上讨论的三种策略可以如此归类语义:
- 将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义,极容易丢失数据。
- 将服务器 ACK 级别设置为 1,可以理解为碰运气语义,正常情况下,leader不宕机且刚好宕机前将数据同步给了副本的话不会丢失数据,其它情况就会造成数据的丢失。
- 将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义,At Least Once 可以保证数据不丢失,但是不能保证数据不重复
顾名思义,我们一定是需要在不丢数据的基础上去去重,在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
幂等性【partion Exactly Once】
0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
生产者事务【topic Exactly Once】
为了实现跨分区跨会话的事务以及防止PID重启造成的数据重复,需要引入一个Topic全局唯一的 Transaction ID,并将 Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的TransactionID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
故障转移机制
在数据可靠性保障策略中我们了解到如何通过分区和副本,以及动态的ISR和ACK机制来确保消息的可靠,那么接下来深入探讨下,故障发生的时候,我们如何将集群恢复正常?首先需要明确两个概念:LEO和HW:
- Base Offset:是起始位移,该副本中第一条消息的offset,如下图,这里的起始位移是0,如果一个日志文件写满1G后(默认1G后会log rolling),这个起始位移就不是0开始了。
- HW(high watermark):副本的高水位值,replica中leader副本和follower副本都会有这个值,通过它可以得知副本中已提交或已备份消息的范围,leader副本中的HW,决定了消费者能消费的最新消息能到哪个offset。如下图所示,HW值为8,代表offset为[0,8]的9条消息都可以被消费到,它们是对消费者可见的,而[9,12]这4条消息由于未提交,对消费者是不可见的。注意HW最多达到LEO值时,这时可见范围不会包含HW值对应的那条消息了,如下图如果HW也是13,则消费的消息范围就是[0,12]。
- LEO(log end offset):日志末端位移,代表日志文件中下一条待写入消息的offset,这个offset上实际是没有消息的。不管是leader副本还是follower副本,都有这个值。当leader副本收到生产者的一条消息,LEO通常会自增1,而follower副本需要从leader副本fetch到数据后,才会增加它的LEO,最后leader副本会比较自己的LEO以及满足条件的follower副本上的LEO,选取两者中较小值作为新的HW,来更新自己的HW值。
而leader和副本之间LEO及HW的更新时机如下:
- remote LEO是保存在leader副本上的follower副本的LEO,可以看出leader副本上保存所有副本的LEO,当然也包括自己的。remote LEO是保存在leader副本上的follower副本的LEO,可以看出leader副本上保存所有副本的LEO,当然也包括自己的。
- follower LEO就是follower副本的LEO,它的更新是在follower副本得到leader副本发送的数据并随后写入到log文件,就会更新自己的LEO
HW和LEO在消息流转的层面上过程如下。
标准写入流程
在了解故障转移机制前,我们先来看看标准的写入流程是什么样的,这样在故障的时候我们可以看到故障发生在哪些节点影响标准写入流程,以及故障转移机制如何处理使其恢复正常:
整个过程分为如下几个步骤
- producer发送消息4、5给leader【之前提到过只有leader可以读写数据】,leader收到后更新自己的LEO为5
- fetch尝试更新remote LEO,因为此时leader的HW为3,且follower1和follower2的最小LEO也是3,所以remote LEO为3
- leader判读ISR中哪些副本还和自己保持同步,剔除不能保持同步的follower,得出follower1和follower2都可以
- leader计算自己的HW,取所有分区LEO的最小值为HW为3
- leader将消息4、5以及自己的HW发往follower1和follower2,follower1和follower2开始向自己的log写日志并更新消息4、5,但是follower1更新的快些,leo为5,而follower2更新的慢些,leo为4
- fetch再次更新remote LEO,取所有follower中的最小LEO为4,然后更新Leader的HW为4
- leader将自己的HW发往follower1和follower2,直到follower2同步完了更新自己的LEO为5,remote LEO为5,leaderHW为5,更新follower2中的HW为5则同步结束
实质上,Leader的HW是所有LEO最短的offset,并且是消费者需要认定的offset,Follower的HW则是Leader的HW和自身LEO取最小值,也就是长度不能超过消费者认定的offsetKafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。
- 同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。也就是 request.required.acks = -1策略
- 异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性,也就是 request.required.acks = 1策略
而 Kafka 使用request.required.acks = -1 + ISR
的策略则在可靠性和吞吐率方面取得了较好的平衡,同步复制并干掉复制慢的副本,只同步ISR中的Follwer,
故障转移机制
当不同的机器宕机故障时来看看ISR如何处理集群以及消息,分为 follower 故障和leader故障:
- follower故障,follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 follower HW,并将 log 文件高于follower HW 的【follower HW一定小于leader HW】部分截取掉,令副本的LEO与故障时的follower HW一致,然后follower LEO 开始从 leader 同步。等该 follower 的 LEO 大于等于该 Partition 的 的 HW【也就是leader的HW】,即 follower 追上 leader 之后,就可以重新加入 ISR 了。你可能会问为什么不从follower的LEO之后开始截呢?试想一下,如果follower故障离场后,leader也故障离场,一个LEO比故障follower低的ISR follower当选为新leader,那么故障follower回归后会比新leader多消息,这显然造成了数据不一致。
- leader 故障,leader 发生故障之后,会从 ISR 中选出一个新的 leader之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于HW【也就是leader的HW】的部分截掉,然后从新的 leader同步数据。如果新leader的LEO就是HW,则直接接收新的消息即可,如果是其它某个Follower的LEO是HW,则从新Leader同步Leader的LEO-HW之间的消息给所有副本
总而言之,要以所有副本都同步好的最新的HW为准(这样可以保证follower的消息永远是小于等于leader的)。但这只是处理方法,并不能保证数据不重复或者不丢失,我们来看一种数据重复的案例: Leader宕机:考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:
整个过程分为如下几个步骤
- follower1 同步了消息 4、5,follower2 同步了消息 4,HW为4。
- leader宕机,由于还未收到follower2 同步完成的消息,所以没有给生产者发送ACK,与此同时 follower1 被选举为 leader,follower2从follower1开始同步数据。当然如果follower2被选为leader,那么follower就需要截断自己的消息5。
- producer未收到ACK,于是重新发送,发送了给了新的leader(老的follower1),但因为follower1其实已经同步了4、5,所以此时来的消息就是重复消息。
这样就出现了数据重复的现象,所以HW&LEO机制只能保证副本之间保持同步,并不能保证数据不重复或不丢失,要想都保证,需要结合ACK机制食用
Leader选举
在可能发生的故障中,当Leader挂了的时候我们需要选举新的leader,遵循如下策略:Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader 保持同步,只有 ISR 里的成员才能有被选为 leader 的可能。
当然也有 极端情况:当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个 partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:
- 等待 ISR 中任意一个 replica 恢复过来,并且选它作为 leader【高可靠性】,如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR 中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。
- 选择第一个恢复过来的 replica(并不一定是在 ISR 中)作为leader【高可用性】,选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经 commit 的消息,从而造成消息丢失。
默认情况下,Kafka 采用第二种策略,即 unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略
Kafka可靠高效原因
Kafka是如何保证高效读写数据的呢,有三点支持:分布式读写、顺序写磁盘以及零拷贝技术,其实前两点在之前的blog中也有提到
- 分布式读写,我们提到的各种策略都是为了满足分布式的可靠高效读写
- 顺序写磁盘,Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
- 零拷贝技术,简单来说就是数据不需要经过用户态,传统的文件读写或者网络传输,通常需要将数据从内核态转换为用户态。应用程序读取用户态内存数据,写入文件 / Socket之前,需要从用户态转换为内核态之后才可以写入文件或者网卡当中,而Kafka使用零拷贝技术让数据直接在内核态中进行传输。详细原理可以参照Kafka是如何利用零拷贝提高性能的
通过以上这几种技术可以实现Kafka的高并发读写
消费者策略:消费方式、分区分配策略、offset的维护
聊完了生产者策略,知道了消息是如何发送到Kafka集群并且保证不重不漏,以及在故障时如何保证多个副本的数据一致性之后,我们再从消费端来看下,消费者是如何消费消息的。
两种消费方式
消息有两种方式被投递,一种是broker推给消费者,一种是消费者从broker拉。这两种方式各自有优缺点:
- push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
- pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据
Kafka 采取的是pull 模式,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费:
- 控制消费方式——既可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义
- 超时返回机制——Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout
通过pull以及一定的策略可以满足Kafka的消费诉求。需要注意:
- 如果消费线程大于 patition 数量,则有些线程将收不到消息;
- 如果 patition 数量大于消费线程数,则有些线程多收到多个 patition 的消息;如果一个线程消费多个 patition,则无法保证你收到的topic消息的顺序,而一个 patition 内的消息是有序的。
这三点需要注意,消息的消费和分区个数的关系。
消费者分区分配策略
一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。Kafka 有三种分配策略: RoundRobin, Range,Sticky。。
- 目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。partition.assignment.strategy参数默认的值是range
- 同一个组内同一分区只能被一个消费者消费,可以理解,如果一个组内多个消费者消费同一个分区,那么该消费者组如何保证单分区消息的顺序性呢?
无论是哪种策略,当消费者组里的消费者个数的变化【增多或减少】或者订阅主题分区的增加都会触发重新分配,这种将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)**
Rang策略
Range分配策略是面向每个主题的,首先会对同一个topic里面的分区按照序号进行排序,并把消费者线程按照字母顺序进行排序。然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。当然,这样的缺点就是对每个组内的每个消费者分布不均匀。举例如下:
这样ConsumerA承受的压力会越来越大。局部打散【计算后顺序整体分,不是轮询分】,每个topic的局部压力都会压向消费者组中的某个消费者
RoudRobin策略
RoudRobin策略也即轮询策略,RoundRobin策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询算法逐个将分区以此分配给每个消费者:
- 如果同一消费组内,所有的消费者订阅的消息都是相同的【也就是所有消费者订阅的topic数量相同】,那么 RoundRobin 策略的分区分配会是均匀的。
- 如果同一消费者组内,所订阅的消息是不相同的,那么在执行分区分配的时候,就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic,那么在分配分区的时候,此消费者将不会分配到这个 topic 的任何分区。
这样的好处是,分配较为均衡。
当然前提是同一个消费者组里的每个消费者订阅的主题必须相同,当然也一定是相同的,如果不同也就没必要放到一个消费组里了。