kafka生产者异步和同步发送API

简介: kafka的生产者异步和同步发送API

异步发送API

  1. 导入依赖
<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
</dependencies>
  1. 编写代码

需要注意:

  • KafkaProducer:需要创建一个生产者对象,用来发送数据
  • ProducerConfig:获取所需的一系列配置参数
  • ProducerRecord:每条数据都要封装成一个ProducerRecord对象
  • 回调函数会在producer收到ack时调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        //1.创建kafka生产者的配置对象
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //批次处理
        properties.put("batch.size",16384);
        //等待时间(默认0,现在kafka作为事件流平台,这个值就不用设置了,来一条刷写一条)
        properties.put("linger.ms",1);
        //RecordAccumulator(线程共享变量)缓冲区大小,默认32M
        properties.put("buffer.memory",33554432);
        //key,value的序列化(必须要进行的)
        properties.put("key.zerializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置ack
        properties.put("acks", "all");
         // 重试次数
        properties.put("retries", 3);
        //3.创建kafka的生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //4.调用send,发送信息
        for (int i = 0; i < 10 ; i++) {
            //添加回调(回调函数可以方便看到消息发送的主题,分区和偏移量)
            kafkaProducer.send(new ProducerRecord<>("producer", "kafka" + i), new Callback() {
                //该方法在收到ack时异步调用
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e == null){
                        //如果没有异常,输出信息到控制台
                        System.out.println("success->"+ recordMetadata.offset());
                    }else{
                        //出现异常打印
                        e.printStackTrace();
                    }

                }
            });
        }
        //5.关闭资源,(关闭资源的时候会自动将数据刷写到消费者)
        kafkaProducer.close();
    }
}

同步发送API
同步发送的意思是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // key,value序列化(必须要进行的)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          // 设置ack
        properties.put("acks", "all");
        // 重试次数
        properties.put("retries", 3);
        // 批次大小 默认16K
        properties.put("batch.size", 16384);
        // 等待时间
        properties.put("linger.ms", 1);
        // RecordAccumulator缓冲区大小 默认32M
        properties.put("buffer.memory", 33554432);
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            //不带回调函数.
            // 异步发送 默认
           //kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
            // 同步发送,java中的方法处理同步
            kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}
相关文章
|
2月前
|
机器学习/深度学习 人工智能 缓存
电商 API 接口:开启全平台商品信息同步新时代
在数字化浪潮下,电商平台激增,消费者跨平台购物成为常态。然而,商品信息分散导致数据不一致、库存混乱等问题。电商 API 接口应运而生,通过标准化数据交换,实现多平台商品信息实时同步,提升运营效率、降低成本、增强用户体验,成为企业数字化转型的关键引擎。
172 0
|
1月前
|
存储 JSON 监控
淘宝/天猫:通过商品详情API实现多店铺商品信息批量同步,确保价格、库存实时更新
在电商运营中,管理多个淘宝或天猫店铺的商品信息(如价格、库存)耗时易错。本文介绍如何通过淘宝/天猫开放平台的商品详情API,实现自动化批量同步,确保信息实时更新。内容涵盖API调用、多店铺数据处理、实时更新策略及注意事项,助您高效管理多店铺商品信息。
79 0
|
2月前
|
人工智能 JSON 监控
电商 API 赋能,多平台促销活动精准同步
在电商多平台促销中,手动管理易导致价格混乱、库存超卖等问题。电商 API 通过自动化实现促销活动的精准同步,提升效率、减少错误,助力企业高效运营,实现销售增长与成本节约,是电商数字化转型的关键工具。
68 0
|
3月前
|
供应链 安全 API
淘宝API一键同步库存,销量翻倍轻松实现!
在电商竞争激烈的当下,库存管理是提升销量的关键。淘宝开放平台提供API接口,支持一键同步库存,实现线上线下数据实时更新,避免缺货或超卖。本文详解操作步骤,并附Python示例代码,助你轻松掌握自动化库存管理,提升转化率,实现销量翻倍。
252 0
|
3月前
|
JSON 人工智能 自然语言处理
根据标题利用API实现电商多平台同步:省时省力生成文章
在电商运营中,多平台同步产品信息、库存和订单数据至关重要。手动操作耗时易错,而通过API实现自动化同步,不仅能提升效率,还可结合内容生成工具自动创建产品文章。本文详解如何利用API完成数据同步与内容生成,帮助商家节省时间、减少错误、优化成本,提升运营效率。
56 0
|
9月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
12月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
404 1
|
12月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
276 1
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
937 9
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章