Kafka Producer整体架构概述及源码分析(上)

简介: Kafka Producer整体架构概述及源码分析

整体架构

640.png

「线程」

  • 整个 Kafka 客户端由两个线程协调运行,即Main线程和Sender线程。
  • 在Main线程中由KafkaProducer创建消息,然后通过Interceptor、Serializer和Partitioner之后缓存到RecordAccumulator(消息累加器)中。
  • Sender线程 负责从RecordAccumulator中获取消息并发送到Kafka中。

「RecordAccumulator」

  • RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
  • RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory进行配置,默认值是32MB。如果生产者发送消息的速度超过了发送到客户端的速度,则会导致生产者空间不足,此时KafkaProducer send()方法的调用要么会被阻塞,要么抛出异常。
  • KafkaProducer发送消息的速度可以有参数max.block.ms进行配置,此参数默认值为60秒。

「ProducerBatch」

  • Main线程发送过来的消息会被追加到RecordAccumulator的Deque(双端队列)中,在RecordAccumulator的内部每个Partition都维护了一个Deque,Deque中的内容就是ProducerBatch,即:Deque。
  • 消息被写入缓存时,会被追加到Deque的尾部。Sender读取消息时,会从Deque的头部进行读取。
  • ProducerBatch中可以包含一到多个ProducerRecord(生产者创建的消息),这样可以使字节的使用更加紧凑。同时,将娇小的ProducerRecord拼成一个较大的ProducerBatch也可以减少网络请求的次数以提高整体的吞吐量。
  • 如果生产者需要向多个分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

「BufferPool」

  • 消息在网络上都是以字节进行传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中通过java.io.ByteBuffer实现消息的创建和释放,不过频繁的创建和释放比较消耗资源,在RecordAccumulator的内部还有一个BufferPool,它主要用来试验ByteBuffer的复用,已实现缓存的高效利用。
  • 但是BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会进入BufferPool。此特定值的大小可以通过参数batch.size进行配置以实现缓存不同大小的消息。

