上篇blog安装了可视化的监控工具后,就到了我们最常用的环节,也就是通过代码来控制Kafka,使用API来调用。Kafka文档地址为Kafka官方文档,接下来我们会充分使用到官方文档中的示例,本篇blog分为如下几个部分:
- 环境准备:创建一个java project,用来进行kafka代码的编写
- 生产者API:探讨生产者的发送方式,使用不同的生产者接口发送【同步发送、异步发送】
- 消费者API:探讨生产者的发送方式,使用不同的生产者接口发送【offset提交】
接下来按照如下流程来一起学习吧,奥利给!
环境准备
首先新建一个java project,打开idea新建一个maven项目:
然后引入kafka的的maven依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>kafka</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>7</source> <target>7</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.5</version> </dependency> </dependencies> </project>
生产者API
在官方文档中,我们可以看到Kafka的消费者API列表生产者API,这些都是当前Kafka支持的生产者相关的API,有如下四种构造方法:
也有如下13种方法【非抽象的实例方法】:接下来分成几个模式分别介绍下
发送方式
发送方式分为两种,同步发送和异步发送,主体的发送流程二者是相同的,主体流程如下:
- 首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。
- 接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。
- 当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 ——main 线程和 Sender 线程,以及 一个线程共享变量 ——RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker
只有数据积累到 batch.size
之后,sender 才会发送数据。如果数据迟迟未达到 batch.size,sender 等待 linger.time
之后就会发送数据,也就是发往broker的数据是一批一批过去的。
异步发送
异步发送的含义是:消息的发送者只是将消息发送过去,并不关心消息的发送状态,如果leader在发送ack后宕机的话,重复发送的消息将不能保证原来的顺序。最好选用带回调函数的方法。
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; public class Producer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092"); //ack模式,all是最慢但最安全的 props.put("acks", "-1"); //失败重试次数 props.put("retries", 1); //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端 props.put("batch.size", 10); //props.put("max.request.size",10); //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 props.put("linger.ms", 10000); //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端 //buffer.memory要大于batch.size,否则会报申请内存不足的错误 props.put("buffer.memory", 10240); //序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer=new KafkaProducer(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("tml-second", Integer.toString(i), "tml-second消息:"+i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("消息发送状态监测"); } }); producer.close(); } }
我们可以从机器上看到消息记录
为了更准确一些,我们用命令消费一下:
同步发送
同步发送用的比较少,唯一的不同就是他要求发送时按照顺序,如果当条数据发送失败,那么就阻塞线程,这样就保证了消息的严格顺序【即使在重试状态下发送的消息】
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.concurrent.ExecutionException; public class Producer { public static void main(String[] args) throws InterruptedException, ExecutionException { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092"); //ack模式,all是最慢但最安全的 props.put("acks", "-1"); //失败重试次数 props.put("retries", 1); //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端 props.put("batch.size", 10); //props.put("max.request.size",10); //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 props.put("linger.ms", 10000); //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端 //buffer.memory要大于batch.size,否则会报申请内存不足的错误 props.put("buffer.memory", 10240); //序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer=new KafkaProducer(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("tml-second", Integer.toString(i), "tml-second消息:"+i)).get(); producer.close(); } }
防止消息重复提交
在生产者策略的时候我们提到过,需要防止消息重复提交,也即精准一次提交,我们有两种级别,一种是幂等模式【一个broker的会话周期精准一次】,另一种是事务模式【全局的精准一次】。
幂等模式
代码写法类似,只需要给配置里加一个配置项
//幂等模式 props.put("enable.idempotence", true);
一旦设置了该属性,那么retries默认是Integer.MAX_VALUE ,acks默认是all【-1】。
事务模式
事务模式的写法略有不同:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092"); props.put("transactional.id", "my_transactional_id"); org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { //数据发送必须在beginTransaction()和commitTransaction()中间,否则会报状态不对的异常 producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("tml-second", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // 这些异常不能被恢复,因此必须要关闭并退出Producer producer.close(); } catch (KafkaException e) { // 出现其它异常,终止事务 producer.abortTransaction(); } producer.close(); } }
消费者API
在官方文档中,我们可以看到Kafka的消费者API列表消费者API,有构造方法,和实例方法。构造方法有如下四种:
也有45种方法【非抽象的实例方法】以及4种弃用方法。消费者提交方式有以下几种:
- 自动提交:kafka管理offset的提交
- 手动提交:手动同步提交和手动异步提交
按照这种结构我们看下提交方式。
自动提交offset
提交的代码如下:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "tml-group"); //开启offset自动提交 props.put("enable.auto.commit", "true"); //自动提交时间间隔 props.put("auto.commit.interval.ms", "1000"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题,可以订阅多个主题 consumer.subscribe(Arrays.asList("tml-second")); //死循环不停的从broker中拿数据 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
可以看到提交的效果
手动同步提交offset
通常从Kafka拿到的消息是要做业务处理,而且业务处理完成才算真正消费成功,所以需要客户端控制offset提交时间
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "tml_group"); //开启offset自动提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题,可以订阅多个主题 consumer.subscribe(Arrays.asList("tml-second")); final int minBatchSize = 50; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { //insertIntoDb(buffer); for (ConsumerRecord bf : buffer) { System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value()); } consumer.commitSync(); buffer.clear(); } } } }
手动异步提交offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.util.*; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "tml_group"); //开启offset自动提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费者订阅主题,可以订阅多个主题 consumer.subscribe(Arrays.asList("tml-second")); final int minBatchSize = 50; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { //insertIntoDb(buffer); for (ConsumerRecord bf : buffer) { System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for" + offsets); } } }); buffer.clear(); } } } }
趟了无数的坑,终于把Kafka学习完了,接下来开始Redis之旅,开始由业务架构向基础架构渗透,上可接客户,中可玩儿平台,下可探基础。完成SaaS、PaaS以及IaaS的闭环
部分内容来自 https://blog.csdn.net/wangzhanzheng/article/details/80801059