Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)

简介: Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)

1、kafka生产者

1.1 生产者消息发送流程

1.1.1 发送原理

在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

6b57d2400a1f40a5aba07938577a1588.pngbatch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值数0ms,表示没有延迟。

应答acks:

0:生产者发生过来的数据,不需要等数据落盘应答。

1:生产者发生过来的数据,Leader收到数据后应答

-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。

1.1.2 生产者重要参数列表


微信图片_20230804133249.jpg

1.2 异步发送API

1.2.1 普通异步发送

1、需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

2、代码编写

(1)创建工程(KafkaDemo)

(2)导入依赖

<dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

(3)创建包名org.zhm.producer

(4)编写不带回调函数的API代码

package org.zhm.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * @ClassName CustomProducer
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:35
 * @Version 1.0
 */
public class CustomProducer {
    public static void main(String[] args) {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //3、创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
        //4、调用send()方法,发生消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","zhm"+i));
        }
        //5、关闭资源
        kafkaProducer.close();
    }
}

(5)测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

8c1f7e1e98d44b549fe4ad81362d4483.png

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

06f92e2fc23a4b729bd6288cfcda233f.png

1.2.2 带回调函数的异步发送

回调函数会在Producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息·(Exception),如果Exception为null,说明消息发生成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * @ClassName CustoProducerCallback
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:44
 * @Version 1.0
 */
public class CustoProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //key、value序列化(必须)
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<>(properties);
        //4、调用send()方法 发送信息
        for (int i = 0; i < 6; i++) {
            //添加回调
            producer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {
                //该方法在Producer收到ack时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        //没有异常,输出信息到控制台
                        System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:"
                                +recordMetadata.partition());
                    }
                    else {
                        //出现异常打印
                        e.printStackTrace();
                    }
                }
            });
            //延迟一会会看到数据发往不同分区
            Thread.sleep(20);
        }
        //5、关闭资源
        producer.close();
    }
}

1、测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。


c0ac0c73271945f9b8f1ba2747f9e82a.png

1.3 同步发送API

只需在异步发送的基础上,再调用一下 get()方法即可。

package org.zhm.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * @ClassName CustomProducerSync
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:58
 * @Version 1.0
 */
public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //key、value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<>(properties);
        //4、调用send方法,发送信息
        for (int i = 0; i < 10; i++) {
            //异步发送 默认
//            producer.send(new ProducerRecord<>("first","zhm"+i));
            //同步发送
            producer.send(new ProducerRecord<>("first","zhmzhm"+i)).get();
        }
        //5、关闭资源
        producer.close();
    }
}

1、测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。


3e10bb50bc4b4f8191d52875c7e83f2f.png

1.4 生产者分区

1.4.1 分区好处

1、便于合理使用储存资源,每个Partition在一个Broker上储存,可以把海量的数据按照分区切割成一块一块数据储存在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

711fce8d637b4d66b5e1dba629880b4f.png

1.4.2 生产者发生消息的分区

1、默认分区器DefaultPartitioner

(1)指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0。

(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直

使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进

行使用(如果还是0会继续随机)。

2、案例一

将数据发往指定 partition 的情况

package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * @ClassName CustomProducerCallbackPartitions
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 19:10
 * @Version 1.0
 */
public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //键值序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3、创建生产者对象
        KafkaProducer<String ,String> producer=new KafkaProducer<String, String>(properties);
        //4、调用send方法,发送信息
        for (int i = 0; i < 5; i++) {
            //指定数据发送到1号分区,key1为空
            producer.send(new ProducerRecord<>("first", 1, "", "zhm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }
        //5、关闭资源
        producer.close();
    }
}

(1)测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。


5e6039e2b8944336a7ad4e5add511dd4.png


相关文章
|
13天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
42 2
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
59 5
|
1月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
26 2
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
129 58
|
1月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
32 1
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
31 1
|
2月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
3月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
3月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
|
25天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
下一篇
无影云桌面