「ProducerBatch与batch.size关系」

  • 当一条消息ProducerRecord进入RecordAccumulator中时,会先寻找与消息分区所对应的的Deque(如果没有则新创建),在从这个Deque的尾部获取一个ProducerBatch(如果没有则新创建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,否则需要创建一个新的ProducerBatch。
  • 在新建ProducerBatch时需要评估这条消息的大小是否超过batch.size,如果不超过,就以batch.size的大小来创建这个ProducerBatch,这样在使用完后还可以通过BufferPool的管理进行复用。若果超过,则以消息的大小来创建ProducerBatch,此内存区域不会被复用。

「Sender」

  • Sender从RecordAccumulator中获取缓存的消息后,会进一步将原本<TopicPartition, Deque>的保存形式进一步转换为<Node,List>的形式,其中Node表示Kafka集群中的Broker节点。
  • 对于网络连接来说,生产者客户端与具体的Broker节点建立连接,也就是向具体的Broker节点发送消息,而并不关心消息属于哪个分区;对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。
  • 在转换成<Node,List>的形式之后,Sender还会进一步封装成<Node,List>的形式,这样就可以将Request请求发送到各个Node。

「InFlightRequests」

  • 请求从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式是Map<NodeId, Deque>,其主要作用是缓存已经发出去但还没有收到响应的请求。与此同时,InFlightRequests还提供了趣多管理类的方法,并且通过配置参数还可以限制每个连接(即客户端与Node之间的连接)最多缓存的请求数。此参数为max.in.flight.requests.per.connection,默认值是5。超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应。
  • 通过比较Deque的size与配置的最大连接数可以判断对应的node是否已经堆积了很多未响应的请求。如果已有较大未响应请求的堆积,那么说明这个Node节点负载较大或者网络连接有问题,再继续向其发送请求会增大请求超时的可能。

源码分析及图解原理

RecordAccumulator

在RecordAccumulator中,最核心的参数就是:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

它是一个ConcurrentMap,key是TopicPartition类,代表一个topic的一个partition。value是一个包含ProducerBatch的双端队列。等待Sender线程发送给broker。画张图来看下:

640.png

「再从源码角度来看如何添加到缓冲区队列里的,主要看这个方法:org.apache.kafka.clients.producer.internals.RecordAccumulator#append:」

/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
*/
public RecordAppendResult append(TopicPartition tp,
                               long timestamp,
                               byte[] key,
                               byte[] value,
                               Header[] headers,
                               Callback callback,
                               long maxTimeToBlock) throws InterruptedException {
  // We keep track of the number of appending thread to make sure we do not miss batches in
  // abortIncompleteBatches().
  appendsInProgress.incrementAndGet();
  ByteBuffer buffer = null;
  if (headers == null) headers = Record.EMPTY_HEADERS;
  try {
      // check if we have an in-progress batch
      // 其实就是一个putIfAbsent操作的方法,不展开分析
      Deque<ProducerBatch> dq = getOrCreateDeque(tp);
      // batches是线程安全的,但是Deque不是线程安全的
      // 已有在处理中的batch
      synchronized (dq) {
          if (closed)
              throw new IllegalStateException("Cannot send after the producer is closed.");
          RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
          if (appendResult != null)
              return appendResult;
      }
      // we don't have an in-progress record batch try to allocate a new batch
      // 创建一个新的ProducerBatch
      byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
      // 分配一个内存
      int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
      log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
      // 申请不到内存
      buffer = free.allocate(size, maxTimeToBlock);
      synchronized (dq) {
          // Need to check if producer is closed again after grabbing the dequeue lock.
          if (closed)
              throw new IllegalStateException("Cannot send after the producer is closed.");
          // 再次尝试添加,因为分配内存的那段代码并不在synchronized块中
          // 有可能这时候其他线程已经创建好RecordBatch了,finally会把分配好的内存还回去
          RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
          if (appendResult != null) {
              // 作者自己都说了,希望不要总是发生,多个线程都去申请内存,到时候还不是要还回去?
              // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
              return appendResult;
          }
          // 创建ProducerBatch
          MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
          ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
          FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
          dq.addLast(batch);
          // incomplete是一个Set集合,存放不完整的batch
          incomplete.add(batch);
          // Don't deallocate this buffer in the finally block as it's being used in the record batch
          buffer = null;
          // 返回记录添加结果类
          return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
      }
  } finally {
      // 释放要还的内存
      if (buffer != null)
          free.deallocate(buffer);
      appendsInProgress.decrementAndGet();
  }
}

附加tryAppend()方法,不多说,都在代码注释里:

/**
 *  Try to append to a ProducerBatch.
 *
 *  If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 *  resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
 *  and memory records built) in one of the following cases (whichever comes first): right before send,
 *  if it is expired, or when the producer is closed.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
    // 获取最新加入的ProducerBatch
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            // 记录添加结果类包含future、batch是否已满的标记、是否是新batch创建的标记
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    // 如果这个Deque没有ProducerBatch元素,或者已经满了不足以加入本条消息则返回null
    return null;
}

以上代码见图解:

fee75e2a95915fb64311f7c1705c37b1.png


相关文章
|
29天前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
61 2
|
29天前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
73 0
|
9天前
|
存储 监控 Linux
Docker技术架构概述
【10月更文挑战第22天】Docker采用CS架构,Client与Daemon交互,Compose管理多容器应用。
|
30天前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
59 5
|
30天前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
58 4
|
19天前
|
前端开发 Unix Linux
KVM 架构概述
【10月更文挑战第12天】KVM是基于硬件辅助虚拟化技术的虚拟机监控器,核心依赖于CPU的虚拟化支持如Intel VT和AMD-V。
|
29天前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
38 0
|
29天前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
54 0
|
29天前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
37 0
|
29天前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
35 0

热门文章

最新文章