1. Producer code
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {
    private static final String TOPIC = "test1115";
    private static final String BROKER_LIST = "localhost:9092";
    private static KafkaProducer<String, String> producer = null;

    static {
        Properties configs = initConfig();                      // initialize configuration
        producer = new KafkaProducer<String, String>(configs);  // instantiate the producer
    }

    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // serializers for key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        try {
            String message = "hello world";
            while (true) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(TOPIC, "0", message);
                producer.send(record);
                System.out.println("sent: " + message);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
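The loop above fires and forgets. Kafka's send() is asynchronous and also accepts a Callback that is invoked when the broker acknowledges the record or the send fails. A minimal sketch of that usage (Callback, RecordMetadata, and onCompletion are the standard org.apache.kafka.clients.producer API; the log format is my own):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Asynchronous send with a completion callback.
producer.send(new ProducerRecord<String, String>(TOPIC, "0", "hello world"),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();   // the send failed
                } else {
                    // the broker acknowledged the record
                    System.out.printf("sent to %s-%d @ offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });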
2. How the Kafka producer works
The client places messages into a buffer; a dedicated sender thread then pulls messages from that buffer and transmits them to the broker in batches. The main components involved are listed below (a toy sketch of this pattern follows the list).
RecordAccumulator----the record accumulator (buffers messages produced by the client)
Sender thread----reads batches from the record accumulator and sends them to the broker over the network
Selector----the selector (manages network connections and handles reads and writes)
NetworkClient----handles the client's network requests
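To make the queue-plus-sender-thread pattern concrete, here is a toy sketch. It is not Kafka's actual RecordAccumulator/Sender code; every name in it (MiniAccumulator, flushBatch, BATCH_SIZE) is hypothetical, and the "network send" is just a print statement:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy accumulator/sender pair illustrating the batching pattern.
public class MiniAccumulator {
    private static final int BATCH_SIZE = 16;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public void append(String message) throws InterruptedException {
        queue.put(message);   // producer threads enqueue records
    }

    public void startSender() {
        Thread sender = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            try {
                while (true) {
                    // block until at least one record is available
                    batch.add(queue.take());
                    // then drain up to a full batch without blocking
                    queue.drainTo(batch, BATCH_SIZE - 1);
                    flushBatch(batch);
                    batch.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "mini-sender");
        sender.setDaemon(true);
        sender.start();
    }

    private void flushBatch(List<String> batch) {
        // stand-in for a network write to the broker
        System.out.println("sending batch of " + batch.size() + " record(s)");
    }
}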
Flow analysis:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // run any configured interceptors before sending
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}
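The interceptors.onSend() call at the top of send() is where user-supplied interceptors run. A minimal sketch of a custom interceptor (ProducerInterceptor and the "interceptor.classes" config are standard Kafka client APIs; the class name, package, and value-prefixing logic are my illustration):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that prefixes every value before it is sent.
public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // invoked from KafkaProducer.send() before serialization and partitioning
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), "intercepted-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // invoked when the broker acks the record, or when the send fails
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It is registered through producer config, e.g. props.put("interceptor.classes", "com.example.PrefixInterceptor"); (package name assumed).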
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        this.throwIfProducerClosed();
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
        try {
            // fetch the cluster metadata for the target topic/partition
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        } catch (KafkaException var19) {
            if (this.metadata.isClosed()) {
                throw new KafkaException("Producer closed while send in progress", var19);
            }
            throw var19;
        }
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // serialize the key and value
        byte[] serializedKey;
        try {
            serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException var18) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var18);
        }
        byte[] serializedValue;
        try {
            serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException var17) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var17);
        }

        // choose the target partition
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        this.ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
        this.log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.maybeAddPartitionToTransaction(tp);
        }

        // append the record to the accumulator (the producer's message buffer)
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            // wake the Sender thread so the batch gets transmitted
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException var20) {
        this.log.debug("Exception occurred during message send:", var20);
        if (callback != null) {
            callback.onCompletion((RecordMetadata)null, var20);
        }
        this.errors.record();
        this.interceptors.onSendError(record, tp, var20);
        return new KafkaProducer.FutureFailure(var20);
    } catch (InterruptedException var21) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var21);
        throw new InterruptException(var21);
    } catch (BufferExhaustedException var22) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, var22);
        throw var22;
    } catch (KafkaException var23) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var23);
        throw var23;
    } catch (Exception var24) {
        this.interceptors.onSendError(record, tp, var24);
        throw var24;
    }
}
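Note that doSend() delegates partition selection to this.partition(...), which falls back to the configured partitioner when the record itself carries no partition. A minimal sketch of a custom partitioner (the Partitioner interface and "partitioner.class" config are standard Kafka client APIs; the class name and hash-modulo strategy are my illustration):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical partitioner: hash the key bytes, pin null keys to partition 0.
public class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0;   // no key: pin to partition 0
        }
        // non-negative hash modulo the partition count
        return (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It is registered through producer config, e.g. props.put("partitioner.class", "com.example.KeyHashPartitioner"); (package name assumed).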
References:
Kafka技术内幕 (Kafka internals)
Kafka源码分析 (Kafka source code analysis)