Kafka Producer Overall Architecture and Source Code Analysis (Part 2)

Summary: An overview of the Kafka Producer architecture with source code analysis.


Sender

The most important method in Sender is run(), and the core of that loop is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

「Read the comments on pollTimeout carefully: it is the maximum time to block waiting until at least one channel becomes ready for the events it registered for. If it returns 0, we can go ahead and send right away.」
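For context, here is a simplified sketch of how the Sender event loop consumes that return value (based on the Kafka 1.x/2.x Sender, with transaction-manager handling omitted; not the full implementation):

// Simplified sketch of one iteration of the Sender I/O loop
void run(long now) {
    // Build per-node produce requests and hand them to the NetworkClient;
    // the return value is how long the following poll() is allowed to block.
    long pollTimeout = sendProducerData(now);
    // Do the actual socket I/O. A pollTimeout of 0 means "there is sendable data,
    // return immediately so the next loop iteration can drain more batches".
    client.poll(pollTimeout, now);
}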

private long sendProducerData(long now) {
  // Fetch the current cluster metadata
  Cluster cluster = metadata.fetch();
  // get the list of partitions with data ready to send
  // The returned ReadyCheckResult carries three fields:
  // 1. Set<Node> readyNodes            the nodes that are ready to be sent to
  // 2. long nextReadyCheckDelayMs      the delay before the next readiness check
  // 3. Set<String> unknownLeaderTopics the topics whose leader is still unknown
  RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  // if there are any partitions whose leaders are not known yet, force metadata update
  // If the leader of some topics is unknown, request a metadata update
  if (!result.unknownLeaderTopics.isEmpty()) {
      // The set of topics with unknown leader contains topics with leader election pending as well as
      // topics which may have expired. Add the topic again to metadata to ensure it is included
      // and request metadata update, since there are messages to send to the topic.
      for (String topic : result.unknownLeaderTopics)
          this.metadata.add(topic);
      this.metadata.requestUpdate();
  }
  // Remove nodes that we are not ready to send to yet
  // remove any nodes we aren't ready to send to
  Iterator<Node> iter = result.readyNodes.iterator();
  long notReadyTimeout = Long.MAX_VALUE;
  while (iter.hasNext()) {
      Node node = iter.next();
      if (!this.client.ready(node, now)) {
          iter.remove();
          notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
      }
  }
  // Collect the batches that are about to be sent
  // create produce requests
  Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
          this.maxRequestSize, now);
  // Preserve message ordering if required
  if (guaranteeMessageOrder) {
      // Mute all the partitions drained
      for (List<ProducerBatch> batchList : batches.values()) {
          for (ProducerBatch batch : batchList)
              this.accumulator.mutePartition(batch.topicPartition);
      }
  }
  // Batches that have expired in the accumulator
  List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
  boolean needsTransactionStateReset = false;
  // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
  // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
  // we need to reset the producer id here.
  if (!expiredBatches.isEmpty())
      log.trace("Expired {} batches in accumulator", expiredBatches.size());
  for (ProducerBatch expiredBatch : expiredBatches) {
      failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
      if (transactionManager != null && expiredBatch.inRetry()) {
          needsTransactionStateReset = true;
      }
      this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
  }
  if (needsTransactionStateReset) {
      transactionManager.resetProducerId();
      return 0;
  }
  sensors.updateProduceRequestMetrics(batches);
  // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
  // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
  // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
  // with sendable data that aren't ready to send since they would cause busy looping.
  // pollTimeout: the maximum time the subsequent poll() may block waiting for a channel
  // to become ready; 0 means poll without blocking so we can immediately send more data
  long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
  if (!result.readyNodes.isEmpty()) {
      log.trace("Nodes with data ready to send: {}", result.readyNodes);
      // if some partitions are already ready to be sent, the select time would be 0;
      // otherwise if some partition already has some data accumulated but not ready yet,
      // the select time will be the time difference between now and its linger expiry time;
      // otherwise the select time will be the time difference between now and the metadata expiry time;
      pollTimeout = 0;
  }
  // Send the messages: each node's ProducerBatch list is converted into a ProduceRequest
  // and handed to the NetworkClient via client.send(), which writes it out to the network
  sendProduceRequests(batches, now);
  return pollTimeout;
}
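The guaranteeMessageOrder flag used above is not set directly; in the KafkaProducer constructor it is derived from max.in.flight.requests.per.connection being 1, which is why the drained partitions are muted until their responses come back. A minimal configuration sketch (the value shown is illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderingConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // With at most one unacknowledged request per broker connection, a retried
        // batch cannot be overtaken by a later one, so per-partition order is kept.
        // This is the setting that turns guaranteeMessageOrder on inside the producer.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        System.out.println(props);
    }
}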

sendProduceRequests() in detail
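sendProduceRequests() itself only iterates over the node-to-batches map produced by drain() and calls sendProduceRequest() once per destination node, roughly as follows (a sketch following the source walked through here):

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    // One ProduceRequest per destination broker, carrying all of that broker's batches
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

The per-node method it delegates to is: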

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
      if (batches.isEmpty())
          return;
      Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
      final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
      // find the minimum magic version used when creating the record sets
      byte minUsedMagic = apiVersions.maxUsableProduceMagic();
      for (ProducerBatch batch : batches) {
          if (batch.magic() < minUsedMagic)
              minUsedMagic = batch.magic();
      }
      for (ProducerBatch batch : batches) {
          TopicPartition tp = batch.topicPartition;
          MemoryRecords records = batch.records();
          // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
          // that the producer starts building the batch and the time that we send the request, and we may have
          // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
          // the new message format, but found that the broker didn't support it, so we need to down-convert on the
          // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
          // not all support the same message format version. For example, if a partition migrates from a broker
          // which is supporting the new magic version to one which doesn't, then we will need to convert.
          if (!records.hasMatchingMagic(minUsedMagic))
              records = batch.records().downConvert(minUsedMagic, 0, time).records();
          produceRecordsByPartition.put(tp, records);
          recordsByPartition.put(tp, batch);
      }
      String transactionalId = null;
      if (transactionManager != null && transactionManager.isTransactional()) {
          transactionalId = transactionManager.transactionalId();
      }
      // Convert the ProducerBatch data into a ProduceRequest
      ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
              produceRecordsByPartition, transactionalId);
      RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };
      String nodeId = Integer.toString(destination);
      // Wrap the ProduceRequest into a ClientRequest
      ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
              requestTimeoutMs, callback);
      // Hand the request to the NetworkClient, which writes it out to the network
      client.send(clientRequest, now);
      log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
  }

You also need to understand org.apache.kafka.clients.producer.internals.RecordAccumulator#ready. The three key fields of the class it returns are explained in the comments below.

/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
* <p>
* A destination node is ready to send data if:
* <ol>
* <li>There is at least one partition that is not backing off its send
* <li><b>and</b> those partitions are not muted (to prevent reordering if
*   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
*   is set to one)</li>
* <li><b>and <i>any</i></b> of the following are true</li>
* <ul>
*     <li>The record set is full</li>
*     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
*     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
*     are immediately considered ready).</li>
*     <li>The accumulator has been closed</li>
* </ul>
* </ol>
*/
/**
* The returned ReadyCheckResult carries three fields:
* 1. Set<Node> readyNodes            the nodes that are ready to be sent to
* 2. long nextReadyCheckDelayMs      the delay before the next readiness check
* 3. Set<String> unknownLeaderTopics the topics whose leader is still unknown
*
* A partition's leader node is considered ready to receive data if any of the following holds:
* 1. the batch is full
* 2. the batch is not full, but has already waited for lingerMs
* 3. the accumulator has run out of memory
* 4. the accumulator has been closed
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
  Set<Node> readyNodes = new HashSet<>();
  long nextReadyCheckDelayMs = Long.MAX_VALUE;
  Set<String> unknownLeaderTopics = new HashSet<>();
  boolean exhausted = this.free.queued() > 0;
  for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
      TopicPartition part = entry.getKey();
      Deque<ProducerBatch> deque = entry.getValue();
      Node leader = cluster.leaderFor(part);
      synchronized (deque) {
          // No known leader but the deque is non-empty: record the topic in unknownLeaderTopics
          if (leader == null && !deque.isEmpty()) {
              // This is a partition for which leader is not known, but messages are available to send.
              // Note that entries are currently not removed from batches when deque is empty.
              unknownLeaderTopics.add(part.topic());
              // Otherwise, only look at this partition if its leader is not already in readyNodes
              // and the partition is not muted. Muting relates to the producer setting
              // max.in.flight.requests.per.connection=1, which limits the number of unacknowledged
              // requests the producer may have on a single broker connection to avoid reordering
              // within a partition: with a value of 1, the producer will not send another PRODUCE
              // request for this partition to that broker until the previous response has arrived.
          } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
              ProducerBatch batch = deque.peekFirst();
              if (batch != null) {
                  long waitedTimeMs = batch.waitedTimeMs(nowMs);
                  boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                  // How long this batch has to wait before it may be sent
                  long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                  // The batch is full (or more than one batch is queued)
                  boolean full = deque.size() > 1 || batch.isFull();
                  // The batch has waited long enough (linger time or retry backoff has elapsed)
                  boolean expired = waitedTimeMs >= timeToWaitMs;
                  boolean sendable = full || expired || exhausted || closed || flushInProgress();
                  if (sendable && !backingOff) {
                      readyNodes.add(leader);
                  } else {
                      long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                      // Note that this results in a conservative estimate since an un-sendable partition may have
                      // a leader that will later be found to have sendable data. However, this is good enough
                      // since we'll just wake up and then sleep again for the remaining time.
                      // Not sendable yet; record the shortest remaining wait for the next ready check
                      nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                  }
              }
          }
      }
  }
  return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
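For reference, the ReadyCheckResult returned above is a plain holder for the three values described in the comments; a sketch of its shape (field names as in RecordAccumulator):

public final static class ReadyCheckResult {
    public final Set<Node> readyNodes;            // nodes with at least one sendable partition
    public final long nextReadyCheckDelayMs;      // how long the Sender may wait before checking again
    public final Set<String> unknownLeaderTopics; // topics that need a metadata refresh

    public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, Set<String> unknownLeaderTopics) {
        this.readyNodes = readyNodes;
        this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
        this.unknownLeaderTopics = unknownLeaderTopics;
    }
}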

Another method worth knowing is org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which pulls the data to be sent out of the accumulator buffer, sending at most max.request.size bytes in one request (one of the configuration parameters listed at the top):
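The maxSize argument is controlled by the producer setting max.request.size; a minimal configuration sketch (the value shown is illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class RequestSizeConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Upper bound on the size of a single produce request (and thus of a single drain per node)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576");
        System.out.println(props);
    }
}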

/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
* @param cluster The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
* maxSize is controlled by the producer configuration max.request.size, i.e. how much can be sent at most in one request
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
  if (nodes.isEmpty())
      return Collections.emptyMap();
  Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
  for (Node node : nodes) {
      // Collect the batches to drain for this node
      List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
      batches.put(node.id(), ready);
  }
  return batches;
}
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
  int size = 0;
  // The partitions whose leader lives on this node
  List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
  List<ProducerBatch> ready = new ArrayList<>();
  /* to make starvation less likely this loop doesn't start at 0 */
  // Rotate the starting index so we don't always drain from the same partition first
  int start = drainIndex = drainIndex % parts.size();
  do {
      PartitionInfo part = parts.get(drainIndex);
      TopicPartition tp = new TopicPartition(part.topic(), part.partition());
      this.drainIndex = (this.drainIndex + 1) % parts.size();
      // Only proceed if the partition has no in-flight batches.
      if (isMuted(tp, now))
          continue;
      Deque<ProducerBatch> deque = getDeque(tp);
      if (deque == null)
          continue;
      // Lock the deque while we inspect and poll batches from it
      synchronized (deque) {
          // invariant: !isMuted(tp,now) && deque != null
          ProducerBatch first = deque.peekFirst();
          if (first == null)
              continue;
          // first != null
          // Check whether the batch is still in its retry backoff period
          boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
          // Only drain the batch if it is not during backoff period.
          if (backoff)
              continue;
          // Adding this batch would exceed maxSize and we already have something to send
          if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
              // there is a rare case that a single batch size is larger than the request size due to
              // compression; in this case we will still eventually send this batch in a single request
              // Rare special case: a single batch can be larger than maxSize (for example because
              // of compression estimates). If ready is still empty, the size check above does not
              // trigger, so that oversized batch is still sent on its own in a single request.
              break;
          } else {
              if (shouldStopDrainBatchesForPartition(first, tp))
                  break;
              boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
              ProducerIdAndEpoch producerIdAndEpoch =
                  transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
              ProducerBatch batch = deque.pollFirst();
              if (producerIdAndEpoch != null && !batch.hasSequence()) {
                  // If the batch already has an assigned sequence, then we should not change the producer id and
                  // sequence number, since this may introduce duplicates. In particular, the previous attempt
                  // may actually have been accepted, and if we change the producer id and sequence here, this
                  // attempt will also be accepted, causing a duplicate.
                  //
                  // Additionally, we update the next sequence number bound for the partition, and also have
                  // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                  // even if we receive out of order responses.
                  batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                  transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                  log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                          "{} being sent to partition {}", producerIdAndEpoch.producerId,
                      producerIdAndEpoch.epoch, batch.baseSequence(), tp);
                  transactionManager.addInFlightBatch(batch);
              }
              // Close the batch (no further appends) and add it to the ready list
              batch.close();
              size += batch.records().sizeInBytes();
              ready.add(batch);
              batch.drained(now);
          }
      }
  } while (start != drainIndex);
  return ready;
}

Summary

The methods above mainly do the following:

  • Read ProducerBatches from the RecordAccumulator, determine the list of ready nodes, and group the batches by destination node;
  • Convert the ProducerBatches into a ProduceRequest, and then wrap it into a ClientRequest;
  • Call NetworkClient.send() to write the messages out to the network;
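For completeness, a minimal usage sketch that exercises the whole path described above, from accumulator to Sender to NetworkClient (bootstrap address and topic name are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerPathDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends to the RecordAccumulator; the Sender thread drains the batch,
            // builds the ProduceRequest and hands it to the NetworkClient as described above.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
            });
        } // close() flushes whatever is still sitting in the accumulator
    }
}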