Kafka(三)【Broker 存储】(2)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Kafka(三)【Broker 存储】

Kafka(三)【Broker 存储】(1)https://developer.aliyun.com/article/1532328

2.5、生产经验—手动调整分区副本存储

       生产环境中,每台服务器的配置和性能都不一致,但是 Kafka 只会根据自己的代码创建对应的分区副本,就会导致个别服务器存储压力比较大。所以需要手动调整分区副本的存储。

       需求:创建一个新的 topic ,4个分区,两个副本。我们将该 topic 的所有副本都存储到 hadoop102 和 hadoop103 上,hadoop104 不存储任何数据。

1)创建一个新的 topic 叫做 card ,4个分区,2个副本

2)查看分区副本信息

3)创建副本存储计划(所有副本都指定存储在hadoop102、hadoop103中)。

4)执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-repliaction-factor.json --execute

5)验证副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-repliaction-factor.json --verify

6)查看分区副本存储情况

2.6、生产经验—Leader Partition 负载均衡

       正常情况下,Kafka 本身会自动把 Leader Partition 均匀分散在各个机器上,来保证每台节点的吞吐量都是均匀的。但是如果某些 broker 节点宕机,会导致 leader partition 过于集中在其他少部分几台 broker 节点上,这会导致少数几台 broker 节点的读写压力过高,而且即使这些宕机的 broker 再次恢复上线,也只是 follower partition ,不会再次恢复为 leader partition,造成集群负载不均衡。

  • auto.leader.rebalance.enable,more为 true。自动 Leader partition 平衡
  • leader.imbalance.per.broker.percentag,默认是 10%,每个 broker 允许不平衡的leader的比率。如果超过这个值,控制器会触发 leader 的平衡。
  • leader.imbalance.check.interval.seconds,默认300s,也就是每300s检查一次leader负载是否平衡。

在生产环境中,通常不建议开启自动平衡,因为这可能会影响性能。

2.7、生产经验—增加副本因子

       假设我们创建了一个主题并设置副本数为 1 ,但是后来我们发现这部分数据特别重要,于是想要增加副本数,怎么办呢?通过命令行是不行的,得像我们上面 2.5 手动调整分区副本 一样,通过 json 文件来修改。

手动增加副本存储

1)创建副本存储计划(所有副本都指定存储在hadoop102、hadoop103、hadoop104中)

vim increase-replication-factor.json
# 输入下面的内容
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}

2)执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

3、文件存储

3.1、文件存储机制

1)Topic 数据的存储机制

       Topic是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应一个 log 文件,该 log 文件中存储的就是 producer 产生的数据。producer 产生的数据会被不断追加(追加是 Kafka 能够高效读写的一个重要原因)到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分区和索引机制,将每个 partition 分为多个 segment 。每个 segment 包括多个:“.index”文件、“.log”文件和 .timeindex 等文件(.timeindex 是一个时间戳索引文件,描述了文件保存时间,因为 Kafka 中的数据默认为保存 7 天才会自动删除,而判断文件日期是否达到7天就需要判断这个文件)。这些文件位于一个文件夹下,文件夹的命名规则为:topic名称+分区序号,例如:like-0。

# 000000~170409
00000000000000000000.index
00000000000000000000.log
# 170410~239429
00000000000000170410.index
00000000000000170410.log
# 239430~
00000000000000239430.index
00000000000000239430.log

验证:

1. 查看 kafka 的 data 目录下的 topic 数据

2. 这些文件因为都是经过序列化的所以都是乱码,需要使用 Kafka 提供的一个工具来看:

# 查看 .index 文件
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000015.index 
# 查看 .log 文件
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000015.log

2)index 文件和log文件
  1. kafka 的 index 是稀疏索引,大约每往 .log 文件中写入 4KB 数据,会往 .index 文件写入一条索引。参数 log.index.interval.bytes 默认 4kb;
  2. index 文件中保存的 offset 是相对 offset,这样能确保 offset 的值所占空间不会太大,因此能将 offset 的值控制在固定大小。
  3. 一个 segment 文件大小 1GB

现有索引文件 000000.index,0000522.index,000001005.index ,如何在 log 文件中定位到 offset=600 的record?

  • offset=600,offset 大于522 小于 1005,说明就要找的文件索引就在 522.index 文件中,所以去 00000522.index 文件中
  • 000001005.index 文件中的数据有多行,因为每4kb数据就会往这个文件写一条记录

可以看到 587 就是我们的 index 文件名中的基础offset+相对offset得到的,计算得到 587 后我们发现要找的 offset=600 是大于 587 小于 639 的,说明要找的 record 就在这一行。于是查看 position 得到 6410,接着查看 log文件。

