Kafka(三)【Broker 存储】(1)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
日志服务 SLS,月写入数据量 50GB 1个月
云原生网关 MSE Higress,422元/月
简介: Kafka(三)【Broker 存储】

前言

       今天学习 Kafka 的第二部分 Broker,相比较 Flink ,Kafka 的流式数据处理还是比较好理解的。

Kafka Broker

环境准备:

  • 启动 Zookeeper
  • 启动 Kafka

1、工作流程

我们先看看 Zookeeper 中存储了 Kafka 的哪些信息,它俩是如何协调工作的,

1.1、Zookeeper 存储的 Kafka 信息

我们需要在任意节点启动 Zookeeper 客户端:

# 启动zookeeper客户端
bin/zkCli.sh
# 查看信息
ls /kafka/brokers/ids

/kafka/brokers/ids

可以看到,我们当前启动了三个 Kafka broker节点。

/kafka/brokers/topics/

这个目录下我们可以看到指定主题的分区信息:

/kafka/consumers/

  • 0.9 版本之前用于保存 offset 信息
  • 0.9版本之后 offset 存储在 kfaka 主题中

/kafka/controller

辅助节点选举,这个 controller 是谁取决于哪个节点先在 /kafka/controller 中注册的。在Kafka中,controller负责管理整个集群中所有分区和副本的状态。当检测到某个分区的ISR集合(ISR 就是所有 leader 和 follower 节点之间同步正常的节点集合)发生变化时,controller会负责通知所有broker更新其元数据信息。

1.2、Kafka Broker 的总体工作流程

  1. broker 节点启动后会先去 zookeeper 注册(注册到 /kafka/brokers/ids)代表自己活着
  2. 然后每个 broker 节点的 controller (每个 broker 节点都有一个 controller)会去zookeeper的 controller 注册一个临时节点(注册到 /kafka/controller),先注册的节点就是 controller ,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker 叫 Kafka Broker follower。
  3. 选举出来的 controller 会负责管理整个集群中所有分区和副本的状态
  4. controller 同时负责 Leader 的选举,选举规则是:首先要成为leader必须是存在于 ISR 中节点,其次按照这些节点在 AR 中的排名(AR 是 Kafak 分区中所有副本的统称),排在前面的节点优先成为 leader
  5. controller 节点将分区状态信息(当前集群的leade、isr ...)上传到 zookeeper (比如上传到 /kafka/brokers/topics/like/partitions/0/state)
  6. 其他 controller  去 zookeeper 拉取集群分区状态信息(防止 controller 挂了)
  7. 假设 leader 挂了
  8. controller 会立即检测到这个变化
  9. controller 从 zookeeper 中拉取回来集群分区状态信息,继续进行选举
  10. 选举的前提依然是必须存活于 ISR 队列,之后按照 AR 中的排列顺序,排在前面的优先成为 leader
  11. 选举完之后,controller 再次更新 zookeeper 中的集分区状态信息

1.3、Broke 重要参数

参数名称

描述

replica.lag.time.max.ms

ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s

auto.leader.rebalance.enable

默认是true。 自动Leader Partition 平衡。

leader.imbalance.per.broker.percentage

默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。

leader.imbalance.check.interval.seconds

默认值300秒。检查leader负载是否平衡的间隔时间。

log.segment.bytes

Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。

log.index.interval.bytes

默认4kbkafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。

log.retention.hours

Kafka中数据保存的时间,默认7天。

log.retention.minutes

Kafka中数据保存的时间,分钟级别,默认关闭。

log.retention.ms

Kafka中数据保存的时间,毫秒级别,默认关闭。

log.retention.check.interval.ms

检查数据是否保存超时的间隔,默认是5分钟

log.retention.bytes

默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。

log.cleanup.policy

默认是delete,表示所有数据启用删除策略;

如果设置值为compact,表示所有数据启用压缩策略。

num.io.threads

默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%

num.replica.fetchers

副本拉取线程数,这个参数占总核数的50%的1/3

num.network.threads

默认是3。数据传输线程数,这个参数占总核数的50%的2/3 

log.flush.interval.messages

强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。

log.flush.interval.ms

每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

2、Kafka 副本

2.1、副本基本信息

  1. Kafka副本作用:提高数据可靠性。
  2. Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  3. Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。Kafka 中,生产者和消费者只能针对 Leader 进行操作
  4. Kafka分区中的所有副本统称为AR(Assigned Repllicas)。

AR = ISR + OSR

ISR,表示和Leader保持同步的Follower集合。如果 Follower 长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR(踢出到 OSR)。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从 ISR 中选举新的Leader。

OSR表示Follower与Leader副本同步时,延迟过多的副本(特定条件下会恢复到 ISR)。

2.2、Keader 选举流程

       Kafka集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker的上下线,所有 topic 的分区副本分配Leader选举等工作。

       Controller的信息同步工作是依赖于Zookeeper的。

