【Kafka】(六)Java 操作 kafka Streams

简介: 【Kafka】(六)Java 操作 kafka Streams

文章目录


一、导入maven包

二、编写第一个Streams应用程序:将一个topic写入另一个topic

三、Line Split

四、单行映射成多行


一、导入maven包


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.1</version>
    </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>
   <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.1.1</version>
    </dependency>


二、编写第一个Streams应用程序:将一个topic写入另一个topic


编写Streams应用程序的第一步是创建一个java.util.Properties映射,以指定不同的Streams执行配置值StreamsConfig。您需要设置的几个重要配置值是:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,它指定用于建立与Kafka集群的初始连接的主机/端口对的列表,并且StreamsConfig.APPLICATION_ID_CONFIG它提供Streams应用程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信:

//程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"zj");
//用于建立与Kafka集群的初始连接的主机/端口对的列表
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");


此外,您可以在同一映射中自定义其他配置,例如,记录键值对的默认序列化和反序列化库:

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());


接下来,我们将定义Streams应用程序的计算逻辑。在Kafka Streams中,该计算逻辑被定义为topology连接的处理器节点之一。我们可以使用拓扑构建器来构建这样的拓扑

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("demo3").to("test2");


并将其描述打印到标准输出为

System.out.println(topology.describe());


如果我们停在这里,编译并运行程序,它将输出以下信息:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [my-replicated-topic])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: skindow-toptic)
      <-- KSTREAM-SOURCE-0000000000


如上所示,它说明构造的拓扑具有两个处理器节点,源节点KSTREAM-SOURCE-0000000000和汇聚节点KSTREAM-SINK-0000000001。 KSTREAM-SOURCE-0000000000连续读取Kafka主题的记录my-replicated-topic并将它们传送到下游节点KSTREAM-SINK-0000000001; KSTREAM-SINK-0000000001将写入每个接收到的记录以便另一个Kafka主题skindow-toptic (–>和<–箭头指示该节点的下游和上游处理器节点,即拓扑图中的“子节点”和“父节点”)。它还说明了这个简单的拓扑没有与之关联的全局状态存储 java.util.Properties 实例中指定的配置映射和的Topology对象。

final KafkaStreams stream = new KafkaStreams(topo,prop);


通过调用它的start()函数,我们可以触发该客户端的执行。close()在此客户端上调用之前,执行不会停止。例如,我们可以添加带倒计时锁存器的关闭钩子来捕获用户中断并在终止此程序时关闭客户端:

final CountDownLatch latch = new CountDownLatch(1);
        // 附加关闭处理程序来捕获control-c
        Runtime.getRuntime().addShutdownHook(new Thread("zj01"){
            @Override
            public void run(){
                stream.close();
                latch.countDown();
            }
        });
        try {
            stream.start();
            latch.await();
        }catch (InterruptedException e){
            //是非正常退出,就是说无论程序正在执行与否,都退出
            System.exit(1);
        }
        //正常退出,程序正常执行结束退出
        System.exit(0);
    }
}


完整的代码如下所示:

package com.njbdqn.services;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class MyStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"zj");
        //用于建立与Kafka集群的初始连接的主机/端口对的列表
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
        //记录键值对的默认序列化和反序列化库
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一,构建流构建工具
        final StreamsBuilder builder = new StreamsBuilder();
        //将demo3写入另一个Kafka toptic(test2) 类似于算子组成的图模型
        builder.stream("demo3").to("test2");
        //构建Topology对象
        final Topology topo = builder.build();
        //构建 kafka流 API实例 将算子以及操作的服务器配置到kafka流
        final KafkaStreams stream = new KafkaStreams(topo,prop);
        final CountDownLatch latch = new CountDownLatch(1);
        // 附加关闭处理程序来捕获control-c
        Runtime.getRuntime().addShutdownHook(new Thread("zj01"){
            @Override
            public void run(){
                stream.close();
                latch.countDown();
            }
        });
        try {
            stream.start();
            latch.await();
        }catch (InterruptedException e){
            //是非正常退出,就是说无论程序正在执行与否,都退出
            System.exit(1);
        }
        //正常退出,程序正常执行结束退出
        System.exit(0);
    }
}


