- 整个 Kafka 客户端由两个线程协调运行,即Main线程和Sender线程。
- 在Main线程中由KafkaProducer创建消息,然后通过Interceptor、Serializer和Partitioner之后缓存到RecordAccumulator(消息累加器)中。
- Sender线程 负责从RecordAccumulator中获取消息并发送到Kafka中。
- RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
- RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory进行配置,默认值是32MB。如果生产者发送消息的速度超过了发送到客户端的速度,则会导致生产者空间不足,此时KafkaProducer send()方法的调用要么会被阻塞,要么抛出异常。
- KafkaProducer发送消息的速度可以有参数max.block.ms进行配置,此参数默认值为60秒。
- Main线程发送过来的消息会被追加到RecordAccumulator的Deque(双端队列)中,在RecordAccumulator的内部每个Partition都维护了一个Deque,Deque中的内容就是ProducerBatch,即:Deque。
- 消息被写入缓存时,会被追加到Deque的尾部。Sender读取消息时,会从Deque的头部进行读取。
- ProducerBatch中可以包含一到多个ProducerRecord(生产者创建的消息),这样可以使字节的使用更加紧凑。同时,将娇小的ProducerRecord拼成一个较大的ProducerBatch也可以减少网络请求的次数以提高整体的吞吐量。
- 如果生产者需要向多个分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。
- 消息在网络上都是以字节进行传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中通过java.io.ByteBuffer实现消息的创建和释放,不过频繁的创建和释放比较消耗资源,在RecordAccumulator的内部还有一个BufferPool,它主要用来试验ByteBuffer的复用,已实现缓存的高效利用。
- 但是BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会进入BufferPool。此特定值的大小可以通过参数batch.size进行配置以实现缓存不同大小的消息。
- 当一条消息ProducerRecord进入RecordAccumulator中时,会先寻找与消息分区所对应的的Deque(如果没有则新创建),在从这个Deque的尾部获取一个ProducerBatch(如果没有则新创建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,否则需要创建一个新的ProducerBatch。
- 在新建ProducerBatch时需要评估这条消息的大小是否超过batch.size,如果不超过,就以batch.size的大小来创建这个ProducerBatch,这样在使用完后还可以通过BufferPool的管理进行复用。若果超过,则以消息的大小来创建ProducerBatch,此内存区域不会被复用。
- Sender从RecordAccumulator中获取缓存的消息后,会进一步将原本<TopicPartition, Deque>的保存形式进一步转换为<Node,List>的形式,其中Node表示Kafka集群中的Broker节点。
- 对于网络连接来说,生产者客户端与具体的Broker节点建立连接,也就是向具体的Broker节点发送消息,而并不关心消息属于哪个分区;对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。
- 在转换成<Node,List>的形式之后,Sender还会进一步封装成<Node,List>的形式,这样就可以将Request请求发送到各个Node。
- 请求从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式是Map<NodeId, Deque>,其主要作用是缓存已经发出去但还没有收到响应的请求。与此同时,InFlightRequests还提供了趣多管理类的方法,并且通过配置参数还可以限制每个连接(即客户端与Node之间的连接)最多缓存的请求数。此参数为max.in.flight.requests.per.connection,默认值是5。超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应。
- 通过比较Deque的size与配置的最大连接数可以判断对应的node是否已经堆积了很多未响应的请求。如果已有较大未响应请求的堆积,那么说明这个Node节点负载较大或者网络连接有问题,再继续向其发送请求会增大请求超时的可能。
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
/** * Add a record to the accumulator, return the append result * <p> * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created * <p> * * @param tp The topic/partition to which this record is being sent * @param timestamp The timestamp of the record * @param key The key for the record * @param value The value for the record * @param headers the Headers for the record * @param callback The user-supplied callback to execute when the request is complete * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available */ public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { // check if we have an in-progress batch // 其实就是一个putIfAbsent操作的方法,不展开分析 Deque<ProducerBatch> dq = getOrCreateDeque(tp); // batches是线程安全的,但是Deque不是线程安全的 // 已有在处理中的batch synchronized (dq) { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult; } // we don't have an in-progress record batch try to allocate a new batch // 创建一个新的ProducerBatch byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); // 分配一个内存 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); // 申请不到内存 buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); // 再次尝试添加,因为分配内存的那段代码并不在synchronized块中 // 有可能这时候其他线程已经创建好RecordBatch了,finally会把分配好的内存还回去 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { // 作者自己都说了,希望不要总是发生,多个线程都去申请内存,到时候还不是要还回去? // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... return appendResult; } // 创建ProducerBatch MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addLast(batch); // incomplete是一个Set集合,存放不完整的batch incomplete.add(batch); // Don't deallocate this buffer in the finally block as it's being used in the record batch buffer = null; // 返回记录添加结果类 return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { // 释放要还的内存 if (buffer != null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } }
/** * Try to append to a ProducerBatch. * * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up * resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written * and memory records built) in one of the following cases (whichever comes first): right before send, * if it is expired, or when the producer is closed. */ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) { // 获取最新加入的ProducerBatch ProducerBatch last = deque.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()); if (future == null) last.closeForRecordAppends(); else // 记录添加结果类包含future、batch是否已满的标记、是否是新batch创建的标记 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false); } // 如果这个Deque没有ProducerBatch元素,或者已经满了不足以加入本条消息则返回null return null; }