Kafka生产者有三种方式进行消息的发送,这三种方式区别在于对于消息是否正常到达的处理。视频讲解如下:
下面分别介绍生产者的这三种消息发送方式。
第一种:fire-and-forget
该方式把消息发送给Kafka的Broker之后不关心其是否正常到达。在大多数情况下消息会正常到达,即使出错了生产者也会自动重试。但这种方式可能造成Kafka Broker没有接收到生产者的消息。因此这种方式适用于允许消息的丢失、并对吞吐量要求大的场景,比如用户点击日志上传。代码如下:
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerDemo { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 发送方式:fire-and-forget Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i)); Thread.sleep(1000); } producer.close(); } }
第二种:同步发送
生产者使用send方法发送一条消息,该方法会返回一个Future对象。调用该对象的get方法可以阻塞当前线程并等待返回。这种方式适用对消息可靠性要求高的场景,比如支付的场景。在这种场景下要求消息不可丢失,如果丢失了将回滚相关的业务操作。代码如下:
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class KafkaProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 发送方式:同步发送 Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 10; i++) { RecordMetadata metadata = producer.send(new ProducerRecord<String, String> ("mytopic1", "key" + i, "value" + i)).get(); System.out.println("同步消息发送成功:" + i); } producer.close(); } }
第三种:异步发送
生产者使用send方法发送一条消息时指定回调函数,在Kafka Broker返回结果时调用。这个回调函数可以进行错误日志的记录或者重试。这种方式牺牲了一部分可靠性,但是吞吐量会比同步发送高很多。代码如下:
import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class KafkaProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 发送方式:异步发送 Producer<String, String> producer = new KafkaProducer(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i),new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { // 回调函数 if(exception != null) { System.out.println("消息异步发送出现错误!!!"); exception.printStackTrace(); } else { System.out.println("消息异步发送成功。 " + "Topic: " + metadata.topic() + ", Partition: " + metadata.partition() + ", Offset: " + metadata.offset()); } } }); } producer.close(); } }