Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)

简介: Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)

1、kafka生产者

1.1 生产者消息发送流程

1.1.1 发送原理

在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

6b57d2400a1f40a5aba07938577a1588.pngbatch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值数0ms,表示没有延迟。

应答acks:

0:生产者发生过来的数据,不需要等数据落盘应答。

1:生产者发生过来的数据,Leader收到数据后应答

-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。

1.1.2 生产者重要参数列表


微信图片_20230804133249.jpg

1.2 异步发送API

1.2.1 普通异步发送

1、需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

2、代码编写

(1)创建工程(KafkaDemo)

(2)导入依赖

<dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

(3)创建包名org.zhm.producer

(4)编写不带回调函数的API代码

package org.zhm.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * @ClassName CustomProducer
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:35
 * @Version 1.0
 */
public class CustomProducer {
    public static void main(String[] args) {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //3、创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
        //4、调用send()方法,发生消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","zhm"+i));
        }
        //5、关闭资源
        kafkaProducer.close();
    }
}

(5)测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

8c1f7e1e98d44b549fe4ad81362d4483.png

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

06f92e2fc23a4b729bd6288cfcda233f.png

1.2.2 带回调函数的异步发送

回调函数会在Producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息·(Exception),如果Exception为null,说明消息发生成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * @ClassName CustoProducerCallback
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:44
 * @Version 1.0
 */
public class CustoProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //key、value序列化(必须)
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<>(properties);
        //4、调用send()方法 发送信息
        for (int i = 0; i < 6; i++) {
            //添加回调
            producer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {
                //该方法在Producer收到ack时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        //没有异常,输出信息到控制台
                        System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:"
                                +recordMetadata.partition());
                    }
                    else {
                        //出现异常打印
                        e.printStackTrace();
                    }
                }
            });
            //延迟一会会看到数据发往不同分区
            Thread.sleep(20);
        }
        //5、关闭资源
        producer.close();
    }
}

1、测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。


c0ac0c73271945f9b8f1ba2747f9e82a.png

1.3 同步发送API

只需在异步发送的基础上,再调用一下 get()方法即可。

package org.zhm.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * @ClassName CustomProducerSync
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 18:58
 * @Version 1.0
 */
public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //key、value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<>(properties);
        //4、调用send方法,发送信息
        for (int i = 0; i < 10; i++) {
            //异步发送 默认
//            producer.send(new ProducerRecord<>("first","zhm"+i));
            //同步发送
            producer.send(new ProducerRecord<>("first","zhmzhm"+i)).get();
        }
        //5、关闭资源
        producer.close();
    }
}

1、测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。


3e10bb50bc4b4f8191d52875c7e83f2f.png

1.4 生产者分区

1.4.1 分区好处

1、便于合理使用储存资源,每个Partition在一个Broker上储存,可以把海量的数据按照分区切割成一块一块数据储存在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

711fce8d637b4d66b5e1dba629880b4f.png

1.4.2 生产者发生消息的分区

1、默认分区器DefaultPartitioner

(1)指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0。

(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直

使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进

行使用(如果还是0会继续随机)。

2、案例一

将数据发往指定 partition 的情况

package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * @ClassName CustomProducerCallbackPartitions
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/12 19:10
 * @Version 1.0
 */
public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {
        //1、创建kafka生产者的配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //键值序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3、创建生产者对象
        KafkaProducer<String ,String> producer=new KafkaProducer<String, String>(properties);
        //4、调用send方法,发送信息
        for (int i = 0; i < 5; i++) {
            //指定数据发送到1号分区,key1为空
            producer.send(new ProducerRecord<>("first", 1, "", "zhm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e==null){
                        System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }
        //5、关闭资源
        producer.close();
    }
}

(1)测试

①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。


5e6039e2b8944336a7ad4e5add511dd4.png


相关文章
|
1月前
|
消息中间件 负载均衡 Kafka
【赵渝强老师】Kafka的主题与分区
Kafka 中的消息按主题分类,生产者发送消息到特定主题,消费者订阅主题消费。主题可分多个分区,每个分区仅属一个主题。消息追加到分区时,Broker 分配唯一偏移量地址,确保消息在分区内的顺序性。Kafka 保证分区有序而非主题有序。示例中,Topic A 有 3 个分区,分区可分布于不同 Broker 上,支持负载均衡和容错。视频讲解及图示详见原文。
|
1月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
2月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
1月前
|
消息中间件 Kafka
【赵渝强老师】Kafka分区的副本机制
在Kafka中,每个主题可有多个分区,每个分区有多个副本。其中仅有一个副本为Leader,负责对外服务,其余为Follower。当Leader所在Broker宕机时,Follower可被选为新的Leader,实现高可用。文中附有示意图及视频讲解。
|
2月前
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
54 3
|
2月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
33 2
|
2月前
|
消息中间件 JSON 大数据
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
51 1
|
2月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
41 1
|
消息中间件 负载均衡 网络协议
Kafka实战(六) - 核心API全面解析
Kafka实战(六) - 核心API全面解析
195 0
Kafka实战(六) - 核心API全面解析
|
7天前
|
人工智能 自然语言处理 API
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
谷歌推出的Multimodal Live API是一个支持多模态交互、低延迟实时互动的AI接口,能够处理文本、音频和视频输入,提供自然流畅的对话体验,适用于多种应用场景。
45 3
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动