Kafka Template–2.2.0 api
KafkaTemplate
KafkaTemplate
这个类包装了个生产者,来提供方便的发送数据到kafka的topic里面。
同步和异步的方法都有,异步方法返回一个Future
。
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
-
sendDefault
这个方法,需要提供一个默认的topic。 - API使用一个
timestamp
作为一个参数,这个时间戳奖杯存储在record里面。 - 用户的时间戳是否被存储,取决于在kafka的topic里面配置的时间戳类型。如果topic被配置成了
CREATE_TIME
,则将使用用户指定的时间戳,如果没指定,将自动生成。如果设置为了LOG_APPEND_TIME
那么用户指定的时间戳将会被忽略,使用broker的local time. -
metrics
方法和partitionsFor
方法,委托给一些潜在的生产者;而execute
方法则是提供了直接访问潜在生产者的途径。
为了使用这个template,配置个工厂类,并将这个工厂类提供给templage类的构造方法。
举个栗子:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
- 这个template可以使用spring方式进行配置。
-
当使用一个
Message<?>
参数的时候,需要为这个消息的头部,提供topic,partition,key信息。- KafkaHeaders.TOPIC
- KafkaHeaders.PARTITION_ID
- KafkaHeaders.MESSAGE_KEY
- KafkaHeaders.TIMESTAMP
这些信息将被装在到数据里面。
你可以配置给
KafkaTemplate
配置个ProducerListener
来监听异步的回调结果,用这种方式替代使用Future
。
public interface ProducerListener<K, V> {
void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
void onError(String topic, Integer partition, K key, V value, Exception exception);
boolean isInterestedInSuccess();
}
- template默认配置
LoggingProducerListener
这个类作为监听类用来记录错误,但是当发送成功时候,却不做任何操作。 - 为了方便使用,提供了抽象类
ProducerListenerAdapter
,你可以只实现它一两个方法,它为isInterestedInSuccess
方法返回false - 观察template类的发送方法,发现它返回一个
ListenableFuture<SendResult>
。你可以使用一个监听类来注册一个回调,用来接收异步发送返回的结果。
ListenableFuture<SendResult<Integer, String>> future = template.send("foo");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
-
SendResult
这个类有两个属性,ProducerRecord
和RecordMetadata
;具体参考kafka的API - 如果你想要阻塞发送线程(批量发送),为了等待结果,你可以调用future的
get()
方法;另外,在等待的过程中,你可能想要调用flush()
方法,为了简便,template
类提供了一个autoFlush
,这个属性将会在每次send的时候,立即去flush掉sending thread.不过autoFlush将会明显降低性能(就是说最好不要一条条的发送,要批量发送)。
原文地址:http://docs.spring.io/spring-kafka/docs/2.0.0.M2/reference/htmlsingle/#_sending_messages