最近打算梳理消息引擎系统,以Kafka和RocketMQ为主进行学习。关于Kafka打算写两篇文章,一篇是基础知识,一篇是实践,打算用Kafka收集日志,并实现报警功能。Kafka版本经常更新,有的知识可能和最新版本不一致,这点需注意。
基础知识
Kafka是什么
Apache Kafka 是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)。本次主要讨论其作为消息引擎方面的内容。
消息引擎系统是一组规范,企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。即系统 A 发送消息给消息引擎系统,系统 B 从消息引擎系统中读取 A 发送的消息。
Kafka结构
我们通过一个实例,来认识Kafka的结构和核心术语。
角色
首先需要了解Kafka内部有哪些角色,它们的关系如何:
Broker:Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化,一台服务器可以启动多个Broker。
协调者:Coordinator。它专门为 Consumer Group 服务,负责为 Group 执行Rebalance 以及提供位移管理和组成员管理等。所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有Broker 都有各自的 Coordinator 组件。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中用来区分具体业务。如订单消息主题、物流消息主题。
分区:Partition。一个有序不变的消息序列,用于存放消息。每个主题下可以有多个分区。分区可存在不同的Broker上。
消息:Record。Kafka 是消息引擎,这里的消息就是指 Kafka 处理的主要对象。
生产者:Producer。向主题发布新消息的应用程序。
消费者:Consumer。从主题订阅新消息的应用程序。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。消费消息时,一般都是用消费者组的形式。
实例
假设现有两台机器M1和M2,在M1机器上创建两个Broker,在M2机器上创建一个Broker。创建主题A,设置该主题的分区数为3、副本数为2。有一个生产者负责生产消息,有两个消费者组G1、G2负责消费消息,G1包含2个消费者,G2包含3个消费者。概念图如下:
- 创建主题时可以设置分区数量,由Kafka配置分区会分配到哪个Broker上。
- 创建主题时可以设置副本数量。
副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
- 分区可以有多个副本,只能有一个Leader副本,Leader副本负责和生产者、消费者交互,追随者副本只负责冗余数据。
- 一个分区只能被同一个消费者组里的一个消费者消费,但可以被多个消费者组消费,如G1\_1、G2\_1都能消费分区1。
- 主题下所有分区都需分配给消费者组里的一个消费者。如果消费者数量比分区数量少,则一个消费者需要消费多个分区,如G1\_2需要消费分区2和分区3。如果消费者数量比分区数量多,则有的消费者不会被分配分区,无事可做。
执行流程
通过基础知识,我们对Kafka有了基本的了解。现在我们按照消息生产、存储、消费流程,串联Kafka的整个过程。
创建Broker
命令
要运行Kafka,首先需要创建Broker,使用命令为:bin/kafka-server-start.sh config/server.properties &。
控制器
产生
Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。
作用
控制器可用于主题管理(创建、删除、增加分区)、分区重分配、 Preferred 领导者选举、集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)、数据服务(向其他 Broker 推送集群元数据信息)。
选举
如果控制器故障,ZooKeeper 通过 Watch 机制感知到并删除了 /controller 临时节点,所有存活的 Broker 开始竞选新的控制器身份。成功地在 ZooKeeper 上重建了 /controller 节点最终赢得了选举,成为最新的控制器。
生产者
生产者主要用于生成消息。
连接Broker
- 在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会根据bootstrap.servers配置中记录的Broker IP地址,创建与这些Broker 的连接。
- Producer 向bootstrap.servers中的某一台 Broker 发送了 METADATA 请求,尝试获取集群的元数据信息。
- 当 Producer 获取了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。
至此,建立了Producer和Broker之间的连接。
交付可靠性
交付可靠性是指消息丢失、重复发送情况,有以下三种类型:
最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
至少一次(at least once):消息不会丢失,但有可能被重复发送。
精确一次(exactly once):消息不会丢失,也不会被重复发送。
Kafka 默认提供的交付可靠性保障是第二种,即至少一次。
这三种类型,生产者的实现方案为:
- 至少一次:只有 Broker 成功“提交”消息且 Producer 接到Broker 的应答才会认为该消息成功发送,否则就不断重试,这导致消息可能重复。
- 最多一次:Producer禁止重试。这样消息要么写入成功,要么写入失败,但绝不会重复发送,有可能导致消息可能丢失。
- 精确一次:
创建幂等性 Producer,Kafka 自动做消息的重复去重。
- 只能保证单分区上的幂等性
- 只能实现单会话上的幂等性,Producer 进程重启后,幂等性保证就丧失
创建事务型 Producer,要么全部写入成功,要么全部失败。
- 支持多分区、多会话
- 比起幂等性 Producer,事务型 Producer 的性能更差
发送消息
一个主题有多个分区,分区策略是决定生产者的消息发送到哪个分区的算法。分区策略主要有三种:
- 轮询策略:也称 Round-robin 策略,即顺序分配。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上。
- 随机策略:也称 Randomness 策略。所谓随机就是随意地将消息放置到任意一个分区上。
- 按消息键保序策略:也称 Key-ordering 策略。Kafka 允许为每条消息定义消息键,简称为 Key。一旦消息被定义了 Key,那么就可以保证同一个Key 的所有消息都进入到相同的分区里面。
Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。
数据存储
存储
Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。
Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文
件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。分区存放消息日志,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
图中的数字叫做消息位移(Offset),表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本
在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。消息写入Leader副本后,Follower副本怎么获取到新的消息呢?
同步
追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
可见性
分区里的消息分为已提交消息和未提交消息,两者以高水位(HW)进行分割。消费者只能消费已提交消息。
水位的定义:在时刻 T,任意创建时间(Event Time)为 T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
高水位更新
那高水位是怎样进行更新的呢?每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值。
Leader 副本
处理生产者请求的逻辑如下:
- 写入消息到本地磁盘。
- 更新分区高水位值。
i. 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值{LEO-1,LEO-2,……,LEO-n}。
ii. 获取 Leader 副本高水位值:currentHW。
iii. 更新 currentHW = min(currentHW, LEO-1,LEO-2,……,LEO-n)。
处理 Follower 副本拉取消息的逻辑如下:
- 读取磁盘(或页缓存)中的消息数据。
- 使用 Follower 副本发送请求中的位移值更新远程副本 LEO 值。
- 更新分区高水位值(具体步骤与处理生产者请求的步骤相同)。
Follower 副本
从 Leader 拉取消息的处理逻辑如下:
- 写入消息到本地磁盘。
- 更新 LEO 值。
- 更新高水位值。
i. 获取 Leader 发送的高水位值:currentHW。
ii. 获取步骤 2 中更新过的 LEO 值:currentLEO。
iii. 更新高水位为 min(currentHW, currentLEO)。
简单来说,是所有副本都拉取到的消息,才能被消费者看到。
Leader Epoch
高水位在界定Kafka 消息对外可见性以及实现副本机制等方面起到了非常重要的作用,但其设计上的缺陷给 Kafka 留下了很多数据丢失或数据不一致的潜在风险。为此,社区引入了 Leader Epoch机制,尝试规避掉这类风险。
所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。
- Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
- 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。
选举
当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
一个分区有多个Follower分区,如何选举呢?Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。
ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。Broker 端参数 replica.lag.time.max.ms 的含义是Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。如果时间间隔在replica.lag.time.max.ms之内,则放入ISR,否则踢出ISR。
如果Broker 端参数 unclean.leader.election.enable 为false,非ISR中的分区可以参加领导者选举,否则只能在ISR中的分区参加选举。
此处大家会有疑问,即使是ISR中的分区,也可能没有完全同步完Leader副本的消息,是否会导致消息丢失?这里有一个核心点:Kafka只对已提交消息做持久化保证。如果我们设置了最高等级的持久化需求,比如acks=all,那么follower副本没有同步完成前这条消息就不算已提交,就不算丢失了。
所以可以设置 min.insync.replicas > 1。这是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。这样能够保证至少有多个副本是有完整的消息。
消费者
位移主题
Kafka 中有一个内部主题(InternalTopic)\_\_consumer\_offsets。\_\_consumer\_offsets 在 Kafka 中叫位移主题,即 Offsets Topic。当Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题,也会自动设置该主题的分区数和备份数。
consumer\_offsets 的主要作用是保存 Kafka 消费者的位移信息。本质上Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 consumer\_offsets 中。
consumer\_offsets中主要存放三类消息:
- 位移消息,格式为K-V结构。其中Key:,Value是位移值。表示消费者组A,消费主题B的分区C,已经消费到什么位置了。
- 用于保存 Consumer Group 信息的消息。
- 用于删除 Group 过期位移甚至是删除 Group 的消息。
连接Broker
消费者创建和Broker的TCP 连接,是在调用 KafkaConsumer.poll 方法时被创建的。步骤为:
- 发起 FindCoordinator 请求
当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。消费者程序会向集群中当前负载最小的那台 Broker 发送请求。计算协调者所在Broker的算法为:
- 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
- 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
- 连接协调者
FindCoordinator返回数据显式地告诉消费者哪个 Broker 是真正的协调者,消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。
- 消费数据
经过第二步,消费者组知道各个消费者需要消费哪个分区的消息,消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。
消费消息
消费者位移
消费者连接对应Broker后,便可以消费指定分区的消息了。消费者也有一个位移,叫做消费者位移(Consumer Offset),表示消费者消费进度,每个消费者都有自己的消费者位移。
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
当然,消费者的位移数据,就存放于位移主题中。
提交位移
消费者提交位移的方案有两大类,自动提交和手动提交。
自动提交
Consumer 端参数 enable.auto.commit=true表示自动提交,Kafka 每auto.commit.interval.ms秒会自动提交一次位移。
- 优点:保证不出现消费丢失的情况。开启自动提交,poll方法会先提交上一批消息位移,后获取消息。
- 缺点:它可能会出现重复消费。如消息已经处理,但未能及时提交位移,消费者便崩溃了。
手动提交
enable.auto.commit 设置为 false,然后调用API手动提交位移。手动提交API有三种:
- 同步提交:函数为KafkaConsumer#commitSync(),该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。
- 优点:能够自己把控位移提交的时机和频率。
- 缺点:影响整个应用程序的 TPS。调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果。
- 异步提交:函数为KafkaConsumer#commitAsync(),调用 commitAsync() 之后,它会立即返回,不会阻塞,Kafka 提供了回调函数(callback),供我们实现提交之后的逻辑,比如记录日志或处理异常等。
- 优点:不会影响 Consumer 应用的 TPS。
- 缺点:出现问题时它不会自动重试,重试也没有价值
- 部分提交:Consumer位移提交,都是提交 poll 方法返回的所有消息的位移,如下两个函数commitSync(Map<TopicPartition, OffsetAndMetadata>) 和commitAsync(Map<TopicPartition, OffsetAndMetadata>)支持部分提交poll下的位移。
- 优点:更加细粒度化地提交位移
实际上,手动提交也不能避免消息重复消费,如果要避免重复消费,需要自行设计去重逻辑。
提交位移最佳实践
如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:
- 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
- 我们不希望程序总处于阻塞状态,影响 TPS。
重平衡
时机
如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发重平衡(Rebalance)。重平衡含义为:消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
其实有三个时机会引发重平衡:
- 组成员数量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
最常引起重平衡的时机是组成员数量变化,这种变化有可能确实是需要进行增减,也有可能是误判:
- 每个Consumer实例都会定期地向Coordinator 发送心跳请求,表明它还存活着。所以需要合理设置session.timeout.ms 和 heartbeat.interval.ms,防止未能及时发送心跳,导致协调者误将Consumer“踢出”Group”。
- Consumer 程序如果在 max.poll.interval.ms 之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance,所以需要合理设置max.poll.interval.ms。
重平衡过程
当协调者决定开启新一轮重平衡后:
- 通知:每个Consumer实例都会定期地向Coordinator 发送心跳请求,Coordinator会将REBALANCE\_IN\_PROGRESS封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE\_IN\_PROGRESS”,就能立马知道重平衡开始了。
- 加入组:消费者会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。
- 选举消费者组领导者:一旦收集了全部成员的JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。
- 订阅信息传递:选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者。
- 制定分区消费方案:领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
- 消费方案传递:领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。
- 全员同步消费方案:协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
总结
终于梳理完整个流程了,太累了!阅读资料后写文章有三种方案:
一是先读完一遍,然后一篇一篇重新读,然后一篇一篇写,使用的这个方案写了《设计模式系列》
二是边看边写,看一篇写一篇,使用的这个方案写了《MySQL系列》《算法系列》
三是迅速读完一遍,记录核心点,心中重新梳理知识结构,趁着对知识的短期记忆,快速写出文章,使用这个方案写了《Kafka》
不得不说,第三个方案速度是真快,效果也很不错,自己能够清晰感受到随着时间的流逝,记忆中的知识也在不断消退,但是随着文章的书写,记忆会被重新唤起并被加深。但是这个方案需要注意力高度集中,并且能够快速整理出核心知识点,后期可能会多使用方案三来进行学习。
最后
大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)
我的个人博客为:https://shidawuhen.github.io/
往期文章回顾: