3、Kafka快速入门
对kafka的操作有2种方式,一种是通过命令行方式,一种是通过API方式。
3.1、通过命令行Kafka
Kafka在bin目录下提供了shell脚本文件,可以对Kafka进行操作,分别是:
通过命令行的方式,我们将体验下kafka,以便我们对kafka有进一步的认知。
3.1.1、topic的操作
3.1.1.1、创建topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic #执行结果: Created topic "my-kafka-topic".
参数说明:
- zookeeper:参数是必传参数,用于配置 Kafka 集群与 ZooKeeper 连接地址。至少写一个。
- partitions:参数用于设置主题分区数,该配置为必传参数。
- replication-factor:参数用来设置主题副本数 ,该配置也是必传参数。
- topic:指定topic的名称。
3.1.1.2、查看topic列表
kafka-topics.sh --list --zookeeper node01:2181 __consumer_offsets my-kafka-topic
可以查看列表。
如果需要查看topic的详细信息,需要使用describe命令。
kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic #若不指定topic,则查看所有topic的信息 kafka-topics.sh --describe --zookeeper node01:2181
3.1.1.3、删除topic
通过kafka-topics.sh执行删除动作,需要在server.properties文件中配置 delete.topic.enable=true,该配置默认为 false。
否则执行该脚本并未真正删除主题 ,将该topic标记为删除状态 。
kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic # 执行如下 [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic Topic my-kafka-topic is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. # 如果将delete.topic.enable=true [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2 Topic my-kafka-topic2 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. # 说明:虽然设置后,删除时依然提示没有设置为true,实际上已经删除了。
3.1.2、生产者的操作
kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic
可以看到,已经向topic发送了消息。
3.1.3、消费者的操作
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic # 通过以上命令,可以看到消费者可以接收生产者发送的消息 # 如果需要从头开始接收数据,需要添加--from-beginning参数 kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic
3.2、通过Java Api操作Kafka
除了通过命令行的方式操作kafka外,还可以通过Java api的方式操作,这种方式将更加的常用。
3.2.1、创建工程
导入依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>itcast-bigdata</artifactId> <groupId>cn.itcast.bigdata</groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>itcast-bigdata-kafka</artifactId> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
3.2.2、topic的操作
由于主题的元数据信息是注册在 ZooKeeper 相 应节点之中,所以对主题的操作实质是对 ZooKeeper 中记录主题元数据信息相关路径的操作。Kafka将对 ZooKeeper 的相关操作封装成一 个 ZkUtils 类 , 井封装了一个AdrninUtils 类调用 ZkClient 类的相关方法以实现对 Kafka 元数据 的操作,包括对主题、代理、消费者等相关元数据的操作。对主题操作的相关 API调用较简单, 相应操作都是通过调用 AdminUtils类的相应方法来完成的。
package cn.itcast.kafka; import kafka.admin.AdminUtils; import kafka.utils.ZkUtils; import org.apache.kafka.common.security.JaasUtils; import org.junit.Test; import java.util.Properties; public class TestKafkaTopic { @Test public void testCreateTopic() { ZkUtils zkUtils = null; try { //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制 zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled()); String topicName = "my-kafka-topic-test1"; if (!AdminUtils.topicExists(zkUtils, topicName)) { //参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式 AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6()); System.out.println(topicName + " 创建成功!"); } else { System.out.println(topicName + " 已存在!"); } } finally { if (null != zkUtils) { zkUtils.close(); } } } }
测试结果:
3.2.2.1、删除topic
@Test public void testDeleteTopic() { ZkUtils zkUtils = null; try { //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制 zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled()); String topicName = "my-kafka-topic-test1"; if (AdminUtils.topicExists(zkUtils, topicName)) { //参数:zkUtils,topic名称 AdminUtils.deleteTopic(zkUtils, topicName); System.out.println(topicName + " 删除成功!"); } else { System.out.println(topicName + " 不已存在!"); } } finally { if (null != zkUtils) { zkUtils.close(); } } }
测试结果:
3.2.3、生产者的操作
package cn.itcast.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import java.util.Properties; public class TestProducer { @Test public void testProducer() throws InterruptedException { Properties config = new Properties(); // 设置kafka服务列表,多个用逗号分隔 config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092"); // 设置序列化消息 Key 的类 config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置序列化消息 value 的类 config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 初始化 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config); for (int i = 0; i < 100 ; i++) { ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i); // 发送消息 kafkaProducer.send(record); System.out.println("发送消息 --> " + i); Thread.sleep(100); } kafkaProducer.close(); } }
3.2.4、消费者的操作
package cn.itcast.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import javax.sound.midi.Soundbank; import java.util.Arrays; import java.util.Properties; public class TestConsumer { @Test public void testConsumer() { Properties config = new Properties(); // 设置kafka服务列表,多个用逗号分隔 config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092"); // 设置消费者分组id config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // 设置序反列化消息 Key 的类 config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 设置序反列化消息 value 的类 config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config); // 订阅topic kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic")); while (true) { // 使用死循环不断的拉取数据 ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> record : records) { String value = record.value(); long offset = record.offset(); System.out.println("value = " + value + ", offset = " + offset); } } } }