【Kafka】(四)Kafka Streams 转换算子详解1

简介: 【Kafka】(四)Kafka Streams 转换算子详解1

1.stateless transformation


无状态的转换算子:流处理器不涉及状态的处理和存储


1.1 branch


分支 :将一个stream转换为1到多个Stream stream----->stream[]

//branch 分流
        KStream<String, String>[] streams = kStream.branch((k, v) -> v.startsWith("A"), (k, v) -> v.startsWith("B"), (k, v) -> true);
        streams[0].foreach((k,v)-> System.out.println(k+" |" +v)); //遍历以A开头
        streams[1].foreach((k,v)-> System.out.println(k+": "+v)); //遍历以B开头
        streams[2].foreach((k,v)-> System.out.println(k+"||"+v)); //遍历其他


1.2 filter


过滤:将一个Stream经过boolean函数处理,保留符合条件的结果

//filter 过滤 保留record value为Hello开头的结果
 kStream.filter((k,v) -> v.startsWith("Hello")).foreach((k,v) -> System.out.println(k+"\t"+v));


1.3 filterNot


翻转过滤:将一个Stream经过Boolean函数处理保留不符合条件的结果

//翻转过滤 保留不以Hello开头
        KStream<String, String> stream = kStream.filterNot((k, v) -> v.startsWith("Hello"));
        stream.foreach((k,v)-> System.out.println(k+" :"+v));


1.4 flatMap


将一个record展开,产生0到多个record record—>record1,record2…

//flatMap展开
        kStream.flatMap((k,v)->{
          List<KeyValue<String, String>> keyValues = new ArrayList<>();
            String[] words = v.split(" ");
            for (String word : words) {
                keyValues.add(new KeyValue<String, String>(k,word));
            }
            return keyValues;
        }).foreach((k,v)-> System.out.println(k+" | "+v));


1.5 flatMapValues


将一条record变成多条record并且将多条记录展开

(k,v)–>(k,v1),(k,v2)…

//flatMapValues
   kStream.flatMapValues((v)-> Arrays.asList(v.split(" "))).foreach((k,v)-> System.out.println(k+" | "+v));


1.6 foreach


终止操作,为每一个record提供一种无状态的操作

.foreach((k,v) -> System.out.println(k+"\t"+v));


1.7 GroupByKey | GroupBy


GroupByKey:根据key进行分组

GroupBy:根据自定义的信息进行分组

kStream
                .flatMap((k, v) -> {
                    String[] words = v.split(" ");
                    List<KeyValue<String, String>> keyValues = new ArrayList<>();
                    for (String word : words) {
                        keyValues.add(new KeyValue<String, String>(word, word));
                    }
                    return keyValues;
                })
                .groupByKey()
                .count()
                .toStream()
                .print(Printed.toSysOut()); //标准输出样式


1.8 map | mapValues


将一条record映射为另外一条record

kStream.map((k,v) -> new KeyValue<String,Long>(k,(long) v.length())).foreach((k,v) -> System.out.println(k +"\t"+v));


1.9 Merge


将两个流合并为一个

KStream<byte[], String> stream1 = ...;
KStream<byte[], String> stream2 = ...;
KStream<byte[], String> merged = stream1.merge(stream2);


1.10 Peek


作为程序执行的探针,一般用于debug调试,因为peek并不会对后续的流数据带来任何影响。

KStream<byte[], String> unmodifiedStream = stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));


1.11 Print


最终操作,将每一个record进行输出打印

stream.print(Printed.toSysOut());
stream.print(Printed.toFile("streams.out").withLabel("streams"));


1.12 SelectKey


修改记录中key(k,v)---->(newkey ,v)

KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0])


2.statful transformation


有状态的转换算子,处理器【Processor】在进行处理时需要更新状态或者从历史状态中恢复数据


aHR0cDovL2thZmthLmFwYWNoZS5vcmcvMjMvaW1hZ2VzL3N0cmVhbXMtc3RhdGVmdWxfb3BlcmF0aW9ucy5wbmc.png


2.1 Aggregate


聚合 有状态的转换算子

KTable<String, Long> kTable = kStream
                .flatMapValues(value -> Arrays.asList(value.split(" ")))
                .groupBy((k, v) -> v)
                // 第一参数:聚合的初始值  第二参数:聚合逻辑  第三个参数:【必须】指定状态存储的KV数据类型
                .aggregate(
                        ()-> 0L,
                        (k,v,agg) -> 1L+agg,
                        Materialized.<String,Long, KeyValueStore<Bytes,byte[]>>as("c152")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));


2.2 Count


统计key相同的record出现的次数

// 指定状态存储的k v的结构类型
.count(Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("c158").withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()));


2.3 Reduce


规约 计算 有状态的转换算子

//Reducer
        KTable<String, Long> kTable = kStream
                .flatMapValues(value ->
                        Arrays.asList(value.split(" ")))
                .map((String k, String v) -> new KeyValue<String, Long>(v, 1L))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce((v1, v2) -> v1 + v2, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("c152")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));



目录
相关文章
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
55 0
|
6月前
|
消息中间件 负载均衡 Kafka
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
203 2
|
5月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
5月前
|
消息中间件 Java Kafka
Java中的流处理框架:Kafka Streams与Flink
Java中的流处理框架:Kafka Streams与Flink
|
6月前
|
消息中间件 Java Kafka
教程:Spring Boot集成Kafka Streams流处理框架
教程:Spring Boot集成Kafka Streams流处理框架
|
7月前
|
消息中间件 存储 监控
Kafka Streams:深度探索实时流处理应用程序
Apache Kafka Streams 是一款强大的实时流处理库,为构建实时数据处理应用提供了灵活且高性能的解决方案。本文将深入探讨 Kafka Streams 的核心概念、详细原理,并提供更加丰富的示例代码,以帮助大家深入理解和应用这一流处理框架。
|
消息中间件 数据采集 SQL
【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结
【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结
764 0
【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结
|
消息中间件 jstorm 分布式计算
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2
466 0
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2