接下来我们来验证一下:

1. 创建一个新的topic,3个分区,3个副本

./kafka-topics.sh --bootstrap-server hadoop102:9092 -create --topic buy --partitions 3 -replication-factor 3

2. 查看 Leader 分布情况

注意区分 AR 和 ISR:

3. 停止掉hadoop103 的 kafka 进程,并查看Leader分区情况

上面的图中我们可以看到hadoop103上的副本一共有 3 个 ,分别存在节点 [2,0,1] 上,并且当前的副本 leader 是 2,现在我们把 hadoop103 节点的 kafka 进程停掉,相当于保存在分区 1 的副本全部故障,而我们分区 3 中的副本的 leader 刚好是节点1,所以要把副本的 leader 进行更新,要按照理论应该是 2 号节点继续上任副本 leader 的位置。

我们发现,我们分区 3 的副本 leader  原本存在分区2 中,但是现在分区 2 挂掉了,所以它会从自己的 AR 中选一个在前面的节点 [2,0],于是 分区2 中的 副本 leader 变成了节点 2。

4. 我们再恢复一下hadoop103

[lyh@hadoop103 bin] ./kafka-server-start.sh -daemon ../config/server.properties

2.3、Leader 和 Follower 的故障处理细节

概念:

  • LEO(Log End Offset):每个副本中最后一个 offset ,LEO 就是最新的 offset +1
  • HW(High Watermark):所有副本中最小的 LEO
Follower 故障

Leader 故障:

注意:只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复!

比如上面,我们的 leader 一开始是节点0,但是 leader 挂掉之后,leader 变成了节点1,但是新的leader 显然比旧的 leader 的数据少了3条,所以这部分数据就丢失了(毕竟自己处理问题总不能让人家生产者再发送一遍)。

2.4、分区副本分配

       如果kafka服务器只有3个节点,那么设置kafka 的分区数大于服务器台数,在kafka底层如何分配存储副本呢?

       注意:分区数可以大于机器数,毕竟大不了一个机器上多存几个分区,但是副本数不能大于机器数,那样没有意义,而且 Kafka 会报错:

我有三台机器,上面试着创建了 5 个副本的一个 topic,报错不能超过机器数量。

1)创建12分区,3个副本

./kafka-topics.sh -bootstrap-server hadoop102:9092 --create --topic card --partitions 12 --replication-factor 3

2)查看分区和副本信息

我们可以看到,Kafka 使得我们每个副本的 Leader 都尽可能的不一样,这样很好地分担了读写压力,毕竟Kafka 生产者和消费者都是只对分区 leader 进行操作的。每4个分区对应一个副本,而且每一个副本的 AR 顺序 Kafka 都尽量使它们不一样,这样可以尽可能做到负载均衡。

Kafka(三)【Broker 存储】(2)https://developer.aliyun.com/article/1532329

相关文章
|
12天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
存储 消息中间件 运维
分层存储救不了Kafka
Apache Kafka,作为流处理领域的标杆,面临云环境下的挑战,如高存储成本、运维复杂性和性能瓶颈。传统的本地磁盘Shared Nothing架构导致这些问题,而分层存储仅部分缓解,未根本解决问题。直接写入S3虽降低成本,但牺牲了延迟。为解决这些痛点,提出了创新的共享存储架构,通过EBS+S3实现存算分离,保持低延迟并提高弹性,同时降低成本和运维复杂性。该架构将EBS视为共享存储,实现Broker与存储的解耦,确保在云时代引领流处理系统的发展。
46 2
分层存储救不了Kafka
|
1月前
|
消息中间件 Cloud Native Kafka
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ致力于打造下一代云原生Kafka系统,解决Kafka痛点。本文深入解析Kafka Consumer与Broker的交互机制,涉及消费者角色、核心组件及常用接口。消费者以group形式工作,包括leader和follower。交互流程涵盖FindCoordinator、JoinGroup、SyncGroup、拉取消息和退出过程。文章还探讨了broker的consumer group状态管理和rebalance原理。AutoMQ团队分享Kafka技术,感兴趣的话可以关注他们。
68 2
一文搞懂 Kafka consumer 与 broker 交互机制与原理
|
21天前
|
存储 消息中间件 缓存
Kafka(三)【Broker 存储】(2)
Kafka(三)【Broker 存储】
|
24天前
|
消息中间件 Kafka 网络安全
Kafka. Broker not available
Kafka. Broker not available
10 0
|
1月前
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
1月前
|
消息中间件 存储 缓存
【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
【4月更文挑战第13天】【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
|
1月前
|
消息中间件 Kafka 网络安全
Kafka. Broker not available
Kafka. Broker not available
70 0
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
76 1
|
1月前
|
消息中间件 监控 Java
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
67 0

热门文章

最新文章