Kafka详解日志结构

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Kafka详解日志结构

基础概念

Kafka 作为大数据技术生态的重要组件,尤其是实时流数据处理场景下,作为分布式生产/消费系统,得到广泛的重用。而 Kafka 在数据生产和消费上,日志是主要的场景。今天的大数据开发学习分享,我们就来讲讲 kafka 日志结构的基础。

Kafka 消息是以主题为单位,主题之间相互独立。每个主题又由一个或多个分区构成,分区数可以在创建主题时指定,也可以在主题创建后再修改,但只能增加一个主题的分区数而不能减少其分区数。每个分区可以有一个或多个副本。

在存储结构上分区的每个副本对应一个 Log 对象,每个 Log 又划分为多个 LogSegment,每个 LogSegment 包括一个日志文件和两个索引文件,其中两个索引文件分别为偏移量索引文件和时间戳索引文件。Log 对象中维护了一个 ConcurrentSkipListMap,底层是一个跳跃表,保存该主题所有分区对应的所有 LogSegment。日志文件和索引文件与磁盘上的物理存储文件相对应。

Kafka 将日志文件封装为一个 FileMessageSet 对象,将两个索引文件封装为 OffsetIndex 和 TimeIndex 对象。


数据文件

数据文件用来存储消息,每条消息由一个固定长度的消息头和一个可变长度的消息体数据组成。消息体包括一个可变长度的消息 Key 和消息实际数据 Value,消息 Key 可以为空,消息结构如下图所示:

640.png

消息结构各字段说明:

  • 起始位移:占用 8 字节,其存储了当前 batch 中第一条消息的位移;
  • 长度:占用了 4 个字节,其存储了整个 batch 所占用的磁盘空间的大小,通过该字段,kafka 在进行消息遍历的时候,可以快速的跳跃到下一个 batch 进行数据读取;
  • 分区 leader 版本号:记录了当前消息所在分区的 leader 的服务器版本,主要用于进行一些数据版本的校验和转换工作;
  • CRC:对当前整个 batch 的数据的 CRC 校验码,主要是用于对数据进行差错校验的;
  • 属性:占用 2 个字节,这个字段的最低 3 位记录了当前 batch 中消息的压缩方式,现在主要有 GZIP、LZ4 和 Snappy 三种。第 4 位记录了时间戳的类型,第 5 和 6 位记录了新版本引入的事务类型和控制类型;
  • 最大位移增量:最新的消息的位移相对于第一条消息的唯一增量;
  • 起始时间戳:占用 8 个字节,记录了 batch 中第一条消息的时间戳;
  • 最大时间戳:占用 8 个字节,记录了 batch 中最新的一条消息的时间戳;
  • PID、producer epoch 和起始序列号:这三个参数主要是为了实现事务和幂等性而使用的,其中 PID 和 producer epoch 用于确定当前 producer 是否合法,而起始序列号则主要用于进行消息的幂等校验;
  • 消息个数:占用 4 个字节,记录当前 batch 中所有消息的个数;

通过上面的介绍可以看出,每个 batch 的头部数据中占用的字节数固定为 61 个字节,可变部分主要是与具体的消息有关,下面我们来看一下 batch 中每条消息的格式:

640.png

这里的消息的头部数据就与 batch 的大不相同,可以看到,其大部分数据的长度都是可变的。既然是可变的,这里我们需要强调两个问题:

  • 对于数字的存储,kafka 采用的是 Zig-Zag 的存储方式,也即负数并不会使用补码的方式进行编码,而是将其转换为对应的正整数,比如-1 映射为 1、1 映射为 2、-2 映射为 3、2 映射为 4,关系图如下所示:

640.png

通过图可以看出,在对数据反编码的时候,我们只需要将对应的整数转换成其原始值即可;

  • 在使用 Zig-Zag 编码方式的时候,每个字节最大为 128,而其中一半要存储正数,一半要存储负数,还有一个 0,也就是说每个字节能够表示的最大整数为 64,此时如果有大于 64 的数字,kafka 就会使用多个字节进行存储,而这多个字节的表征方式是通过将每个字节的最大位作为保留位来实现的,如果最高位为 1,则表示需要与后续字节共同表征目标数字,如果最高位为 0,则表示当前位即可表示目标数字。

