kafka生产者异步和同步发送API

简介: kafka的生产者异步和同步发送API

异步发送API

  1. 导入依赖
<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
</dependencies>
  1. 编写代码

需要注意:

  • KafkaProducer:需要创建一个生产者对象,用来发送数据
  • ProducerConfig:获取所需的一系列配置参数
  • ProducerRecord:每条数据都要封装成一个ProducerRecord对象
  • 回调函数会在producer收到ack时调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        //1.创建kafka生产者的配置对象
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //批次处理
        properties.put("batch.size",16384);
        //等待时间(默认0,现在kafka作为事件流平台,这个值就不用设置了,来一条刷写一条)
        properties.put("linger.ms",1);
        //RecordAccumulator(线程共享变量)缓冲区大小,默认32M
        properties.put("buffer.memory",33554432);
        //key,value的序列化(必须要进行的)
        properties.put("key.zerializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置ack
        properties.put("acks", "all");
         // 重试次数
        properties.put("retries", 3);
        //3.创建kafka的生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //4.调用send,发送信息
        for (int i = 0; i < 10 ; i++) {
            //添加回调(回调函数可以方便看到消息发送的主题,分区和偏移量)
            kafkaProducer.send(new ProducerRecord<>("producer", "kafka" + i), new Callback() {
                //该方法在收到ack时异步调用
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e == null){
                        //如果没有异常,输出信息到控制台
                        System.out.println("success->"+ recordMetadata.offset());
                    }else{
                        //出现异常打印
                        e.printStackTrace();
                    }

                }
            });
        }
        //5.关闭资源,(关闭资源的时候会自动将数据刷写到消费者)
        kafkaProducer.close();
    }
}

同步发送API
同步发送的意思是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // key,value序列化(必须要进行的)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          // 设置ack
        properties.put("acks", "all");
        // 重试次数
        properties.put("retries", 3);
        // 批次大小 默认16K
        properties.put("batch.size", 16384);
        // 等待时间
        properties.put("linger.ms", 1);
        // RecordAccumulator缓冲区大小 默认32M
        properties.put("buffer.memory", 33554432);
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            //不带回调函数.
            // 异步发送 默认
           //kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
            // 同步发送,java中的方法处理同步
            kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}
相关文章
|
2月前
|
缓存 边缘计算 监控
89_批量推理:异步API调用
在当今数据密集型应用和大模型部署的时代,批量推理已成为提升系统性能和资源利用率的关键技术。随着深度学习模型规模的不断扩大和应用场景的日益复杂,如何高效地处理大量推理请求成为技术团队面临的重要挑战。传统的同步API调用方式在面对高并发、大规模数据处理时,往往会遇到响应延迟高、资源利用不充分等问题。异步API调用作为一种更高效的处理模式,通过非阻塞操作和并发处理能力,为批量推理场景提供了理想的解决方案。
|
7月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
345 16
|
10月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
264 61
|
9月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
496 10
|
9月前
|
JavaScript 前端开发 API
JavaScript中通过array.map()实现数据转换、创建派生数组、异步数据流处理、复杂API请求、DOM操作、搜索和过滤等,array.map()的使用详解(附实际应用代码)
array.map()可以用来数据转换、创建派生数组、应用函数、链式调用、异步数据流处理、复杂API请求梳理、提供DOM操作、用来搜索和过滤等,比for好用太多了,主要是写法简单,并且非常直观,并且能提升代码的可读性,也就提升了Long Term代码的可维护性。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
10月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
356 5
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
495 2
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
217 5
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
178 1
|
API C#
异步轮询 Web API 的实现与 C# 示例
异步轮询 Web API 的实现与 C# 示例
396 1