KafKa存储机制

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
日志服务 SLS,月写入数据量 50GB 1个月
简介: KafKa存储机制



存储机制

Kafka 是为了解决大数据的实时日志流而生的, 每天要处理的日志量级在千亿规模。对于日志流的特点主要包括 :

1. 数据实时产生

2. 海量数据存储与处理

所以它必然要面临分布式系统遇到的高并发、高可用、高性能等三高问题。

对于 Kafka 的存储需要保证以下几点:

1. 存储的主要是消息流(可以是简单的文本格式也可以是其他格式,对于 Broker 存储来说,它并不关心数据本身)

2. 要支持海量数据的高效存储、高持久化(保证重启后数据不丢失)

3. 要支持海量数据的高效检索(消费的时候可以通过offset或者时间戳高效查询并处理)

4. 要保证数据的安全性和稳定性、故障转移容错性

kafka 存储选型

从上图性能测试的结果看出普通机械磁盘的顺序I/O性能指标是53.2M values/s,而内存的随机I/O性能 指标是36.7M values/s。由此似乎可以得出结论:磁盘的顺序I/O性能要强于内存的随机I/O性能。

另外,如果需要较高的存储性能,必然是提高读速度和写速度:

1. 提高读速度:利用索引,来提高查询速度,但是有了索引,大量写操作都会维护索引,那么会降低写入效率。常见的如关系型数据库:mysql等

2. 提高写速度:这种一般是采用日志存储, 通过顺序追加写的方式来提高写入速度,因为没有索引,无法快速查询,最严重的只能一行行遍历读取。常见的如大数据相关领域的基本都基于此方式来实现。

Kafka 存储方案剖析

对于 Kafka 来说, 它主要用来处理海量数据流,这个场景的特点主要包括:

1. 写操作:写并发要求非常高,基本得达到百万级 TPS,顺序追加写日志即可,无需考虑更新操作

2. 读操作:相对写操作来说,比较简单,只要能按照一定规则高效查询即可(offset或者时间戳)

对于写操作来说,直接采用顺序追加写日志的方式就可以满足 Kafka 对于百万TPS写入效率要求。所以我们重点放在如何解决高效查询这些日志。Kafka采用了稀疏哈希索引(底层基于Hash Table 实现)的方式

把消息的 Offset 设计成一个有序的字段,这样消息在日志文件中也就有序存放了,也不需要额外引入哈 希表结构, 可以直接将消息划分成若干个块,对于每个块,我们只需要索引当前块的第一条消息的 Offset (类似二分查找算法的原理),即先根据 Offset 大小找到对应的块, 然后再从块中顺序查找, 这样就可以快速定位到要查找的消息。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制。 它将每个 Partition 分为多个 Segment,每个 Segment 对应两个文件:“.index” 索引文件和 “.log” 数据文件。这些文件位于同一文件下,该文件夹的命名规则为:topic 名-分区号。例如,test这个 topic 有三分分 区,则其对应的文件夹为 test-0,test-1,test-2。

index 和 log 文件以当前 Segment 的第一条消息的 Offset 命名。下图为 index 文件和 log 文件的结构示意图:

“.index” 文件存储大量的索引信息               “.log” 文件存储大量的数据,

索引文件中的元数据指向对应数据文件中 Message 的物理偏移量。

查看索引:./kafka-dump-log.sh --files /tmp/kafka-logs/test-1/00000000000000000000.index

kafka 存储架构设计

从上分析我们可以知道: Kafka 最终的存储实现方案, 即基于顺序追加写日志 + 稀疏哈希索引。

Kafka 日志存储结构:

从上图可以看出Kafka 是基于「主题 + 分区 + 副本 + 分段 + 索引」的结构:

1. kafka 中消息是以主题 Topic 为基本单位进行归类的,这里的 Topic 是逻辑上的概念,实际上在磁盘存储是根据分区 Partition 存储的, 即每个 Topic 被分成多个 Partition,分区 Partition 的数量可以在主题 Topic 创建的时候进行指定。

2. Partition 分区主要是为了解决 Kafka 存储的水平扩展问题而设计的, 如果一个 Topic 的所有消息都只存储到一个 Kafka Broker上的话, 对于 Kafka 每秒写入几百万消息的高并发系统来说,这个 Broker 肯定会出现瓶颈, 故障时候不好进行恢复,所以 Kafka 将 Topic 的消息划分成多个 Partition, 然后均衡的分布到整个 Kafka Broker 集群中

