1、kafka生产者
1.1 生产者消息发送流程
1.1.1 发送原理
在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值数0ms,表示没有延迟。
应答acks:
0:生产者发生过来的数据,不需要等数据落盘应答。
1:生产者发生过来的数据,Leader收到数据后应答
-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。
1.1.2 生产者重要参数列表
1.2 异步发送API
1.2.1 普通异步发送
1、需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
2、代码编写
(1)创建工程(KafkaDemo)
(2)导入依赖
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> </dependencies>
(3)创建包名org.zhm.producer
(4)编写不带回调函数的API代码
package org.zhm.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * @ClassName CustomProducer * @Description TODO * @Author Zouhuiming * @Date 2023/6/12 18:35 * @Version 1.0 */ public class CustomProducer { public static void main(String[] args) { //1、创建kafka生产者的配置对象 Properties properties=new Properties(); //2、给kafka配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //key,value序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //3、创建kafka生产者对象 KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties); //4、调用send()方法,发生消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first","zhm"+i)); } //5、关闭资源 kafkaProducer.close(); } }
(5)测试
①在 hadoop102 上开启 Kafka 消费者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
1.2.2 带回调函数的异步发送
回调函数会在Producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息·(Exception),如果Exception为null,说明消息发生成功,如果Exception不为null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
package org.zhm.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * @ClassName CustoProducerCallback * @Description TODO * @Author Zouhuiming * @Date 2023/6/12 18:44 * @Version 1.0 */ public class CustoProducerCallback { public static void main(String[] args) throws InterruptedException { //1、创建kafka生产者的配置对象 Properties properties=new Properties(); //2、给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //key、value序列化(必须) properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //3、创建kafka生产者对象 KafkaProducer<String,String> producer=new KafkaProducer<>(properties); //4、调用send()方法 发送信息 for (int i = 0; i < 6; i++) { //添加回调 producer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() { //该方法在Producer收到ack时调用,为异步调用 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e==null){ //没有异常,输出信息到控制台 System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:" +recordMetadata.partition()); } else { //出现异常打印 e.printStackTrace(); } } }); //延迟一会会看到数据发往不同分区 Thread.sleep(20); } //5、关闭资源 producer.close(); } }
1、测试
①在 hadoop102 上开启 Kafka 消费者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
1.3 同步发送API
只需在异步发送的基础上,再调用一下 get()方法即可。
package org.zhm.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * @ClassName CustomProducerSync * @Description TODO * @Author Zouhuiming * @Date 2023/6/12 18:58 * @Version 1.0 */ public class CustomProducerSync { public static void main(String[] args) throws ExecutionException, InterruptedException { //1、创建kafka生产者的配置对象 Properties properties=new Properties(); //2、给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //key、value序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //3、创建kafka生产者对象 KafkaProducer<String,String> producer=new KafkaProducer<>(properties); //4、调用send方法,发送信息 for (int i = 0; i < 10; i++) { //异步发送 默认 // producer.send(new ProducerRecord<>("first","zhm"+i)); //同步发送 producer.send(new ProducerRecord<>("first","zhmzhm"+i)).get(); } //5、关闭资源 producer.close(); } }
1、测试
①在 hadoop102 上开启 Kafka 消费者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
1.4 生产者分区
1.4.1 分区好处
1、便于合理使用储存资源,每个Partition在一个Broker上储存,可以把海量的数据按照分区切割成一块一块数据储存在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
1.4.2 生产者发生消息的分区
1、默认分区器DefaultPartitioner
(1)指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0。
(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直
使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进
行使用(如果还是0会继续随机)。
2、案例一
将数据发往指定 partition 的情况
package org.zhm.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * @ClassName CustomProducerCallbackPartitions * @Description TODO * @Author Zouhuiming * @Date 2023/6/12 19:10 * @Version 1.0 */ public class CustomProducerCallbackPartitions { public static void main(String[] args) { //1、创建kafka生产者的配置对象 Properties properties=new Properties(); //2、给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //键值序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //3、创建生产者对象 KafkaProducer<String ,String> producer=new KafkaProducer<String, String>(properties); //4、调用send方法,发送信息 for (int i = 0; i < 5; i++) { //指定数据发送到1号分区,key1为空 producer.send(new ProducerRecord<>("first", 1, "", "zhm" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e==null){ System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition()); }else { e.printStackTrace(); } } }); } //5、关闭资源 producer.close(); } }
(1)测试
①在 hadoop102 上开启 Kafka 消费者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。