这是我今天面试被问的一个问题,这会回来就来看看源码
先看KafkaProducer的构造函数
public KafkaProducer(Map<String, Object> configs) {
this((ProducerConfig)(new ProducerConfig(configs)), (Serializer)null, (Serializer)null);
}
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer);
}
public KafkaProducer(Properties properties) {
this((ProducerConfig)(new ProducerConfig(properties)), (Serializer)null, (Serializer)null);
}
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), keySerializer, valueSerializer);
}
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
如果我们使用KafkaProducer
我们就看send方法是不是安全
RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, (Callback)interceptCallback, remainingWaitMs);
再看下accumulator 的主要属性
从ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
获取Deque,通过synchronized 进行发送,好了我们知道KafkaProducer是线程安全的,但是KafkaTemplate是线程安全的吗?
双重检测 +volatile
private volatile DefaultKafkaProducerFactory.CloseSafeProducer<K, V> producer;
单例模式,也是线程安全的,官网的话
A template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
KafkaTemplate 需要和DefaultKafkaProducerFactory一起使用