3. Partition 分区内每条消息都会被分配一个唯一的消息 id,即我们通常所说的偏移量 Offset, 因此 kafka 只能保证每个分区内部有序性,并不能保证全局有序性。

4. 然后每个 Partition 分区又被划分成了多个 LogSegment,这是为了防止 Log 日志过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegement,相当于一个巨型文件被平均分割为一些相对较小的文件,这样也便于消息的查找、维护和清理。这样在做历史数据清理的时候,直接删除旧的 LogSegement 文件就可以了。

5. Log 日志在物理上只是以文件夹的形式存储,而每个 LogSegement 对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".snapshot"为后缀的快照索引文件等)

kafka 日志系统架构设计

再来研究topic->partition的关系

kafka 消息是按主题 Topic 为基础单位归类的,各个 Topic 在逻辑上是独立的,每个 Topic 又可以分为一个或者多个 Partition,每条消息在发送的时候会根据分区规则被追加到指定的分区中,如下图所示:

4个分区的主题逻辑结构图

日志目录布局

Kafka 消息写入到磁盘的日志目录布局是怎样的?Log 对应了一个命名为-的文件夹。举个例子,假设现在有一个名为“topic-order”的 Topic,该 Topic 中 有4个 Partition,那么在实际物理存储上表现为“topic-order-0”、“topic-order-1”、“topic-order-2”、 “topic-order-3” 这4个文件夹。

Log 中写入消息是顺序写入的。但是只有最后一个 LogSegement 才能执行写入操作,之前的所有 LogSegement 都不能执行写入操作。为了更好理解这个概念,我们将最后一个 LogSegement 称 为"activeSegement",即表示当前活跃的日志分段。随着消息的不断写入,当 activeSegement 满足一定的条件时,就需要创建新的 activeSegement,之后再追加的消息会写入新的 activeSegement。

为了更高效的进行消息检索,每个 LogSegment 中的日志文件(以“.log”为文件后缀)都有对应的几个索引文件:偏移量索引文件(以“.index”为文件后缀)、时间戳索引文件(以“.timeindex”为文件后缀)、快照索引文件 (以“.snapshot”为文件后缀)。其中每个 LogSegment 都有一个 Offset 来作为 基准偏移量(baseOffset),用来表示当前 LogSegment 中第一条消息的 Offset。偏移量是一个64位的 Long 长整型数,日志文件和这几个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20位数字,没有达到的位数前面用0填充。比如第一个 LogSegment 的基准偏移量为0,对应的日志文件 为00000000000000000000.log。

上面例子中 LogSegment 对应的基准位移是12768089,也说明了当前 LogSegment 中的第一条消息的偏移量为12768089,同时可以说明前一个 LogSegment 中共有12768089条消息(偏移量从0至 12768089的消息)。再如:

注意每个 LogSegment 中不只包含“.log”、“.index”、“.timeindex”这几种文件,还可能包含 “.snapshot”、“.txnindex”、“leader-epoch-checkpoint”等文件, 以及 “.deleted”、“.cleaned”、 “.swap”等临时文件。

消费者消费的时候,会将提交的位移保存在 Kafka 内部的主题__consumer_offsets中,下面来看一个整体的日志目录结构图:

磁盘数据存储

我们知道 Kafka 是依赖文件系统来存储和缓存消息,以及典型的顺序追加写日志操作,另外它使用操作系统的 PageCache 来减少对磁盘 I/O 操作,即将磁盘的数据缓存到内存中,把对磁盘的访问转变为对内存的访问。

在 Kafka 中,大量使用了 PageCache, 这也是 Kafka 能实现高吞吐的重要因素之一, 当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据页是否在 PageCache 中,如果命中则直接返回数据,从而避免了对磁盘的 I/O 操作;如果没有命中,操作系统则会向磁盘发起读取请求并将读取的数据页存入 PageCache 中,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检查数据页是否在页缓存中,如果不存在,则 PageCache 中添加相应的数据页,最后将数据写入对应的数据页。被修改过后的数据页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。

除了消息顺序追加写日志、PageCache以外, kafka 还使用了零拷贝(Zero-Copy)技术来进一步提 升系统性能, 如下图所示:

消息从生产到写入磁盘的整体过程如下图所示:

可靠性

可靠性相关的问题:

我发消息的时候,需要等 ack 嘛?

我发了消息之后,消费者一定会收到嘛?

遇到各种故障时,我的消息会不会丢?

