生产者API文档
http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html.
版本说明
Kafka 0.10.0.0 及以后的版本,对生产者代码的底层实现进行了重构。
katka.producer.Producer类被org.apache.kafka.clients.producer.KafkaProducer类替换。
注意:在开发中,直接使用kafka的新版本API: org.apache.kafka.clients.producer.KafkaProducer作为生产者即可! 千万不要使用以前的katka.producer.Producer
同步和异步
Kafka 系统支持两种不同的发送方式–同步模式(Sync)和异步模式(ASync)
- 同步模式
- 异步模式
导入Maven的pom依赖
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
import org.apache.kafka.clients.producer.*; import java.util.Properties; /** * 演示使用kafka生产者向topic中发送消息 * kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning group.id=test_topic_group --topic test_topic * * @author Zsorrain */ public class MyKafkaProducer { public static void main(String[] args) throws Exception { //TODO 1.准备连接参数 Properties props = new Properties(); //指定kafka的broker地址 /* props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); == props.put("bootstrap.servers","node1:9092");这两个语句用哪个都行 因为ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 会自动调用bootstrap.servers 底层执行代码:public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; */ props.put("bootstrap.servers", "node1:9092"); //"node1:9092"需要修改为自己连接的服务器地址以及对应的Kafka端口号 //消息确认机制 //acks=0,意思就是我的KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,我就不管他了,直接就认为这个消息发送成功了 //acks=1,意思就是说只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。 //acks=all/-1,意思就是说Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。 //all即所有副本都同步到数据时send方法才返回, 以此来完全判断数据是否发送成功, 理论上来讲数据不会丢失 props.put("acks", "all"); //retries和retries.backoff.ms决定了重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒 props.put("retries", 0); props.put("retries.backoff.ms", 20); //buffer.memory //Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的, //也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。 //buffer.memory的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB,满了就会阻塞用户线程,不让继续往Kafka写消息了。 props.put("buffer.memory", 33554432); // batch.size是producer批量发送的基本单位,默认是16384Bytes,即16kB;KafkaProducer的Sender线程会把Batch打包成Request发送到Kafka服务器上去 props.put("batch.size", 16384); // lingger.ms是指一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。 //举个例子,首先假设你的Batch是16KB,那么你得估算一下,正常情况下,一般多久会凑够16KB(1个Batch),比如正常来说可能20ms就会凑够一个Batch。 //那么你的linger.ms可以设置为25ms,也就是说,正常来说,大部分的Batch在20ms内都会凑满,但是你的linger.ms可以保证,哪怕遇到低峰时期,20ms凑不满一个Batch,还是会在25ms之后强制Batch发送出去。 //那么producer是按照batch.size大小批量发送消息呢,还是按照linger.ms的时间间隔批量发送消息呢?--满足batch.size和ling.ms之一,producer便开始发送消息 props.put("linger.ms", 25); //这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值 props.put("max.request.size", 163840); //k-v序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //TODO 2.根据参数创建producer生产者连接对象 Producer<String, String> producer = new KafkaProducer<String, String>(props); //TODO 3.异步发送 for (int i = 0; i < 10; i++) {//循环发送10条消息到Kafka //将需要发送到Kafka的消息封装为record对象 ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key_" + i, "value_" + i); //异步发送消息,传入需要发送的record,和该record真正发送成功后的需要执行回调函数 producer.send(record, new Callback() { //Callback是一个接口,使用匿名内部类重写Callback类中的onCompletion方法 //onCompletion方法会在record真正发送成功后执行 @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (metadata != null) { //异步发送,record真正发送成功后才会执行该方法,所以可以在该方法里面获取到metadata System.out.println("异步发送后获得分区为 :" + metadata.partition() + " ,同步发送后获得offset为 :" + metadata.offset()); } } }); } System.out.println("这条消息在异步发送代码最后,但是最先被打印,说明异步消息在会返回结果后在执行发送信息"); //TODO 3.同步发送 for (int i = 10; i < 20; i++) {//循环发送10条消息到Kafka //将需要发送到kafka的消息封装到record对象 ProducerRecord<String, String> record = new ProducerRecord<>("order", "key_" + i, "value_" + i); //同步发送消息,并返回消息的元数据,如消息发送到哪个partition(分区)了,offset(偏移量)是多少 RecordMetadata metadata = producer.send(record).get(); System.out.println("同步发送后获得分区为 :" + metadata.partition() + " ,同步发送后获得offset为 :" + metadata.offset()); } System.out.println("这条消息在同步发送代码最后,最后被打印,说明同步同步消息执行发送消息完返回成功结果"); //TODO 4.关闭资源 producer.close(); } }