异步发送
普通异步发送
需求:创建Kafka生产者,采用异步的方式发送到Kafka broker
异步发送流程
Code
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.0</version> </dependency>
package com.artisan.pc; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092"); // key,value序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 3. 创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 10; i++) { RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get(); System.out.println(art.offset()); System.out.println("over - " + i); } // 5. 关闭资源 kafkaProducer.close(); } }
输出
31 over - 0 32 over - 1 33 over - 2 34 over - 3 35 over - 4 36 over - 5 37 over - 6 38 over - 7 39 over - 8 40 over - 9
忽略我这个offset … 我都发了好多次了…
看控制台的吧
带回调函数的异步发送
回调函数callback()会在producer收到ack时调用,为异步调用。
该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。
- 如果Exception为null,说明消息发送成功,
- 如果Exception不为null,说明消息发送失败
带回调函数的异步发送流程
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
Code
package com.artisan.pc; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class CustomProducerWithCallBack { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092"); // key,value序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 3. 创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 10; i++) { // 添加回调 // 该方法在Producer收到ack时调用,为异步调用 kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> { // 没有异常,输出信息到控制台 System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset()); }); } // 5. 关闭资源 kafkaProducer.close(); } }
控制台
同步发送API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
package com.artisan.pc; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class CustomProducerSync { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092"); // key,value序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 3. 创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 10; i++) { // 通过Future接口的get实现同步阻塞 kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ; } // 5. 关闭资源 kafkaProducer.close(); } }