kafka 使用这种编码方式的优点在于,大部分的数据增量都是非常小的数字,因此使用一个字节即可保存,这比直接使用原始类型的数据要节约大概七倍的内存。

对于上面的每条消息的格式,除了消息 key 和 value 相关的字段,其还有属性字段和 header,属性字段的主要作用是存储当前消息 key 和 value 的压缩方式,而 header 则供给用户进行添加一些动态的属性,从而实现一些定制化的工作。通过对 kafka 消息日志的存储格式我们可以看出,其使用 batch 的方式将一些公共信息进行提取,从而保证其只需要存储一份,虽然看起来每个 batch 的头部信息比较多,但其平摊到每条消息上之后使用的字节更少了;在消息层面,kafka 使用了数据增量的方式和 Zig-Zag 编码方式对数据进行的压缩,从而极大地减少其占用的字节数。总体而言,这种存储方式极大的减少了 kafka 占用的磁盘空间大小。

数据文件的大小由配置项 log.segment.bytes 指定,默认为 1GB。同时 Kafka 提供了根据时间来切分日志段的机制,即使数据文件大小没有达到 log.segment.bytes 设置的阈值,但达到了 log.roll.ms 或是 log.roll.hours 设置的阈值,同样会创建新的日志段,在磁盘上创建一个数据文件和两个索引文件。接收消息追加操作的日志段也称为活跃段 activeSegment。

索引文件

kafka 主要有两种类型的索引文件:位移索引文件和时间戳索引文件。位移索引文件中存储的是消息的位移与该位移所对应的消息的物理地址;时间戳索引文件中则存储的是消息的时间戳与该消息的位移值。也就是说,如果需要通过时间戳查询消息记录,那么其首先会通过时间戳索引文件查询该时间戳对应的位移值,然后通过位移值在位移索引文件中查询消息具体的物理地址。关于位移索引文件,这里有两点需要说明:

  • 由于 kafka 消息都是以 batch 的形式进行存储,因而索引文件中索引元素的最小单元是 batch,也就是说,通过位移索引文件能够定位到消息所在的 batch,而没法定位到消息在 batch 中的具体位置,查找消息的时候,还需要进一步对 batch 进行遍历;
  • 位移索引文件中记录的位移值并不是消息真正的位移值,而是该位移相对于该位移索引文件的起始位移的偏移量,通过这种方式能够极大的减小位移索引文件的大小。如下图所示为一个位移索引文件的格式示意图:

640.png

如下则是具体的位移索引文件的示例:

640.png

关于时间戳索引文件,由于时间戳的变化比位移的变化幅度要大一些,其即使采用了增量的方式存储时间戳索引,但也没法有效地使用 Zig-Zag 方式对数据进行编码,因而时间戳索引文件是直接存储的消息的时间戳数据,但是对于时间戳索引文件中存储的位移数据,由于其变化幅度不大,因而其还是使用相对位移的方式进行的存储,并且这种存储方式也可以直接映射到位移索引文件中而无需进行计算。如下图所示为时间戳索引文件的格式图:

640.png

如下则是时间戳索引文件的一个存储示例:

640.png

可以看到,如果需要通过时间戳来定位消息,就需要首先在时间戳索引文件中定位到具体的位移,然后通过位移在位移索引文件中定位到消息的具体物理地址。

关于大数据学习,Kafka 日志结构,以上就为大家做了基本的讲解了。Kafka 在实时消息流的生产和消费上,其稳定性和可靠性,依赖于存储,对于日志结构这部分,建议大家一定要理解透彻。

相关文章
|
1月前
|
消息中间件 存储 Kafka
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
|
1月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
170 0
|
8天前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
|
8天前
|
消息中间件 算法 Java
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
|
1月前
|
消息中间件 存储 Kafka
【Kafka】Kafka 的日志保留期与数据清理策略
【4月更文挑战第13天】【Kafka】Kafka 的日志保留期与数据清理策略
|
12天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
625 0
|
2天前
|
Java
使用kafka-clients操作数据(java)
使用kafka-clients操作数据(java)
13 6
|
12天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现OSS数据到Kafka的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。