找到 000000522.index 对应的 000000522.log 文件,这个 log 文件同样有一个 position 属性,我们要找的 6410 刚好在log文件中就有一条记录的position= 6410,这就找到了。如果我们要找的position=6415,那么我们就得找到介于这个值中间的数据,因为 6410 < 6415 <10090 所以我们要找的 position=6415的数据就在 6410这一行。

3.2、文件清理策略

Kafka中默认的日志保存时间为7,可以通过调整如下参数修改保存时间。

  1. log.retention.hours(int),最低优先级小时,默认7天。
  2. log.retention.minutes(int),分钟。
  3. log.retention.ms(long),最高优先级毫秒。
  4. log.retention.check.interval.ms,负责设置检查周期,默认5分钟。

这四个参数的优先级从上到下越来越高,也就是说 当我们设置了日志保存参数为 ms 级别时,前面设置的 hours 和 minutes 级别的参数就都失效了。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka中提供的日志清理策略有 delete 和 compact 两种。

1)delete日志删除:将过期数据删除
  • log.cleanup.policy = delete    所有数据启用删除策略

(1)基于时间:默认打开以segment中所有记录中的最大时间戳作为该文件时间戳。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。

log.retention.bytes(long),默认等于-1,表示无穷大。

思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?

当然是以segment中所有记录中的最大时间戳作为该文件时间戳,所以即使数据有99.9%是旧的,只要有0.01%是新的数据,就得等它过期了才能删除。

2compact 日志压缩

compact 日志压缩:对于相同的 key 的不同 value 只保留最后一个版本

要使用这个功能,只需要修改配置

  • log.cleanup.policy=compact

使用场景

       这种压缩只能用于特定场景,比如消息的 key 是用户id,value是用户的资料,通过这个压缩,整个消息集里就保存了所有用户最新的信息。

4、高效读写数据(面试重点)

1Kafka本身是分布式集群,可以采用分区技术,并行度高

2)读数据采用稀疏索引,可以快速定位要消费的数据

3)顺序写磁盘

       Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4)页缓存 + 零拷贝技术

零拷贝:Kakfa 把对数据操作的步骤放到了 生产者和消费者当中(生产者和消费者可以在拦截器来对数据进行处理),所以 Kafka Broker 应用层并不关心数据的存储,所以数据都不需要走应用层,直接走网卡就可以传输费 消费者,传输效率高

pageCache 页缓存:Kafka 重度依赖底层操作系统提供的 PageCache 功能。当上层有写操作时,操作系统只是将数据写入 pageCache。当读操作发生时,先从 pageCache 中查找,如果找不到,再去磁盘读取。实际上 pageCache 是尽可能把更多的空闲内存空间都当做了磁盘缓存来使用。

参数

描述

log.flush.interval.messages

强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。

log.flush.interval.ms

每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

总结

       这一节用到的 prettyZoo 挺震撼我的,JavaFX 能开发出如此漂亮使用的一款软件 ,实在让我想不到,希望自己写的软件有一天也可以为百千人使用。

相关文章
|
12天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
21天前
|
消息中间件 存储 缓存
Kafka(三)【Broker 存储】(1)
Kafka(三)【Broker 存储】
|
1月前
|
存储 消息中间件 运维
分层存储救不了Kafka
Apache Kafka,作为流处理领域的标杆,面临云环境下的挑战,如高存储成本、运维复杂性和性能瓶颈。传统的本地磁盘Shared Nothing架构导致这些问题,而分层存储仅部分缓解,未根本解决问题。直接写入S3虽降低成本,但牺牲了延迟。为解决这些痛点,提出了创新的共享存储架构,通过EBS+S3实现存算分离,保持低延迟并提高弹性,同时降低成本和运维复杂性。该架构将EBS视为共享存储,实现Broker与存储的解耦,确保在云时代引领流处理系统的发展。
46 2
分层存储救不了Kafka
|
1月前
|
消息中间件 Cloud Native Kafka
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ致力于打造下一代云原生Kafka系统,解决Kafka痛点。本文深入解析Kafka Consumer与Broker的交互机制,涉及消费者角色、核心组件及常用接口。消费者以group形式工作,包括leader和follower。交互流程涵盖FindCoordinator、JoinGroup、SyncGroup、拉取消息和退出过程。文章还探讨了broker的consumer group状态管理和rebalance原理。AutoMQ团队分享Kafka技术,感兴趣的话可以关注他们。
68 2
一文搞懂 Kafka consumer 与 broker 交互机制与原理
|
24天前
|
消息中间件 Kafka 网络安全
Kafka. Broker not available
Kafka. Broker not available
10 0
|
1月前
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
1月前
|
消息中间件 存储 缓存
【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
【4月更文挑战第13天】【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
|
1月前
|
消息中间件 Kafka 网络安全
Kafka. Broker not available
Kafka. Broker not available
70 0
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
76 1
|
1月前
|
消息中间件 监控 Java
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
67 0

热门文章

最新文章