【赵渝强老师】Kafka生产者的消息发送方式

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。

b206.png

Kafka生产者有三种方式进行消息的发送,这三种方式区别在于对于消息是否正常到达的处理。视频讲解如下:


下面分别介绍生产者的这三种消息发送方式。


第一种:fire-and-forget


该方式把消息发送给Kafka的Broker之后不关心其是否正常到达。在大多数情况下消息会正常到达,即使出错了生产者也会自动重试。但这种方式可能造成Kafka Broker没有接收到生产者的消息。因此这种方式适用于允许消息的丢失、并对吞吐量要求大的场景,比如用户点击日志上传。代码如下:


import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerDemo {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
        "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringSerializer");
    // 发送方式:fire-and-forget
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i));
      Thread.sleep(1000);
    }
    producer.close();
  }
}


第二种:同步发送


生产者使用send方法发送一条消息,该方法会返回一个Future对象。调用该对象的get方法可以阻塞当前线程并等待返回。这种方式适用对消息可靠性要求高的场景,比如支付的场景。在这种场景下要求消息不可丢失,如果丢失了将回滚相关的业务操作。代码如下:


import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaProducerDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
        "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringSerializer");
    // 发送方式:同步发送
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    for (int i = 0; i < 10; i++) {
      RecordMetadata metadata = 
          producer.send(new ProducerRecord<String, String>
                       ("mytopic1", "key" + i, "value" + i)).get();
      System.out.println("同步消息发送成功:" + i);
    }
    producer.close();
  }
}


第三种:异步发送


生产者使用send方法发送一条消息时指定回调函数,在Kafka Broker返回结果时调用。这个回调函数可以进行错误日志的记录或者重试。这种方式牺牲了一部分可靠性,但是吞吐量会比同步发送高很多。代码如下:


import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaProducerDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
        "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringSerializer");
    // 发送方式:异步发送
    Producer<String, String> producer = new KafkaProducer(props);
    for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i),new Callback() {
        
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          // 回调函数
                  if(exception != null) {
                      System.out.println("消息异步发送出现错误!!!");
                      exception.printStackTrace();
                  } else {
                      System.out.println("消息异步发送成功。 " +
                             "Topic: " + metadata.topic() + 
                             ", Partition: " + metadata.partition() +
                             ", Offset: " + metadata.offset());
                  }
        }
      });
    }
    producer.close();
  }
}


目录
打赏
0
5
5
1
179
分享
相关文章
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
129 61
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
42 10
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
【赵渝强老师】Kafka的主题与分区
Kafka 中的消息按主题分类,生产者发送消息到特定主题,消费者订阅主题消费。主题可分多个分区,每个分区仅属一个主题。消息追加到分区时,Broker 分配唯一偏移量地址,确保消息在分区内的顺序性。Kafka 保证分区有序而非主题有序。示例中,Topic A 有 3 个分区,分区可分布于不同 Broker 上,支持负载均衡和容错。视频讲解及图示详见原文。
115 2
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
148 2
【赵渝强老师】Kafka分区的副本机制
在Kafka中,每个主题可有多个分区,每个分区有多个副本。其中仅有一个副本为Leader,负责对外服务,其余为Follower。当Leader所在Broker宕机时,Follower可被选为新的Leader,实现高可用。文中附有示意图及视频讲解。
173 0
【赵渝强老师】Kafka的体系架构
Kafka消息系统是一个分布式系统,包含生产者、消费者、Broker和ZooKeeper。生产者将消息发送到Broker,消费者从Broker中拉取消息并处理。主题按分区存储,每个分区有唯一的偏移量地址,确保消息顺序。Kafka支持负载均衡和容错。视频讲解和术语表进一步帮助理解。
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
87 1
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
210 0