探究Kafka原理-5.Kafka设计原理和生产者原理解析(下)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 探究Kafka原理-5.Kafka设计原理和生产者原理解析

探究Kafka原理-5.Kafka设计原理和生产者原理解析(上):https://developer.aliyun.com/article/1413723


kafka 关键原理加强


日志分段切分条件


(1)当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值log.segment.bytes 参数的默认值为 1073741824,即 1GB


(2)当前日志分段中消息的最小时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高默认情况下,只配置了 log.roll.hours 参数,其值为 168,即 7 天。


(3)偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。log.index.size .max.bytes 的默认值为 10485760,即 10MB


(4)追加的消息的偏移量与当前日志分段的起始偏移量之间的差值大于 Integer.MAX_VALUE, 即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。


controller 控制器


Controller 简单来说,就是 kafka 集群的状态管理者


在 Kafka 集群中会有一个或者多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责维护整个集群中所有分区和副本的状态及分区 leader 的选举。


当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。


Kafka 中的控制器选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建/controller 这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

{"version":1,"brokerid":0,"timestamp":"1529210278988"}

其中 version 在目前版本中固定为 1,brokerid 表示成为控制器的 broker 的 id 编号,timestamp 表示竞选成为控制器时的时间戳。


在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试去读取 zookeeper 上的/controller 节点的 brokerid 的值,如果读取到 brokerid 的值不为-1,则表示已经有其它 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 Zookeeper 中不存在/controller 这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller 这个节点,当前 broker 去创建节点的时候,也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器,而创建失败的 broker 则表示竞选失败。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId。


controller 竞选机制:简单说,先来先上!


controller 的职责


监听 partition 相关变化

对 Zookeeper 中的/admin/reassign_partitions 节点注册 PartitionReassignmentListener,用来处理分区重分配的动作。
对 Zookeeper 中的/isr_change_notification 节点注册 IsrChangeNotificetionListener,用来处理ISR 集合变更的动作。
对 Zookeeper 中的/admin/preferred-replica-election 节点添加PreferredReplicaElectionListener,用来处理优先副本选举。

监听 topic 增减变化

对 Zookeeper 中的/brokers/topics 节点添加 TopicChangeListener,用来处理 topic 增减的变化;
对 Zookeeper 中的/admin/delete_topics 节点添加 TopicDeletionListener,用来处理删除 topic 的动作

监听 broker 相关的变化

对 Zookeeper 中的/brokers/ids/节点添加 BrokerChangeListener,用来处理 broker 增减的变化。

更新集群的元数据信息

从 Zookeeper 中读取获取当前所有与 topic、partition 以及 broker 有关的信息并进行相应的管理。对各topic 所对应的
Zookeeper 中的/brokers/topics/[topic]节点添加 PartitionModificationsListener,用来监听 topic 中的分区分配变化。并将最新信息同步给其他所有 broker

启动并管理分区状态机和副本状态机。


如果参数 auto.leader.rebalance.enable 设置为 true,则还会开启一个名为“auto-leader-rebalance-task”


的定时任务来负责维护分区的 leader


分区的负载分布


topic中的每个分区的每个副本及其leader副本,在集群中的众多broker中,如何分布的。


有两大策略:


策略1:kafka自动分布

策略2:topic的创建者手动指定


客户端请求创建一个 topic 时,每一个分区副本在 broker 上的分配,是由集群 controller 来决定;其分布策略源码如下:

private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)) {
      nextReplicaShift += 1
    }
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1) {
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex:
Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
副本因子不能大于 Broker 的个数;
partition_0 的第 1 个副本(leader 副本)放置位置是随机从 brokerList 选择的;
其他分区的第 1 个副本(leader)放置位置相对于 paritition_0 分区依次往后移(也就是如果我们有 5 个 Broker,5 个分区,假设 partition0 分区放在 broker4 上,那么 partition1 将会放在 broker5上;patition2 将会放在 broker1 上;partition3 在 broker2,依次类);
各分区剩余的副本相对于分区前一个副本偏移随机数 nextReplicaShift


分区 Leader 的选举机制


分区 leader 副本的选举由控制器 controller 负责具体实施


当创建分区(创建主题或增加分区都有创建分区的动作)或 Leader 下线(此时分区需要选举一个新的 leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作。

选举策略:按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中;


一个分区的 AR 集合在 partition 分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变


生产者原理解析


生产者工作流程


一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 。


在主线程中由 kafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。


Sender 线程负责从 RecordAccumulator 获取消息并将其发送到 Kafka 中;


RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B ,即 32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms 的配置,此参数的默认值为 60000,即 60 秒。


主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,RecordAccumulator 内部为每个分区都维护了一个双端队列,即 Deque消息写入缓存时,追加到双端队列的尾部;


Sender 读取消息时,从双端队列的头部读取。注意:ProducerBatch 是指一个消息批次;与此同时,会将较小的 ProducerBatch 凑成一个较大 ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量。


ProducerBatch 大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord ) 流入RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch (如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以写入,如果不可以则需要创建一个新的 Producer Batch。在新建ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数大小,如果不超过,那么就以 batch.size参数的大小来创建 ProducerBatch 。


如果生产者客户端需要向很多分区发送消息, 则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。


Sender 从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque>的形式转变成的形式,其中 Node 表示 Kafka 集群 broker 节点。


对于网络连接来说,生产者客户端是与具体 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络 I/O 层面的转换。


在转换成的形式之后, Sender 会进一步封装成 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是 Kafka 各种协议请求;


但是还是存在发送失败的情况,所以请求在从 sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.request.per. connection , 默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。


在上图中还可以看到,当发送成功了之后,会将Response返回给InFlightRequests 中,然后从队列里面将对应的Request移除,说明这个请求就再也不需要存着了。保存的意义其实也就是为了可以重试,当把Response返回的时候,如果失败了,那么其实是可以重试的,并不是无穷的,而是有一个参数的上限。


通过比较 Deque 的 size 这个参数的大小来判断对应的 Node 中是否己经堆积了很多未响应的消息,如果真是此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续发送请求会增大请求超时的可能。


但是可能存在一个有序性的问题。


假如代码中:


send(a);


send(b);


有没有一种可能,到达服务端broker的log数据文件中的时候,文件中的数据长这样?


b


a


比如 a先失败了,进缓存, b成功了,a又发送成功了。也就是producer内部的重试机制,有可能造成数据在服务端存储的乱序。


这个流程不像我们代码里自己设置,如果发送失败了去重试,那样其实就变成同步了,只有一条成功了才继续发吓一条。


重要的生产者参数


acks


acks 是控制 kafka 服务端向生产者应答消息写入成功的条件;


生产者根据得到的确认信息,来判断消息发送是否成功;


那么涉及到ack的有两个问题?其参数起什么作用?ack参数有哪几种类型,分别有什么特点。


Producer 往 Broker 发送消息应答机制


kafka 在 producer 里面提供了消息确认机制。我们可以通过配置来决定消息发送到对应分区的几个副 本 才 算 消 息 发 送 成 功 。可 以 在 构 造 producer 时 通 过 acks 参 数 指 定 ( 在 0.8.2.X 前 是 通过 request.required.acks 参数设置的)。这个参数支持以下三种值:


acks = 0:


意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 kafka 。在这种情况下还是有可能发生错误,比如发送的对象不能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,大概率会丢失一些消息。


acks = 1:


意味着 leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 leader 选举,生产者会在选举时收到一个LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 leader,但在消息被复制到 follower 副本之前 leader 发生崩溃。


acks = all:


(这个和 request.required.acks = -1 含义一样):意味着 leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。


根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。

acks 含义
0 Producer 往集群发送数据不需要等到集群的确认信息,不确保消息发送成功。安全性最低但是效率最高。
1 Producer 往集群发送数据只要 leader 成功写入消息就可以发送下一条,只确保 Leader 接收成功。
-1 或 all Producer 往集群发送数据需要所有的 ISR Follower 都完成从 Leader 的同步才会发送下一条,确保Leader 发送成功和所有的副本都成功接收。安全性最高,但是效率最低。

生产者将 acks 设置为 all,是否就一定不会丢数据呢?(-1是代表所有的ISR副本拿到)


否!如果在某个时刻 ISR 列表只剩 leader 自己了,那么就算 acks=all,收到这条数据还是只有一个节点;


可以配合另外一个参数缓解此情况: 最小同步副本数>=2


BROKER 端参数: min.insync.replicas(默认 1)


如果让min.insync.replicas = 分区副本总数,那么这个分区的可用性就会极大程度的降低(经常不可用,因为只要有一个副本掉队,那么整个分区就不可用了,导致分区经常不能读写)


所以综上所述,kafka中没有一个尽善尽美的方案,都是在取舍中进行的。


但是还存在一个问题,就是如果我的数据又不能丢,产生数据的速度还特别快,那么一定得配置 ack = -1


但是数据写入速度就跟不上产生得速度,那就会在我的生产端出现数据积压。


解决办法就是:**加分区,扩集群 ** 生产者写的时候,有的写0 有的写1 有的写2 这样就增加了吞吐量。也就是加分区增加并行度,扩展了吞吐量。


一般情况 配置 ack = 1,中庸之道,毕竟节点不可能老挂。


kafka本身,作为流失计算的缓冲用的,流式计算,多半是无法保证百分之一百正确的。


重要的生产者参数


max.request.size


这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 lMB 一般情况下,这个默认值就可以满足大多数的应用场景了。


这个参数还涉及一些其它参数的联动,比如 broker 端(topic 级别参数)的 message.max.bytes 参数( 默 认 1000012), 如 果 配 置 错 误 可 能 会 引 起 一 些 不 必 要 的 异 常 ; 比 如 将 broker 端 的message.max.bytes 参数配置为 10 ,而 max.request.size 参数配置为 20,那么当发送一条大小为 15B的消息时,生产者客户端就会报出异常;


retries 和 retry.backoff.ms


retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、 leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。


重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。


如果将 retries 参数配置为非零值,并且 max .in.flight.requests.per.connection 参数配置为大于 1 的值,那可能会出现错序的现象:如果批次 1 消息写入失败,而批次 2 消息写入成功,那么生产者会重试发送批次 1 的消息,此时如果批次 1 的消息写入成功,那么这两个批次的消息就出现了错序。对于某些应用来说,顺序性非常重要 ,比如 MySQL binlog 的传输,如果出现错误就会造成非常严重的后果;


一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为 1 ,而不是把 retries 配置为 0,不过这样也会影响整体的吞吐。


compression.type


这个参数用来指定消息的压缩方式,默认值为“none ",即默认情况下,消息不会被压缩。


该参数还可以配置为 “gzip”,“snappy” 和 “lz4”。


对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。


**消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;这个也是一种权衡 **


batch.size


每个 Batch 要存放 batch.size 大小的数据后,才可以发送出去。比如说 batch.size 默认值是 16KB,那么里面凑够 16KB 的数据才会发送。


理论上来说,提升 batch.size 的大小,可以允许更多的数据缓冲在 recordAccumulator 里面,那么一次Request 发送出去的数据量就更多了,这样吞吐量可能会有所提升。


linger.ms


这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入ProducerBatch 时间,默认值为 0。生产者客户端会在 ProducerBatch 填满或等待时间超过 linger.ms


增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

linger.ms = 0


batchsize = 100


那么是不是batchsize 就无效了。


设置了linger.ms = 0 理论上应该马上走,但是此时 sender线程忙不过来,所以只能阻塞着。


linger.ms = 0 相当于一旦有车就立马接走,没车就阻塞着。


enable.idempotence


是否开启幂等性功能,详见后续原理加强;


int a = 1;


a++; // 非幂等操作


val map = new HashMap()


map.put(“a”,1); // 幂等操作


在 kafka 中,同一条消息,生产者如果多次重试发送,在服务器中的结果如果还是只有一条,这就是具备幂等性;否则,就不具备幂等性!


所以,producer内部有重试机制


1.有可能造成服务端数据的乱序

2.有可能造成服务端数据的重复


partitioner.class


用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner


默认分区器的分区规则:


  • 如果数据中有 key,则按 key 的 murmur hash 值 % topic 分区总数得到目标分区
  • 如果数据只有 value,则在各个分区间


自定义 partitioner 需要实现 org.apache.kafka.clients.producer.Partitioner 接口

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                         Cluster cluster) {
        // 这里可以根据自己的业务逻辑计算出要发送到哪个分区
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (key == null) {
            return 0;
        }
        return Math.abs(key.hashCode() % numPartitions);
    }
    @Override
    public void close() {
        // do nothing
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // do nothing
    }
}
使用自定义的Partitioner也很简单,只需要在创建Producer时指定即可。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");
Producer<String, String> producer = new KafkaProducer<>(props);
目录
相关文章
|
4天前
|
Java
并发编程之线程池的底层原理的详细解析
并发编程之线程池的底层原理的详细解析
15 0
|
15天前
|
消息中间件 存储 算法
深度解析Kafka中的消息奥秘
深度解析Kafka中的消息奥秘
35 0
|
1月前
|
关系型数据库 MySQL Shell
CMake构建Makefile深度解析:从底层原理到复杂项目(三)
CMake构建Makefile深度解析:从底层原理到复杂项目
33 0
|
1月前
|
编译器 vr&ar C++
CMake构建Makefile深度解析:从底层原理到复杂项目(二)
CMake构建Makefile深度解析:从底层原理到复杂项目
36 0
|
1月前
|
存储 安全 编译器
【C++ 17 新功能 std::visit 】深入解析 C++17 中的 std::visit:从原理到实践
【C++ 17 新功能 std::visit 】深入解析 C++17 中的 std::visit:从原理到实践
70 0
|
28天前
|
安全 Java 数据安全/隐私保护
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
66 1
|
1天前
|
安全 索引
【集合】03 Linkedlist原理深入解析
【集合】03 Linkedlist原理深入解析
6 0
|
1天前
|
Java Spring 容器
SpringBoot自动装配原理之@Import注解解析
SpringBoot自动装配原理之@Import注解解析
|
4天前
|
缓存 JavaScript 前端开发
|
4天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

推荐镜像

更多