消费者侧会收到多条消息嘛?消费者 svr 重启后消息会丢失嘛?

Kafka 从拓扑上分有如下角色:

Consumer: 消费者,一般以 API 形式存在于各个业务 svr 中

Producer: 生产者,一般以 API 形式存在于各个业务 svr 中 Kafka

broker: kafka 集群中的服务器,topic 里的消息数据存在上面

Producer 采用发送 push 的方式将消息发到 broker 上,broker 存储后。由 consumer 采用 pull 模式订阅并消费消息

Producer的可靠性保证

回答生产者的可靠性保证,即回答:

1. 发消息之后有没有 ack

2. 发消息收到 ack 后,是不是消息就不会丢失了而 Kafka 通过配置来指定 producer 生产者在发送消息时的 ack 策略:

# -1(全量同步确认,强可靠性保证)
Request.required.acks= -1
# 1(leader 确认收到, 默认)
Request.required.acks = 1
# 0(不确认,但是吞吐量大)
Request.required.acks = 0

kafka 配置为 CP系统

如果想实现 kafka 配置为 CP(Consistency & Partition tolerance) 系统, 配置需要如下:

request.required.acks=-1
min.insync.replicas = ${N/2 + 1}     N: follower的个数
unclean.leader.election.enable = false

如图所示,在 acks=-1 的情况下,新消息只有被 ISR 中的所有 follower(f1 和 f2, f3) 都从 leader 复制过 去才会回 ack, ack 后,无论那种机器故障情况(全部或部分), 写入的 msg4,都不会丢失, 消息状态满足 一致性 C 要求。

正常情况下,所有 follower 复制完成后,leader 回 producer ack。

异常情况下,如果当数据发送到 leader 后部分副本(f1 和 f2 同步), leader 挂了?此时任何 follower 都有可能变成新的 leader, producer 端会得到返回异常,producer 端会重新发送数据,但这样数据可能会重复(但不会丢失), 暂不考虑数据重复的情况。

min.insync.replicas 参数用于保证当前集群中处于正常同步状态的副本 follower 数量,当实际值小于配置值时,集群停止服务。如果配置为 N/2+1, 即多一半的数量,则在满足此条件下,通过算法保证强一致性。当不满足配置数时,牺牲可用性即停服。

如果选举 f3 为新 leader, 则可能会发生消息截断,因为 f3 还未同步 msg4 的数据。Kafka 的通 unclean.leader.election.enable 来控制在这种情况下,是否可以选举 f3 为 leader。旧版本中默认为 true,在某个版本下已默认为 false,避免这种情况下消息截断的出现。

通过 ack 和 min.insync.replicas 和 unclean.leader.election.enable 的配合,保证在 kafka 配置为 CP 系统时,要么不工作,要么得到 ack 后,消息不会丢失且消息状态一致。

kafka 配置为 AP系统

如果想实现 kafka 配置为 AP(Availability & Partition tolerance)系统:

request.required.acks=1

min.insync.replicas = 1

unclean.leader.election.enable = false

当配置为 acks=1 时,即 leader 接收消息后回 ack,这时会出现消息丢失的问题:如果 leader 接受到了 第 4 条消息,此时还没有同步到 follower 中,leader 机器挂了,其中一个 follower 被选为 leader, 则第 4 条消息丢失了。当然这个也需要 unclean.leader.election.enable 参数配置为 false 来配合。但是 leader 回 ack 的情况下,follower 未同步的概率会大大提升。

通过 producer 策略的配置和 kafka 集群通用参数的配置,可以针对自己的业务系统特点来进行合理的参数配置,在通讯性能和消息可靠性下寻得某种平衡。

Broker 的可靠性保证

消息通过 producer 发送到 broker 之后,还会遇到很多问题:

Partition leader 写入成功, follower 什么时候同步?

Leader 写入成功,消费者什么时候能读到这条消息?

Leader 写入成功后,leader 重启,重启后消息状态还正常嘛?

Leader 重启,如何选举新的 leader?

这些问题集中在, 消息落到 broker 后,集群通过何种机制来保证不同副本建的消息状态一致性。

LEO和HW简单介绍

LEO:LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。

HW: HighWaterMark的缩写,是指consumer能够看到的此partition的位置。 取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。

下面具体分析一下 ISR 集合和 HW、LEO的关系。

假设某分区的 ISR 集合中有 3 个副本,即一个 leader 副本和 2 个 follower 副本,此时分区的 LEO 和 HW 都分别为 3 。消息3和消息4从生产者出发之后先被存入leader副本。

