Kafka Producer Architecture Overview and Source Code Analysis (Part 2)

Summary: Kafka Producer architecture overview and source code analysis


Sender

The most important method in Sender is run(), and the core of run() is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

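"Read the comments on pollTimeout carefully. It is the maximum time the following poll may block while waiting for at least one channel to become ready for the events you registered. A return value of 0 means data is ready, so the Sender sends right away."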
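The pollTimeout rule at the end of sendProducerData can be isolated into a few lines. The hypothetical helper below uses plain strings for node IDs and assumes readyNodes has already had the not-ready nodes removed, as the loop over client.ready() does. In Kafka's own Selector a timeout of 0 leads to a non-blocking selectNow(), so returning 0 means "poll without waiting and loop again".

```java
import java.util.Set;

// Hypothetical helper mirroring the pollTimeout computation in Sender#sendProducerData.
final class PollTimeoutSketch {
    // nextReadyCheckDelayMs: from RecordAccumulator.ready(), time until a lingering or backing-off partition may become sendable
    // notReadyTimeout: smallest connection delay among nodes dropped because client.ready() was false
    // readyNodes: nodes that are both ready and have sendable data
    static long pollTimeout(long nextReadyCheckDelayMs, long notReadyTimeout, Set<String> readyNodes) {
        if (!readyNodes.isEmpty()) {
            // something can be sent now: do not block, loop immediately
            return 0L;
        }
        // otherwise block until the earliest moment something may change
        return Math.min(nextReadyCheckDelayMs, notReadyTimeout);
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(5, Long.MAX_VALUE, Set.of("node-1"))); // 0
        System.out.println(pollTimeout(5, Long.MAX_VALUE, Set.of()));         // 5
        System.out.println(pollTimeout(Long.MAX_VALUE, 30, Set.of()));        // 30
    }
}
```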

private long sendProducerData(long now) {
  // get the current cluster metadata
  Cluster cluster = metadata.fetch();
  // get the list of partitions with data ready to send
  // ReadyCheckResult has three fields:
  // 1. Set<Node> readyNodes: nodes that are ready to receive data
  // 2. long nextReadyCheckDelayMs: delay before the next readiness check
  // 3. Set<String> unknownLeaderTopics: topics whose partition leader could not be found
  RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  // if there are any partitions whose leaders are not known yet, force a metadata update
  if (!result.unknownLeaderTopics.isEmpty()) {
      // The set of topics with unknown leader contains topics with leader election pending as well as
      // topics which may have expired. Add the topic again to metadata to ensure it is included
      // and request metadata update, since there are messages to send to the topic.
      for (String topic : result.unknownLeaderTopics)
          this.metadata.add(topic);
      this.metadata.requestUpdate();
  }
  // remove any nodes we aren't ready to send to (client.ready() returns false)
  Iterator<Node> iter = result.readyNodes.iterator();
  long notReadyTimeout = Long.MAX_VALUE;
  while (iter.hasNext()) {
      Node node = iter.next();
      if (!this.client.ready(node, now)) {
          iter.remove();
          notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
      }
  }
  // drain the batches to send, grouped by node (create produce requests)
  Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
          this.maxRequestSize, now);
  // guarantee message order (max.in.flight.requests.per.connection = 1)
  if (guaranteeMessageOrder) {
      // Mute all the partitions drained
      for (List<ProducerBatch> batchList : batches.values()) {
          for (ProducerBatch batch : batchList)
              this.accumulator.mutePartition(batch.topicPartition);
      }
  }
  // batches that expired while waiting in the accumulator
  List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
  boolean needsTransactionStateReset = false;
  // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
  // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
  // we need to reset the producer id here.
  if (!expiredBatches.isEmpty())
      log.trace("Expired {} batches in accumulator", expiredBatches.size());
  for (ProducerBatch expiredBatch : expiredBatches) {
      failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
      if (transactionManager != null && expiredBatch.inRetry()) {
          needsTransactionStateReset = true;
      }
      this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
  }
  if (needsTransactionStateReset) {
      transactionManager.resetProducerId();
      return 0;
  }
  sensors.updateProduceRequestMetrics(batches);
  // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
  // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
  // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
  // with sendable data that aren't ready to send since they would cause busy looping.
  // pollTimeout is the amount of time client.poll() may block if there is nothing to do,
  // i.e. how long to wait for a channel to become ready; 0 means do not block (Kafka's Selector then uses selectNow())
  long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
  if (!result.readyNodes.isEmpty()) {
      log.trace("Nodes with data ready to send: {}", result.readyNodes);
      // if some partitions are already ready to be sent, the select time would be 0;
      // otherwise if some partition already has some data accumulated but not ready yet,
      // the select time will be the time difference between now and its linger expiry time;
      // otherwise the select time will be the time difference between now and the metadata expiry time;
      pollTimeout = 0;
  }
  // send the data
  // sendProduceRequests() converts each node's ProducerBatch list into a ProduceRequest and hands it to NetworkClient via client.send()
  sendProduceRequests(batches, now);
  return pollTimeout;
}
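When guaranteeMessageOrder is true, sendProducerData mutes every partition it just drained, and both ready() and drain() skip muted partitions. The hypothetical model below (partitions and batches are plain strings, not Kafka types) shows the effect: only one batch per partition is in flight at a time. In Kafka the partition is unmuted after the response for that batch has been handled; the onResponse method stands in for that step.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of partition muting when max.in.flight.requests.per.connection = 1.
final class MuteSketch {
    private final Set<String> muted = new HashSet<>();

    // drain step: skip muted partitions, and mute every partition a batch is taken from
    String drainOne(String partition, Deque<String> batches) {
        if (muted.contains(partition) || batches.isEmpty()) {
            return null; // nothing may be sent for this partition right now
        }
        muted.add(partition);
        return batches.pollFirst();
    }

    // response step: once the in-flight batch's response is handled, the next batch may go
    void onResponse(String partition) {
        muted.remove(partition);
    }

    public static void main(String[] args) {
        MuteSketch sketch = new MuteSketch();
        Deque<String> queue = new ArrayDeque<>();
        queue.add("batch-1");
        queue.add("batch-2");
        System.out.println(sketch.drainOne("orders-0", queue)); // batch-1
        System.out.println(sketch.drainOne("orders-0", queue)); // null (partition muted)
        sketch.onResponse("orders-0");
        System.out.println(sketch.drainOne("orders-0", queue)); // batch-2
    }
}
```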

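sendProduceRequests() in detail

sendProduceRequests() iterates over the node-to-batches map returned by drain() and calls sendProduceRequest() once per destination node, so each node receives one ProduceRequest: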

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
      if (batches.isEmpty())
          return;
      Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
      final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
      // find the minimum magic version used when creating the record sets
      byte minUsedMagic = apiVersions.maxUsableProduceMagic();
      for (ProducerBatch batch : batches) {
          if (batch.magic() < minUsedMagic)
              minUsedMagic = batch.magic();
      }
      for (ProducerBatch batch : batches) {
          TopicPartition tp = batch.topicPartition;
          MemoryRecords records = batch.records();
          // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
          // that the producer starts building the batch and the time that we send the request, and we may have
          // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
          // the new message format, but found that the broker didn't support it, so we need to down-convert on the
          // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
          // not all support the same message format version. For example, if a partition migrates from a broker
          // which is supporting the new magic version to one which doesn't, then we will need to convert.
          if (!records.hasMatchingMagic(minUsedMagic))
              records = batch.records().downConvert(minUsedMagic, 0, time).records();
          produceRecordsByPartition.put(tp, records);
          recordsByPartition.put(tp, batch);
      }
      String transactionalId = null;
      if (transactionManager != null && transactionManager.isTransactional()) {
          transactionalId = transactionManager.transactionalId();
      }
      // build the ProduceRequest from the ProducerBatch records
      ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
              produceRecordsByPartition, transactionalId);
      RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };
      String nodeId = Integer.toString(destination);
      // wrap the ProduceRequest in a ClientRequest
      ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
              requestTimeoutMs, callback);
      // hand the request to NetworkClient; the bytes are written to the socket during the following client.poll()
      client.send(clientRequest, now);
      log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
  }
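The per-node grouping and the minimum-magic rule can be modeled without Kafka classes. In this hypothetical sketch Batch and Request are illustrative stand-ins, record data and down-conversion are omitted, and maxUsableMagic plays the role of apiVersions.maxUsableProduceMagic(). It shows that each node yields one request, empty batch lists are skipped, and the request uses the lowest magic found among its batches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of Sender#sendProduceRequests / #sendProduceRequest.
final class RequestSketch {
    static final class Batch {
        final String topicPartition;
        final byte magic; // message format version the batch was built with
        Batch(String topicPartition, byte magic) {
            this.topicPartition = topicPartition;
            this.magic = magic;
        }
    }

    static final class Request {
        final int destination;
        final byte magic;
        final Map<String, Batch> batchesByPartition;
        Request(int destination, byte magic, Map<String, Batch> batchesByPartition) {
            this.destination = destination;
            this.magic = magic;
            this.batchesByPartition = batchesByPartition;
        }
    }

    // one request per node in the map returned by drain()
    static List<Request> buildAll(Map<Integer, List<Batch>> collated, byte maxUsableMagic) {
        List<Request> requests = new ArrayList<>();
        for (Map.Entry<Integer, List<Batch>> entry : collated.entrySet()) {
            List<Batch> batches = entry.getValue();
            if (batches.isEmpty()) continue; // sendProduceRequest returns early for an empty list
            byte minUsedMagic = maxUsableMagic;
            for (Batch b : batches) {
                if (b.magic < minUsedMagic) minUsedMagic = b.magic;
            }
            Map<String, Batch> byPartition = new HashMap<>();
            for (Batch b : batches) {
                // the real code down-converts the records here when they don't match minUsedMagic
                byPartition.put(b.topicPartition, b);
            }
            requests.add(new Request(entry.getKey(), minUsedMagic, byPartition));
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<Integer, List<Batch>> collated = new HashMap<>();
        collated.put(1, List.of(new Batch("orders-0", (byte) 2), new Batch("orders-1", (byte) 1)));
        collated.put(2, List.of()); // drain() may return an empty list for a node
        List<Request> requests = buildAll(collated, (byte) 2);
        System.out.println(requests.size());       // 1
        System.out.println(requests.get(0).magic); // 1
    }
}
```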

You also need to understand org.apache.kafka.clients.producer.internals.RecordAccumulator#ready. The three key fields of the class it returns are explained in the comments.

/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
* <p>
* A destination node is ready to send data if:
* <ol>
* <li>There is at least one partition that is not backing off its send
* <li><b>and</b> those partitions are not muted (to prevent reordering if
*   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
*   is set to one)</li>
* <li><b>and <i>any</i></b> of the following are true</li>
* <ul>
*     <li>The record set is full</li>
*     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
*     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
*     are immediately considered ready).</li>
*     <li>The accumulator has been closed</li>
* </ul>
* </ol>
*/
/**
* @return ReadyCheckResult类的三个变量解释
* 1.Set<Node> readyNodes 准备好发送的节点
* 2.long nextReadyCheckDelayMs 下次检查节点的延迟时间
* 3.Set<String> unknownLeaderTopics 哪些topic找不到leader节点
*
* 一个节点满足以下任一条件则表示可以发送数据
* 1.batch满了
* 2.batch没满,但是等了lingerMs的时间
* 3.accumulator满了
* 4.accumulator关了
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
  Set<Node> readyNodes = new HashSet<>();
  long nextReadyCheckDelayMs = Long.MAX_VALUE;
  Set<String> unknownLeaderTopics = new HashSet<>();
  boolean exhausted = this.free.queued() > 0;
  for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
      TopicPartition part = entry.getKey();
      Deque<ProducerBatch> deque = entry.getValue();
      Node leader = cluster.leaderFor(part);
      synchronized (deque) {
          // no known leader but the deque is not empty: record the topic in unknownLeaderTopics
          if (leader == null && !deque.isEmpty()) {
              // This is a partition for which leader is not known, but messages are available to send.
              // Note that entries are currently not removed from batches when deque is empty.
              unknownLeaderTopics.add(part.topic());
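              // Otherwise, if readyNodes does not contain the leader yet and the partition is not muted (next branch):
              // muting is tied to the producer config max.in.flight.requests.per.connection=1.
              // That config limits the number of unacknowledged requests on a single broker connection and mainly prevents
              // reordering within a partition. When it is 1, the producer cannot send another PRODUCE request on that
              // connection until the response arrives, and the drained partitions stay muted in the meantime.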
          } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
              ProducerBatch batch = deque.peekFirst();
              if (batch != null) {
                  long waitedTimeMs = batch.waitedTimeMs(nowMs);
                  boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                  // time to wait: retryBackoffMs while backing off after a failed attempt, otherwise lingerMs
                  long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                  // full: the first batch is full, or more batches are already queued behind it
                  boolean full = deque.size() > 1 || batch.isFull();
                  // "expired" here means the batch has waited at least timeToWaitMs (linger or backoff elapsed)
                  boolean expired = waitedTimeMs >= timeToWaitMs;
                  boolean sendable = full || expired || exhausted || closed || flushInProgress();
                  if (sendable && !backingOff) {
                      readyNodes.add(leader);
                  } else {
                      long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                      // Note that this results in a conservative estimate since an un-sendable partition may have
                      // a leader that will later be found to have sendable data. However, this is good enough
                      // since we'll just wake up and then sleep again for the remaining time.
                      // not sendable yet: schedule the next check for when this partition may become sendable
                      nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                  }
              }
          }
      }
  }
  return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
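The per-partition decision inside ready() depends only on a handful of values, so it can be restated as a pure function. In this hypothetical sketch every input is passed in directly. In Kafka those values come from the first ProducerBatch, the deque, and the accumulator state. A "not ready" result carries the time left, which ready() folds into nextReadyCheckDelayMs with Math.min.

```java
// Hypothetical, self-contained restatement of the per-partition check in RecordAccumulator.ready().
final class ReadySketch {
    static final class Decision {
        final boolean ready;   // true: the leader would be added to readyNodes
        final long timeLeftMs; // when not ready: candidate for nextReadyCheckDelayMs
        Decision(boolean ready, long timeLeftMs) {
            this.ready = ready;
            this.timeLeftMs = timeLeftMs;
        }
    }

    static Decision check(int attempts, long waitedTimeMs, int dequeSize, boolean batchIsFull,
                          boolean exhausted, boolean closed, boolean flushInProgress,
                          long lingerMs, long retryBackoffMs) {
        // a retried batch must wait retryBackoffMs before it can be sent again
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        // more than one batch in the deque means the first one is already full
        boolean full = dequeSize > 1 || batchIsFull;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress;
        if (sendable && !backingOff) {
            return new Decision(true, 0L);
        }
        return new Decision(false, Math.max(timeToWaitMs - waitedTimeMs, 0L));
    }

    public static void main(String[] args) {
        // lingerMs = 10, retryBackoffMs = 100
        System.out.println(check(0, 3, 1, false, false, false, false, 10, 100).timeLeftMs); // 7
        System.out.println(check(0, 12, 1, false, false, false, false, 10, 100).ready);     // true
        System.out.println(check(0, 0, 1, true, false, false, false, 10, 100).ready);       // true
        System.out.println(check(1, 50, 2, true, false, false, false, 10, 100).ready);      // false
    }
}
```

The last call shows why `sendable && !backingOff` is needed: a full batch that is being retried still waits out its backoff.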

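Another method worth reading is org.apache.kafka.clients.producer.internals.RecordAccumulator#drain. It takes the data to send out of the accumulator buffer, at most max.request.size bytes per node at a time (see the producer configuration parameters listed earlier):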

/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
* @param cluster The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
* maxSize comes from the producer config max.request.size, i.e. the most data sent in one request
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
  if (nodes.isEmpty())
      return Collections.emptyMap();
  Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
  for (Node node : nodes) {
      // collect the batches to send for each node
      List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
      batches.put(node.id(), ready);
  }
  return batches;
}
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
  int size = 0;
  // the partitions whose leader is this node
  List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
  List<ProducerBatch> ready = new ArrayList<>();
  /* to make starvation less likely this loop doesn't start at 0 */
  // don't always start draining from the same partition; spread the work across partitions
  int start = drainIndex = drainIndex % parts.size();
  do {
      PartitionInfo part = parts.get(drainIndex);
      TopicPartition tp = new TopicPartition(part.topic(), part.partition());
      this.drainIndex = (this.drainIndex + 1) % parts.size();
      // Only proceed if the partition has no in-flight batches.
      if (isMuted(tp, now))
          continue;
      Deque<ProducerBatch> deque = getDeque(tp);
      if (deque == null)
          continue;
      // lock the deque, since the threads appending records share it
      synchronized (deque) {
          // invariant: !isMuted(tp,now) && deque != null
          ProducerBatch first = deque.peekFirst();
          if (first == null)
              continue;
          // first != null
          // check whether the batch is still in its retry backoff period
          boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
          // Only drain the batch if it is not during backoff period.
          if (backoff)
              continue;
          // adding this batch would exceed maxSize and ready already holds at least one batch
          if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
              // there is a rare case that a single batch size is larger than the request size due to
              // compression; in this case we will still eventually send this batch in a single request
              // Special case: a single batch is larger than maxSize. While ready is still empty the condition above
              // is false, so such a batch is still drained and eventually sent alone in one request.
              break;
          } else {
              if (shouldStopDrainBatchesForPartition(first, tp))
                  break;
              boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
              ProducerIdAndEpoch producerIdAndEpoch =
                  transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
              ProducerBatch batch = deque.pollFirst();
              if (producerIdAndEpoch != null && !batch.hasSequence()) {
                  // If the batch already has an assigned sequence, then we should not change the producer id and
                  // sequence number, since this may introduce duplicates. In particular, the previous attempt
                  // may actually have been accepted, and if we change the producer id and sequence here, this
                  // attempt will also be accepted, causing a duplicate.
                  //
                  // Additionally, we update the next sequence number bound for the partition, and also have
                  // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                  // even if we receive out of order responses.
                  batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                  transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                  log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                          "{} being sent to partition {}", producerIdAndEpoch.producerId,
                      producerIdAndEpoch.epoch, batch.baseSequence(), tp);
                  transactionManager.addInFlightBatch(batch);
              }
              // close the batch to further appends, then add it to ready
              batch.close();
              size += batch.records().sizeInBytes();
              ready.add(batch);
              batch.drained(now);
          }
      }
  } while (start != drainIndex);
  return ready;
}
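The round-robin and size-limit logic of drainBatchesForOneNode can be modeled without Kafka classes. In this hypothetical sketch each partition is reduced to the size of its first batch (-1 when its deque is empty), and muting, backoff, shouldStopDrainBatchesForPartition and the transaction handling are left out. As in the original, drainIndex survives across calls and advances before the size check, so successive calls start at different partitions, and an oversized batch is still taken when nothing has been drained yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the round-robin drain loop for one node.
final class DrainSketch {
    private int drainIndex = 0; // persists across calls, like RecordAccumulator.drainIndex

    // batchSizes.get(i): estimated size of partition i's first batch, or -1 if its deque is empty
    // returns the indexes of the partitions drained in this call
    List<Integer> drain(List<Integer> batchSizes, int maxSize) {
        List<Integer> ready = new ArrayList<>();
        if (batchSizes.isEmpty()) return ready;
        int size = 0;
        int start = drainIndex = drainIndex % batchSizes.size();
        do {
            int i = drainIndex;
            drainIndex = (drainIndex + 1) % batchSizes.size();
            int batchSize = batchSizes.get(i);
            if (batchSize < 0) continue; // empty deque: jumps to the loop condition
            if (size + batchSize > maxSize && !ready.isEmpty()) break; // request is full
            // when ready is empty an oversized batch is still taken and goes out alone
            size += batchSize;
            ready.add(i);
        } while (start != drainIndex);
        return ready;
    }

    public static void main(String[] args) {
        DrainSketch d = new DrainSketch();
        List<Integer> sizes = List.of(100, 100, 100);
        System.out.println(d.drain(sizes, 150)); // [0]
        System.out.println(d.drain(sizes, 150)); // [2]
        System.out.println(d.drain(sizes, 150)); // [1]
        System.out.println(new DrainSketch().drain(List.of(500, 100), 150)); // [0]
    }
}
```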

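Summary

Taken together, the methods above do the following:

  • read ProducerBatch objects from the RecordAccumulator, determine the list of ready nodes, and group the ProducerBatch objects by node;
  • convert each node's ProducerBatch objects into a ProduceRequest, and then into a ClientRequest;
  • call NetworkClient's send method to send the messages.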