Kafka - 异步/同步发送API

简介: Kafka - 异步/同步发送API


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>
package com.artisan.pc;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");
        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();
            System.out.println(art.offset());
            System.out.println("over - " + i);
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧


回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerWithCallBack {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");
        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 添加回调
            // 该方法在Producer收到ack时调用,为异步调用
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {
                // 没有异常,输出信息到控制台
                System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());
            });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

控制台


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。

由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

package com.artisan.pc;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");
        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 通过Future接口的get实现同步阻塞
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}


相关文章
|
2月前
|
机器学习/深度学习 人工智能 缓存
电商 API 接口:开启全平台商品信息同步新时代
在数字化浪潮下,电商平台激增,消费者跨平台购物成为常态。然而,商品信息分散导致数据不一致、库存混乱等问题。电商 API 接口应运而生,通过标准化数据交换,实现多平台商品信息实时同步,提升运营效率、降低成本、增强用户体验,成为企业数字化转型的关键引擎。
175 0
|
1月前
|
存储 JSON 监控
淘宝/天猫:通过商品详情API实现多店铺商品信息批量同步,确保价格、库存实时更新
在电商运营中,管理多个淘宝或天猫店铺的商品信息(如价格、库存)耗时易错。本文介绍如何通过淘宝/天猫开放平台的商品详情API,实现自动化批量同步,确保信息实时更新。内容涵盖API调用、多店铺数据处理、实时更新策略及注意事项,助您高效管理多店铺商品信息。
80 0
|
2月前
|
人工智能 JSON 监控
电商 API 赋能,多平台促销活动精准同步
在电商多平台促销中,手动管理易导致价格混乱、库存超卖等问题。电商 API 通过自动化实现促销活动的精准同步,提升效率、减少错误,助力企业高效运营,实现销售增长与成本节约,是电商数字化转型的关键工具。
69 0
|
3月前
|
供应链 安全 API
淘宝API一键同步库存,销量翻倍轻松实现!
在电商竞争激烈的当下,库存管理是提升销量的关键。淘宝开放平台提供API接口,支持一键同步库存,实现线上线下数据实时更新,避免缺货或超卖。本文详解操作步骤,并附Python示例代码,助你轻松掌握自动化库存管理,提升转化率,实现销量翻倍。
256 0
|
3月前
|
JSON 人工智能 自然语言处理
根据标题利用API实现电商多平台同步:省时省力生成文章
在电商运营中,多平台同步产品信息、库存和订单数据至关重要。手动操作耗时易错,而通过API实现自动化同步,不仅能提升效率,还可结合内容生成工具自动创建产品文章。本文详解如何利用API完成数据同步与内容生成,帮助商家节省时间、减少错误、优化成本,提升运营效率。
56 0
|
7月前
|
JavaScript 前端开发 API
JavaScript中通过array.map()实现数据转换、创建派生数组、异步数据流处理、复杂API请求、DOM操作、搜索和过滤等,array.map()的使用详解(附实际应用代码)
array.map()可以用来数据转换、创建派生数组、应用函数、链式调用、异步数据流处理、复杂API请求梳理、提供DOM操作、用来搜索和过滤等,比for好用太多了,主要是写法简单,并且非常直观,并且能提升代码的可读性,也就提升了Long Term代码的可维护性。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
12月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
177 5
|
API C#
异步轮询 Web API 的实现与 C# 示例
异步轮询 Web API 的实现与 C# 示例
272 1
|
9月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
12月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
409 1

热门文章

最新文章