Kafka入门宝典(详细截图版)(二)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Kafka入门宝典(详细截图版)(二)

3、Kafka快速入门


对kafka的操作有2种方式,一种是通过命令行方式,一种是通过API方式。

3.1、通过命令行Kafka

Kafka在bin目录下提供了shell脚本文件,可以对Kafka进行操作,分别是:

image.png

通过命令行的方式,我们将体验下kafka,以便我们对kafka有进一步的认知。

3.1.1、topic的操作

3.1.1.1、创建topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
#执行结果:
Created topic "my-kafka-topic".

参数说明:

  • zookeeper:参数是必传参数,用于配置 Kafka 集群与 ZooKeeper 连接地址。至少写一个。
  • partitions:参数用于设置主题分区数,该配置为必传参数。
  • replication-factor:参数用来设置主题副本数 ,该配置也是必传参数。
  • topic:指定topic的名称。
3.1.1.2、查看topic列表
kafka-topics.sh --list --zookeeper node01:2181
__consumer_offsets
my-kafka-topic

可以查看列表。

如果需要查看topic的详细信息,需要使用describe命令。

kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic
#若不指定topic,则查看所有topic的信息
kafka-topics.sh --describe --zookeeper node01:2181
3.1.1.3、删除topic

通过kafka-topics.sh执行删除动作,需要在server.properties文件中配置 delete.topic.enable=true,该配置默认为 false。

否则执行该脚本并未真正删除主题 ,将该topic标记为删除状态 。

kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
# 执行如下
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
Topic my-kafka-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 如果将delete.topic.enable=true
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2
Topic my-kafka-topic2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 说明:虽然设置后,删除时依然提示没有设置为true,实际上已经删除了。

3.1.2、生产者的操作

kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic

可以看到,已经向topic发送了消息。

3.1.3、消费者的操作

kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic
# 通过以上命令,可以看到消费者可以接收生产者发送的消息
# 如果需要从头开始接收数据,需要添加--from-beginning参数
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic

image.png

3.2、通过Java Api操作Kafka

除了通过命令行的方式操作kafka外,还可以通过Java api的方式操作,这种方式将更加的常用。

3.2.1、创建工程

image.png

导入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>itcast-bigdata</artifactId>
        <groupId>cn.itcast.bigdata</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>itcast-bigdata-kafka</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3.2.2、topic的操作

由于主题的元数据信息是注册在 ZooKeeper 相 应节点之中,所以对主题的操作实质是对 ZooKeeper 中记录主题元数据信息相关路径的操作。Kafka将对 ZooKeeper 的相关操作封装成一 个 ZkUtils 类 , 井封装了一个AdrninUtils 类调用 ZkClient 类的相关方法以实现对 Kafka 元数据 的操作,包括对主题、代理、消费者等相关元数据的操作。对主题操作的相关 API调用较简单, 相应操作都是通过调用 AdminUtils类的相应方法来完成的。

package cn.itcast.kafka;
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import java.util.Properties;
public class TestKafkaTopic {
    @Test
    public void testCreateTopic() {
        ZkUtils zkUtils = null;
        try {
            //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制
            zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
            String topicName = "my-kafka-topic-test1";
            if (!AdminUtils.topicExists(zkUtils, topicName)) {
                //参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
                AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
                System.out.println(topicName + " 创建成功!");
            } else {
                System.out.println(topicName + " 已存在!");
            }
        } finally {
            if (null != zkUtils) {
                zkUtils.close();
            }
        }
    }
}

测试结果:

image.png

3.2.2.1、删除topic
@Test
    public void testDeleteTopic() {
        ZkUtils zkUtils = null;
        try {
            //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制
            zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
            String topicName = "my-kafka-topic-test1";
            if (AdminUtils.topicExists(zkUtils, topicName)) {
                //参数:zkUtils,topic名称
                AdminUtils.deleteTopic(zkUtils, topicName);
                System.out.println(topicName + " 删除成功!");
            } else {
                System.out.println(topicName + " 不已存在!");
            }
        } finally {
            if (null != zkUtils) {
                zkUtils.close();
            }
        }
    }

测试结果:

image.png

3.2.3、生产者的操作

package cn.itcast.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
public class TestProducer {
    @Test
    public void testProducer() throws InterruptedException {
        Properties config = new Properties();
        // 设置kafka服务列表,多个用逗号分隔
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 设置序列化消息 Key 的类
        config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置序列化消息 value 的类
        config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 初始化
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
        for (int i = 0; i < 100 ; i++) {
            ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
            // 发送消息
            kafkaProducer.send(record);
            System.out.println("发送消息 --> " + i);
            Thread.sleep(100);
        }
        kafkaProducer.close();
    }
}

3.2.4、消费者的操作

package cn.itcast.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer {
    @Test
    public void testConsumer() {
        Properties config = new Properties();
        // 设置kafka服务列表,多个用逗号分隔
        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 设置消费者分组id
        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // 设置序反列化消息 Key 的类
        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 设置序反列化消息 value 的类
        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
        // 订阅topic
        kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));
        while (true) { // 使用死循环不断的拉取数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                long offset = record.offset();
                System.out.println("value = " + value + ", offset = " + offset);
            }
        }
    }
}
相关文章
|
6月前
|
消息中间件 Java Kafka
kafka入门demo
kafka入门demo
74 0
|
消息中间件 监控 关系型数据库
【Kafka系列】(一)Kafka入门(下)
【Kafka系列】(一)Kafka入门(下)
|
6月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
102 5
|
6月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
250 1
|
6月前
|
消息中间件 存储 Kafka
Kafka【基础入门】
Kafka【基础入门】
69 1
|
6月前
|
消息中间件 存储 分布式计算
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
80 0
|
6月前
|
消息中间件 算法 Kafka
Kafka入门,这一篇就够了(安装,topic,生产者,消费者)
Kafka入门,这一篇就够了(安装,topic,生产者,消费者)
296 0
|
消息中间件 存储 Kafka
(四)kafka从入门到精通之安装教程
Kafka是一个高性能、低延迟、分布式的分布式数据库,可以在分布式环境中实现数据的实时同步和分发。Zookeeper是一种开源的分布式数据存储系统,它可以在分布式环境中存储和管理数据库中的数据。它的主要作用是实现数据的实时同步和分发,可以用于实现分布式数据库、分布式文件系统、分布式日志系统等。Zookeeper的设计目标是高可用性、高性能、低延迟,它支持多种客户端协议,包括TCP和HTTP,可以方便地与其他分布式系统进行集成。
131 0
|
消息中间件 传感器 Kafka
(三)kafka从入门到精通之使用场景
Kafka 是一种流处理平台,主要用于处理大量数据流,如实时事件、日志文件和传感器数据等。Kafka的目的是实现高吞吐量、低延迟和高可用性的数据处理。Kafka提供了一个高度可扩展的架构,可以轻松地添加和删除节点,并且能够处理数百亿条消息/分区。Kafka的消息可以容错,即使某个节点失败,消息也会在集群中的其他节点上得到处理。总的来说,Kafka 是一个非常强大的数据处理平台,可以用于实时数据处理、日志文件处理、传感器数据处理和流处理等场景。
152 0
|
消息中间件 存储 Java
【Kafka系列】(一)Kafka入门(上)
【Kafka系列】(一)Kafka入门
下一篇
无影云桌面