在消息被写入leader副本之后,follower副本会发送拉取请求来拉取消息3和消息4进行消息同步。

在同步过程中不同的副本同步的效率不尽相同,在某一时刻follower1完全跟上了leader副本而 follower2只同步了消息3,如此leader副本的LEO为5,follower1的LEO为5,follower2的LEO 为4,那么当前分区的HW取最小值4,此时消费者可以消费到offset0至3之间的消息。

当所有副本都成功写入消息3和消息4之后,整个分区的HW和LEO都变为5,因此消费者可以消费到 offset为4的消息了。

由此可见,HW用于标识消费者可以读取的最大消息位置,LEO用于标识消息追加到文件的最后位置。 如果消息发送成功,不代表消费者可以消费这条消息。

Consumer 的可靠性策略

Consumer 的可靠性策略集中在 consumer 的投递语义上,即:

何时消费,消费到什么?

按消费是否会丢?

消费是否会重复?

这些语义场景,可以通过 kafka 消费者的而部分参数进行配置,简单来说有以下 3 中场景:

AutoCommit(at most once, commit 后挂,实际会丢)

enable.auto.commit = true

auto.commit.interval.ms

配置如上的 consumer 收到消息就返回正确给 brocker, 但是如果业务逻辑没有走完中断了,实际上这个消息没有消费成功。这种场景适用于可靠性要求不高的业务。其中 auto.commit.interval.ms 代表了自动提交的间隔。比如设置为 1s 提交 1 次,那么在 1s 内的故障重启,会从当前消费 offset 进行重新消费时,1s 内未提交但是已经消费的 msg, 会被重新消费到。

手动 Commit(at least once, commit 前挂,就会重复, 重启还会丢)

enable.auto.commit = false

配置为手动提交的场景下,业务开发者需要在消费消息到消息业务逻辑处理整个流程完成后进行手动提交。如果在流程未处理结束时发生重启,则之前消费到未提交的消息会重新消费到,即消息显然会投递多次。此处应用与业务逻辑明显实现了幂等的场景下使用。

特别应关注到在 golang 中 sarama 库的几个参数的配置:

sarama.offset.initial (oldest, newest)

offsets.retention.minutes

intitial = oldest 代表消费可以访问到的 topic 里的最早的消息,大于 commit 的位置,但是小于 HW。 同时也受到 broker 上消息保留时间的影响和位移保留时间的影响。不能保证一定能消费到 topic 起始位置的消息。

如果设置为 newest 则代表访问 commit 位置的下一条消息。如果发生 consumer 重启且 autocommit 没有设置为 false, 则之前的消息会发生丢失,再也消费不到了。在业务环境特别不稳定或非持久化 consumer 实例的场景下,应特别注意。 一般情况下, offsets.retention.minutes 为 1440s。

Exactly once, 很难,需要 msg 持久化和 commit 是原子的

消息投递且仅投递一次的语义是很难实现的。首先要消费消息并且提交保证不会重复投递,其次提交前 要完成整体的业务逻辑关于消息的处理。在 kafka 本身没有提供此场景语义接口的情况下,这几乎是不 可能有效实现的。一般的解决方案,也是进行原子性的消息存储,业务逻辑异步慢慢的从存储中取出消息进行处理。

消费组Reblance

消费者组

消费组指的是多个消费者组成起来的一个组,它们共同消费 topic 的所有消息,并且一个 topic 的一个 partition 只能被一个 consumer 消费。其实reblance就是为了kafka对提升消费效率做的优化,规定了 一个ConsumerGroup下的所有consumer均匀分配订阅 Topic 的每个分区。

例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。

rebalance的影响

每次reblance会把所有的消费者重新分配监听topic,会产生一定影响

首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收 机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW 期间,所 有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更 高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配 其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

最后,Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成 功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至 少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。

目录
相关文章
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
36 4
|
1月前
|
消息中间件 存储 缓存
大数据-71 Kafka 高级特性 物理存储 磁盘存储特性 如零拷贝、页缓存、mmp、sendfile
大数据-71 Kafka 高级特性 物理存储 磁盘存储特性 如零拷贝、页缓存、mmp、sendfile
53 3
|
1月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
39 1
|
1月前
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
26 1
|
1月前
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
51 0
Kafka ISR机制详解!
|
1月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
175 0
|
3月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
52 3
|
3月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
129 3
|
3月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
121 4
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
下一篇
无影云桌面