三、Line Split


由于每个源流的记录都是一个String键入的键值对,让我们将值字符串视为文本行,并将其拆分为带有FlatMapValues运算符的单词:

builder.stream("demo3").flatMapValues(new ValueMapper<Object, Iterable<Object>>() {
            @Override
            public Iterable<Object> apply(Object s) {
                return Arrays.asList(s.toString().split(","));
            }
        });


完整的代码如下所示:

package com.njbdqn.services;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class MyStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"zj");
        //用于建立与Kafka集群的初始连接的主机/端口对的列表
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
        //记录键值对的默认序列化和反序列化库
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一,构建流构建工具
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("demo3").flatMapValues(new ValueMapper<Object, Iterable<Object>>() {
            @Override
            public Iterable<Object> apply(Object s) {
                return Arrays.asList(s.toString().split(","));
            }
        });
        //构建Topology对象
        final Topology topo = builder.build();
        //打印算子结果
        System.out.println(topo.describe().toString());
        //构建 kafka流 API实例 将算子以及操作的服务器配置到kafka流
        final KafkaStreams stream = new KafkaStreams(topo,prop);
        final CountDownLatch latch = new CountDownLatch(1);
        // 附加关闭处理程序来捕获control-c
        Runtime.getRuntime().addShutdownHook(new Thread("zj01"){
            @Override
            public void run(){
                stream.close();
                latch.countDown();
            }
        });
        try {
            stream.start();
            latch.await();
        }catch (InterruptedException e){
            //是非正常退出,就是说无论程序正在执行与否,都退出
            System.exit(1);
        }
        //正常退出,程序正常执行结束退出
        System.exit(0);
    }
}


四、单行映射成多行


20200211225834628.png


package com.njbdqn.services;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class MyStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"zj");
        //用于建立与Kafka集群的初始连接的主机/端口对的列表
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.122:9092");
        //记录键值对的默认序列化和反序列化库
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一,构建流构建工具
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("demo4").filter((k,v)->v.toString().split(",").length==2)
                .flatMap((k,v)->{
                    List<KeyValue<String,String>> keyValues = new ArrayList<>();
                    String[] info = v.toString().split(",");
                    String[] friends = info[1].split(" ");
                    for (String friend:friends){
                        keyValues.add(new KeyValue<String, String>(info[0].toString(),friend));
                    }
                    return keyValues;
                }).foreach(((k,v)-> System.out.println(k+"======="+v)));
        //构建Topology对象
        final Topology topo = builder.build();
        //打印算子结构
        // System.out.println(topo.describe().toString());
        //构建 kafka流 API实例 将算子以及操作的服务器配置到kafka流
        final KafkaStreams stream = new KafkaStreams(topo,prop);
        final CountDownLatch latch = new CountDownLatch(1);
        // 附加关闭处理程序来捕获
        Runtime.getRuntime().addShutdownHook(new Thread("zj01"){
            @Override
            public void run(){
                stream.close();
                latch.countDown();
            }
        });
        try {
            stream.start();
            latch.await();
        }catch (InterruptedException e){
            //是非正常退出,就是说无论程序正在执行与否,都退出
            System.exit(1);
        }
        //正常退出,程序正常执行结束退出
        System.exit(0);
    }
}


得出转换后的结果


20200211230012526.png

目录
相关文章
|
17天前
|
Java
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式。本文介绍了 Streams 的基本概念和使用方法,包括创建 Streams、中间操作和终端操作,并通过多个案例详细解析了过滤、映射、归并、排序、分组和并行处理等操作,帮助读者更好地理解和掌握这一重要特性。
25 2
|
21天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
2月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
31 2
|
2月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
46 3
|
2月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
47 4
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
70 2
|
消息中间件 Java Kafka
Java实现Kafka生产者和消费者的示例
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。
408 0
Java实现Kafka生产者和消费者的示例
|
11天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
2天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
1天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
19 1