Sender
The most important method in Sender is run(); at its core sits org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

Read the comments on pollTimeout carefully: it is the longest time poll() may block, waiting until at least one channel you registered becomes ready for its events. A return value of 0 means there is data ready to go, so send immediately.
```java
private long sendProducerData(long now) {
    // Fetch the current cluster metadata
    Cluster cluster = metadata.fetch();

    // get the list of partitions with data ready to send
    // The returned ReadyCheckResult carries three fields:
    //   1. Set<Node> readyNodes: nodes that are ready to be sent to
    //   2. long nextReadyCheckDelayMs: delay before the next readiness check
    //   3. Set<String> unknownLeaderTopics: topics whose leader is unknown
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests: collect the batches to send, keyed by node id
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    // Guarantee message ordering if required
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // Expired batches
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    boolean needsTransactionStateReset = false;
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
        if (transactionManager != null && expiredBatch.inRetry()) {
            needsTransactionStateReset = true;
        }
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    }

    if (needsTransactionStateReset) {
        transactionManager.resetProducerId();
        return 0;
    }

    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    // pollTimeout: the amount of time to block if there is nothing to do,
    // waiting for a channel to become ready; if zero, block indefinitely
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    // Send the messages: sendProduceRequests() turns each ProducerBatch into a ProduceRequest
    // and hands it to NetworkClient to be written out over the network
    sendProduceRequests(batches, now);
    return pollTimeout;
}
```
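For context, the returned pollTimeout feeds straight back into the Sender's event loop. In the client version this walkthrough is based on, run(long now) looks roughly like the following sketch (transaction-manager bookkeeping omitted):

```java
void run(long now) {
    // ... transactionManager handling omitted ...
    long pollTimeout = sendProducerData(now);
    // Block for at most pollTimeout ms waiting for socket events;
    // 0 means data was ready, so loop around immediately and send more
    client.poll(pollTimeout, now);
}
```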
A closer look at sendProduceRequests()
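Note the singular/plural distinction: sendProduceRequests() is only a thin dispatcher that calls the per-node sendProduceRequest() shown below once per destination. Roughly (a sketch; the timeout field name varies slightly across client releases):

```java
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    // One ProduceRequest per destination node
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
```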
```java
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    // Convert the ProducerBatches into a ProduceRequest
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // Wrap the ProduceRequest in a ClientRequest
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // Hand the request to NetworkClient, which writes it out over the network
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
```
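To connect this back to the public API: everything above is triggered by an ordinary producer.send() call, which only appends to the RecordAccumulator while the Sender thread does the actual network I/O. A minimal usage sketch (the broker address and topic name are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // acks != 0, so the ClientRequest expects a response

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; the callback fires when handleProduceResponse completes the batch
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
            });
        }
    }
}
```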
It is also worth understanding org.apache.kafka.clients.producer.internals.RecordAccumulator#ready. The three key fields of the ReadyCheckResult it returns are explained in the comments below.
```java
/**
 * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
 * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
 * partition batches.
 * <p>
 * A destination node is ready to send data if:
 * <ol>
 * <li>There is at least one partition that is not backing off its send
 * <li><b>and</b> those partitions are not muted (to prevent reordering if
 *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
 *   is set to one)</li>
 * <li><b>and <i>any</i></b> of the following are true</li>
 * <ul>
 *     <li>The record set is full</li>
 *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
 *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
 *     are immediately considered ready).</li>
 *     <li>The accumulator has been closed</li>
 * </ul>
 * </ol>
 */
// The returned ReadyCheckResult carries three fields:
//   1. Set<Node> readyNodes: nodes that are ready to be sent to
//   2. long nextReadyCheckDelayMs: delay before the next readiness check
//   3. Set<String> unknownLeaderTopics: topics whose leader is unknown
//
// A node is considered sendable when any of the following holds:
//   1. a batch is full
//   2. a batch is not full but has waited at least lingerMs
//   3. the accumulator has run out of memory (threads are blocked waiting)
//   4. the accumulator has been closed
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    // true if threads are blocked waiting for BufferPool memory
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            // If the leader is unknown but the deque has data, record the topic in unknownLeaderTopics
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            // Only check partitions whose leader is not already in readyNodes and that are not muted.
            // Muting relates to the producer config max.in.flight.requests.per.connection=1, which
            // prevents reordering within a topic partition by capping the number of unacknowledged
            // requests the producer may have on a single broker connection. With a value of 1, the
            // producer cannot send another PRODUCE request to that broker until the response arrives.
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    // How long this batch still has to wait
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // The batch is full (or a newer batch has been started behind it)
                    boolean full = deque.size() > 1 || batch.isFull();
                    // The batch has waited long enough
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        // This partition is not sendable yet; check it again after timeLeftMs
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
```
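The readiness conditions above are driven directly by producer configuration. A minimal sketch of the relevant knobs (the values are illustrative, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ReadyTuning {
    static Properties producerProps() {
        Properties props = new Properties();
        // A batch becomes sendable once it is full (batch.size) or has waited at least linger.ms
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // A batch that failed is held back for retry.backoff.ms before it is considered ready again
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        // Setting this to 1 enables the mute/guaranteeMessageOrder path seen in ready() and drain()
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return props;
    }
}
```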
There is one more method to study: org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which pulls the data to be sent out of the accumulator buffer, at most max.request.size bytes per node in one pass (this parameter appears in the configuration list at the top):
```java
/**
 * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
 * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
 *
 * @param cluster The current cluster metadata
 * @param nodes The list of node to drain
 * @param maxSize The maximum number of bytes to drain
 *                (controlled by the producer config max.request.size: the most that can go in one request)
 * @param now The current unix time in milliseconds
 * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
 */
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        // Collect the batches to send for each node
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    // All partitions hosted on this node
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
    List<ProducerBatch> ready = new ArrayList<>();
    /* to make starvation less likely this loop doesn't start at 0 */
    // Rotate the starting partition so we don't always drain from the same one
    int start = drainIndex = drainIndex % parts.size();
    do {
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        this.drainIndex = (this.drainIndex + 1) % parts.size();

        // Only proceed if the partition has no in-flight batches.
        if (isMuted(tp, now))
            continue;

        Deque<ProducerBatch> deque = getDeque(tp);
        if (deque == null)
            continue;

        // The deque is shared with the application threads appending records, so lock it
        synchronized (deque) {
            // invariant: !isMuted(tp,now) && deque != null
            ProducerBatch first = deque.peekFirst();
            if (first == null)
                continue;

            // first != null
            // Check whether this batch is still within its retry backoff period
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
            // Only drain the batch if it is not during backoff period.
            if (backoff)
                continue;

            // Stop once adding this batch would exceed maxSize, provided we already have something to send
            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                // there is a rare case that a single batch size is larger than the request size due to
                // compression; in this case we will still eventually send this batch in a single request
                // (i.e. when ready is empty, a batch bigger than maxSize is still sent on its own)
                break;
            } else {
                if (shouldStopDrainBatchesForPartition(first, tp))
                    break;

                boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
                ProducerIdAndEpoch producerIdAndEpoch =
                    transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                ProducerBatch batch = deque.pollFirst();
                if (producerIdAndEpoch != null && !batch.hasSequence()) {
                    // If the batch already has an assigned sequence, then we should not change the producer id and
                    // sequence number, since this may introduce duplicates. In particular, the previous attempt
                    // may actually have been accepted, and if we change the producer id and sequence here, this
                    // attempt will also be accepted, causing a duplicate.
                    //
                    // Additionally, we update the next sequence number bound for the partition, and also have
                    // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                    // even if we receive out of order responses.
                    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
                        producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                    transactionManager.addInFlightBatch(batch);
                }
                // Close the batch and add it to the ready list
                batch.close();
                size += batch.records().sizeInBytes();
                ready.add(batch);
                batch.drained(now);
            }
        }
    } while (start != drainIndex);
    return ready;
}
```
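The `start != drainIndex` loop implements a simple round-robin over the node's partitions: because drainIndex persists across calls, a pass that stops early (e.g. on hitting maxSize) resumes at the next partition rather than restarting at the first one. A standalone sketch of the same rotation pattern (the names here are illustrative, not the actual Kafka fields):

```java
import java.util.Arrays;
import java.util.List;

public class RoundRobinDrainDemo {
    // Persisted across calls, like RecordAccumulator.drainIndex
    private static int drainIndex = 0;

    // Visit at most maxItems partitions, starting where the last pass left off
    static void drainUpTo(List<String> parts, int maxItems) {
        int taken = 0;
        int start = drainIndex = drainIndex % parts.size();
        do {
            if (taken == maxItems)
                break; // like hitting maxSize: drainIndex keeps its position for the next pass
            System.out.print(parts.get(drainIndex) + " ");
            taken++;
            drainIndex = (drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        System.out.println();
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList("p0", "p1", "p2");
        drainUpTo(parts, 2); // prints: p0 p1
        drainUpTo(parts, 2); // prints: p2 p0 -- p2 is not starved by the early stop above
    }
}
```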
Summary
The methods above do the following:

- Read ProducerBatches from the RecordAccumulator, obtain the list of ready nodes, and associate each batch with its destination node;
- Convert the ProducerBatches into a ProduceRequest, which is then wrapped into a ClientRequest;
- Call NetworkClient's send method to write the request out over the network.