kafka producer所有实现的接口:
java.io.Closeable, java.lang.AutoCloseable, Producer<K,V>
生产者用于向kafka集群发送消息。
生产者是线程安全的,通常在多线程中使用一个生产者实例比使用多个实例更快。
下面是一个简单的例子:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
生产者持有一个缓存区,用来缓存尚未发送至服务端的数据,同时启动一个后台I/O线程用来发送这些消息。使用后没有成功关闭生产者可能导致缓存区域的资源泄露。
send()方法是异步的。当被调用时,它将消息添加到待发缓存区并立即返回。这使得生产者能够批量的发送消息。
acks配置项控制着发送请求的完成标准。"all" 配置会导致请求阻塞,,需等待本次请求的所有消息返回成功确认。这种是最慢的也是最可靠的设置。
如果请求失败,生产者会自动重试,如果把retries设置为0,就不会重试。若果启用retries,有可能会导致数据重复。
生产者为每一个partition分别维护各自的消息缓冲区(buffer)。这些buffer的大小通过batch.size配置项设定。调大这个参数,能够减少处理批次,但也需要更多的内存(因为每个分区通常都有一个buffer)。
不管buffer有没有剩余的空间,默认情况下,其中的数据都能立刻发送出去。如果你想要减少发送请求数量,可以通过设置linger.ms 的值大于0。生产者会等待设置的时间后再发送下一次请求,这样可能使更多的消息填充到同一个batch。这类似于TCP协议中的纳格算法。例如上面的代码片段中,在设置linger.ms=1后,总共有100条消息会在一次请求中发送。这样会增加1ms的请求等待延时,使buffer未满的情况下接受更多的消息。注意,如果消息接连过来也会被放在一个batch处理。所以在繁忙的系统中,即便设置了linger.ms=0,也会发生批处理。然而将此设置成大于0的值,可以在繁忙的系统中,以相对较小的时延,使请求数更少并且更高效。
Buffer.memory 控制着生产者所能使用的缓冲区的最大内存。如果消息发送比传输到服务器的速度快,内存可能会被耗尽。当缓冲区空间被耗尽,send 方法将会被阻塞。阻塞时间由max.block.ms 控制,达到最大阻塞时间,send方法抛出TimeoutException。
key.serializer 和 value.serializer配置表明怎样把用户提供给ProducerRecord的key和value对象转化为字节。你可以对简单的string和字节类型数据使用ByteArraySerializer或StringSerializer进行序列化。
从kafka0.11开始,kafka生产者支持两种额外的模式:幂等生产者与事务型生产者。幂等生产者使kafka的至少发送一次(at least once)提升到了精确发送一次(exactly once)。生产者的retries 设置 将不再造成数据重复。事务型生产者允许往多个分区(和多个topic)中发送数据。
开启幂等生产者,需将enable.idempotence设置为true。这时retries会被默认为Integer.MAX_VALUE,acks默认设为all。幂等生产者的API并没有改变,所以现有的应用不用修改即可使用此特性。
为了发挥幂等生产者的优点,避免应用层重复发送消息是很必要的。同样的,如果应用支持幂等性,推荐不设置retires,使用默认设置即Integer.MAX_VALUE。此外,即便在无限的重试次数下,send方法依然返回error(比如buffer中的消息在传输以前过期),推荐的的做法是关闭生产者,检查最后一次发送消息的内容以确保没有重复。最后幂等生产者只能在一个session中保证消息的幂等性。
如果要使用事务性生产者和其附属API,你必须正确的设置transactional.id。一旦设置了此配置,幂等性配置也会被自动开启。此外replication.factor至少设置为3,并且min.insync.replicas应该设置为2。最后为了确保事务端到端的实现,消费者必须设置成只能消费提交确认的消息。
设置transactional.id的目的是在一个生产者实例中回复多个传输session。
所有新的事务型API都是阻塞的,可能会抛出失败。下面的例子演示了使用新版API的方,与上面的代码类似,除了所有100条消息会在一个事务中处理:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
如上面的代码所示,在一个生产者中只能有一个打开的事务。所有在producer.beginTransaction()与producer.commitTransaction();之间被发送的消息都会在一个事务中。当transactional.id被指定,被这个生产者发送的所有消息必须在一个事务中。
事务型生产者使用异常传递错误状态。特别的,没有必要在producer.send方法中使用回掉函数或在发挥出调用.get()函数。因为如果在producer.send()或者事务调用中出不可恢复的错误将会抛出一个异常。
在接收到kafkaexception时,调用producer.abortTransaction();来确保成功写入的消息被标记为遗弃(abort),以此来保证事务。
在0.10.0或更新的版本中,客户端可以直接跟broker进行交互。老的broker可能不支持一些新的特性。比如事务型API需要0.11.0或更新的broker版本。当在运行的broker上使用不支持的API时,你会收到UnsupportedVersionException异常。