前言
今天学习 Kafka 的第二部分 Broker,相比较 Flink ,Kafka 的流式数据处理还是比较好理解的。
Kafka Broker
环境准备:
- 启动 Zookeeper
- 启动 Kafka
1、工作流程
我们先看看 Zookeeper 中存储了 Kafka 的哪些信息,它俩是如何协调工作的,
1.1、Zookeeper 存储的 Kafka 信息
我们需要在任意节点启动 Zookeeper 客户端:
# 启动zookeeper客户端 bin/zkCli.sh # 查看信息 ls /kafka/brokers/ids
/kafka/brokers/ids
可以看到,我们当前启动了三个 Kafka broker节点。
/kafka/brokers/topics/
这个目录下我们可以看到指定主题的分区信息:
/kafka/consumers/
- 0.9 版本之前用于保存 offset 信息
- 0.9版本之后 offset 存储在 kfaka 主题中
/kafka/controller
辅助节点选举,这个 controller 是谁取决于哪个节点先在 /kafka/controller 中注册的。在Kafka中,controller负责管理整个集群中所有分区和副本的状态。当检测到某个分区的ISR集合(ISR 就是所有 leader 和 follower 节点之间同步正常的节点集合)发生变化时,controller会负责通知所有broker更新其元数据信息。
1.2、Kafka Broker 的总体工作流程
- broker 节点启动后会先去 zookeeper 注册(注册到 /kafka/brokers/ids)代表自己活着
- 然后每个 broker 节点的 controller (每个 broker 节点都有一个 controller)会去zookeeper的 controller 注册一个临时节点(注册到 /kafka/controller),先注册的节点就是 controller ,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker 叫 Kafka Broker follower。
- 选举出来的 controller 会负责管理整个集群中所有分区和副本的状态
- controller 同时负责 Leader 的选举,选举规则是:首先要成为leader必须是存在于 ISR 中节点,其次按照这些节点在 AR 中的排名(AR 是 Kafak 分区中所有副本的统称),排在前面的节点优先成为 leader
- controller 节点将分区状态信息(当前集群的leade、isr ...)上传到 zookeeper (比如上传到 /kafka/brokers/topics/like/partitions/0/state)
- 其他 controller 去 zookeeper 拉取集群分区状态信息(防止 controller 挂了)
- 假设 leader 挂了
- controller 会立即检测到这个变化
- controller 从 zookeeper 中拉取回来集群分区状态信息,继续进行选举
- 选举的前提依然是必须存活于 ISR 队列,之后按照 AR 中的排列顺序,排在前面的优先成为 leader
- 选举完之后,controller 再次更新 zookeeper 中的集分区状态信息
1.3、Broke 重要参数
参数名称 |
描述 |
replica.lag.time.max.ms |
ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。 |
auto.leader.rebalance.enable |
默认是true。 自动Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage |
默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 |
leader.imbalance.check.interval.seconds |
默认值300秒。检查leader负载是否平衡的间隔时间。 |
log.segment.bytes |
Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。 |
log.index.interval.bytes |
默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。 |
log.retention.hours |
Kafka中数据保存的时间,默认7天。 |
log.retention.minutes |
Kafka中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms |
Kafka中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms |
检查数据是否保存超时的间隔,默认是5分钟。 |
log.retention.bytes |
默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。 |
log.cleanup.policy |
默认是delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略。 |
num.io.threads |
默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。 |
num.replica.fetchers |
副本拉取线程数,这个参数占总核数的50%的1/3 |
num.network.threads |
默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。 |
log.flush.interval.messages |
强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms |
每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。 |
2、Kafka 副本
2.1、副本基本信息
- Kafka副本作用:提高数据可靠性。
- Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。Kafka 中,生产者和消费者只能针对 Leader 进行操作。
- Kafka分区中的所有副本统称为AR(Assigned Repllicas)。
AR = ISR + OSR
ISR,表示和Leader保持同步的Follower集合。如果 Follower 长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR(踢出到 OSR)。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从 ISR 中选举新的Leader。
OSR,表示Follower与Leader副本同步时,延迟过多的副本(特定条件下会恢复到 ISR)。
2.2、Keader 选举流程
Kafka集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker的上下线,所有 topic 的分区副本分配和Leader选举等工作。
Controller的信息同步工作是依赖于Zookeeper的。
接下来我们来验证一下:
1. 创建一个新的topic,3个分区,3个副本
./kafka-topics.sh --bootstrap-server hadoop102:9092 -create --topic buy --partitions 3 -replication-factor 3
2. 查看 Leader 分布情况
注意区分 AR 和 ISR:
3. 停止掉hadoop103 的 kafka 进程,并查看Leader分区情况
上面的图中我们可以看到hadoop103上的副本一共有 3 个 ,分别存在节点 [2,0,1] 上,并且当前的副本 leader 是 2,现在我们把 hadoop103 节点的 kafka 进程停掉,相当于保存在分区 1 的副本全部故障,而我们分区 3 中的副本的 leader 刚好是节点1,所以要把副本的 leader 进行更新,要按照理论应该是 2 号节点继续上任副本 leader 的位置。
我们发现,我们分区 3 的副本 leader 原本存在分区2 中,但是现在分区 2 挂掉了,所以它会从自己的 AR 中选一个在前面的节点 [2,0],于是 分区2 中的 副本 leader 变成了节点 2。
4. 我们再恢复一下hadoop103
[lyh@hadoop103 bin] ./kafka-server-start.sh -daemon ../config/server.properties
2.3、Leader 和 Follower 的故障处理细节
概念:
- LEO(Log End Offset):每个副本中最后一个 offset ,LEO 就是最新的 offset +1
- HW(High Watermark):所有副本中最小的 LEO
Follower 故障
Leader 故障:
注意:只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复!
比如上面,我们的 leader 一开始是节点0,但是 leader 挂掉之后,leader 变成了节点1,但是新的leader 显然比旧的 leader 的数据少了3条,所以这部分数据就丢失了(毕竟自己处理问题总不能让人家生产者再发送一遍)。
2.4、分区副本分配
如果kafka服务器只有3个节点,那么设置kafka 的分区数大于服务器台数,在kafka底层如何分配存储副本呢?
注意:分区数可以大于机器数,毕竟大不了一个机器上多存几个分区,但是副本数不能大于机器数,那样没有意义,而且 Kafka 会报错:
我有三台机器,上面试着创建了 5 个副本的一个 topic,报错不能超过机器数量。
1)创建12分区,3个副本
./kafka-topics.sh -bootstrap-server hadoop102:9092 --create --topic card --partitions 12 --replication-factor 3
2)查看分区和副本信息
我们可以看到,Kafka 使得我们每个副本的 Leader 都尽可能的不一样,这样很好地分担了读写压力,毕竟Kafka 生产者和消费者都是只对分区 leader 进行操作的。每4个分区对应一个副本,而且每一个副本的 AR 顺序 Kafka 都尽量使它们不一样,这样可以尽可能做到负载均衡。
Kafka(三)【Broker 存储】(2)https://developer.aliyun.com/article/1532329