大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生网关 MSE Higress,422元/月
简介: 大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka (正在更新…)

章节内容

上节我们完成了:


Kafka介绍

ZK的基本环境

Kafka下载解压配置

Kafka启动配置

Kafka启动服务

Kafka启动

上节我们通过sh脚本启动,但是当我们的SSH关闭的时候,Kafka服务也退出。

这里我们可以使用 Kakfa 的守护进程的方式启动,就可以在后台运行了。


kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

启动之后,我们可以通过 ps 工具看到:

ps aux | grep kafka

返回结果如下图:

sh脚本使用

topics.sh

kakfa-topics.sh 用于管理主题

查看所有

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

当前执行返回的是空的,因为我们没有任何主题。

创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1

执行结果中,我们可以观察到,已经顺利的完成了。

查看主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1

执行结果中,我们可以观察到,已经顺利的完成了。

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1

新建主题(用于测试)

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1

producer.sh

kafka-console-producer.sh 用于生产消息

生成数据

kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092

手动生成一批数据来进行测试:

consumer.sh

kafka-console-consumer.sh 用于消费消息

消费数据

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test

此时,我们需要再开启一个 Producer 产生数据,它才会继续消费。

从头消费

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning

从头开始消费的话,我们可以看到消费者已经把刚才我们写入的数据都消费了

Java API

架构图

POM

kafka-clients 是 Apache Kafka 提供的一个 Java 库,用于与 Kafka 进行交互。它是 Kafka 的核心组件之一,提供了对 Kafka 生产者和消费者的实现,使得 Java 应用程序可以方便地将数据发送到 Kafka 主题或从中读取数据。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>

Producer(生产者): 生产者负责将消息发送到 Kafka 的指定主题(Topic)。每条消息由一个键值对(key-value pair)组成,Kafka 会根据键对消息进行分区(Partitioning)。

Consumer(消费者): 消费者从 Kafka 的主题中读取消息。消费者组(Consumer Group)允许多个消费者协调工作,共同处理来自主题的消息。

Topic(主题): Kafka 中的逻辑通道,用于存储消息。每个主题可以有多个分区(Partition),消息在分区内是有序的,但在不同分区间无序。

Partition(分区): 主题的物理分片,允许 Kafka 在分布式环境中扩展性能。每个分区可以有一个或多个副本(Replica),其中一个作为 Leader,其他作为 Follower。

常用配置

bootstrap.servers: Kafka broker 的地址列表,生产者和消费者通过这个地址连接到 Kafka 集群。

key.serializer / value.serializer: 生产者消息的键和值的序列化类。

key.deserializer / value.deserializer: 消费者消息的键和值的反序列化类。

acks: 生产者配置,用于指定 Kafka 对消息确认的级别(0, 1, all)。

enable.auto.commit: 消费者配置,是否自动提交偏移量。默认是 true。

auto.offset.reset: 消费者配置,当消费者组没有初始偏移量或偏移量不存在时,如何处理(earliest, latest, none)。

生产者1测试


public class TestProducer01 {

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "wzk_topic_test",
                0, 0,
                "hello world by java!"
        );
        Future<RecordMetadata> future = producer.send(record);
        future.get(3_000, TimeUnit.SECONDS);
        producer.close();
    }

}

生产者1运行

运行结果如下图:

消费者01运行


public class TestConsumer01 {

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "wzk-test");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);

        final List<String> topics = Arrays.asList("wzk_topic_test");
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                collection.forEach(item -> {
                    System.out.println("剥夺的分区: " + item.partition());
                });
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                collection.forEach(item -> {
                    System.out.println("接收的分区: " + item.partition());
                });
            }
        });

        final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
        final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
        topic1Iterable.forEach(record -> {
            System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
            System.out.println("消息的key:" + record.key());
            System.out.println("消息的偏移量:" + record.offset());
            System.out.println("消息的分区号:" + record.partition());
            System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
            System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
            System.out.println("消息的时间戳:" + record.timestamp());
            System.out.println("消息的时间戳类型:" + record.timestampType());
            System.out.println("消息的主题:" + record.topic());
            System.out.println("消息的值:" + record.value());
        });

        consumer.close();
    }

}

消费者01测试

控制台运行截图如下:

相关文章
|
1天前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
12 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
1天前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
13 5
|
1天前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
13 4
|
1天前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
12 3
|
1天前
|
消息中间件 缓存 大数据
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
13 3
|
1天前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
11 2
|
1天前
|
缓存 分布式计算 NoSQL
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
11 2
|
2天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
6 1
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
105 9
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3