1. Stateless transformations
Stateless transformation operators: the stream processor neither reads nor stores any state while handling a record.
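1.1 branch
Branch: splits one stream into one or more streams (stream -----> stream[]). Each record goes to the first predicate it matches; a record that matches no predicate is dropped. Since Kafka 2.8, branch is deprecated in favor of split().
// branch: split the stream
KStream<String, String>[] streams = kStream.branch(
    (k, v) -> v.startsWith("A"),
    (k, v) -> v.startsWith("B"),
    (k, v) -> true);
streams[0].foreach((k, v) -> System.out.println(k + " |" + v));   // values starting with "A"
streams[1].foreach((k, v) -> System.out.println(k + ": " + v));   // values starting with "B"
streams[2].foreach((k, v) -> System.out.println(k + "||" + v));   // everything else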
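The first-match routing described above can be modeled without Kafka at all. The following is a minimal sketch in plain Java, not the Kafka Streams API: the class name BranchSketch and the use of Map.Entry as a stand-in for a record are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of branch semantics (hypothetical helper, not Kafka Streams).
final class BranchSketch {

    // Routes each (key, value) pair to the first predicate that matches.
    // Pairs that match no predicate are dropped.
    @SafeVarargs
    static List<List<Map.Entry<String, String>>> branch(
            List<Map.Entry<String, String>> records,
            BiPredicate<String, String>... predicates) {
        List<List<Map.Entry<String, String>>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (Map.Entry<String, String> record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record.getKey(), record.getValue())) {
                    outputs.get(i).add(record);
                    break; // first match wins; later predicates are not tried
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("k1", "Apple"),
                Map.entry("k2", "Banana"),
                Map.entry("k3", "Cherry"));
        List<List<Map.Entry<String, String>>> out = branch(input,
                (k, v) -> v.startsWith("A"),
                (k, v) -> v.startsWith("B"),
                (k, v) -> true); // catch-all, like the third predicate above
        for (int i = 0; i < out.size(); i++) {
            System.out.println("branch " + i + ": " + out.get(i));
        }
    }
}
```

A trailing catch-all predicate such as (k, v) -> true is what keeps unmatched records from being dropped.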
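1.2 filter
Filter: evaluates a boolean function on each record of a stream and keeps the records for which it returns true.
// filter: keep records whose value starts with "Hello"
kStream.filter((k, v) -> v.startsWith("Hello"))
       .foreach((k, v) -> System.out.println(k + "\t" + v));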
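1.3 filterNot
Inverse filter: evaluates a boolean function on each record of a stream and keeps the records for which it returns false.
// filterNot: keep records whose value does not start with "Hello"
KStream<String, String> stream = kStream.filterNot((k, v) -> v.startsWith("Hello"));
stream.foreach((k, v) -> System.out.println(k + " :" + v));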
1.4 flatMap
Expands one record into zero or more records: record ---> record1, record2, ... Both the key and the value of each output record may differ from the input.
// flatMap: one output record per word in the value
kStream.flatMap((k, v) -> {
    List<KeyValue<String, String>> keyValues = new ArrayList<>();
    String[] words = v.split(" ");
    for (String word : words) {
        keyValues.add(new KeyValue<String, String>(k, word));
    }
    return keyValues;
}).foreach((k, v) -> System.out.println(k + " | " + v));
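1.5 flatMapValues
Expands the value of one record into zero or more values and emits one record per value; the key is left unchanged.
(k, v) --> (k, v1), (k, v2), ...
// flatMapValues: one output record per word, same key
kStream.flatMapValues((v) -> Arrays.asList(v.split(" ")))
       .foreach((k, v) -> System.out.println(k + " | " + v));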
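To make the (k, v) --> (k, v1), (k, v2), ... mapping concrete, here is a minimal plain-Java sketch of the same expansion. It is not the Kafka Streams API; the class name FlatMapValuesSketch and the Map.Entry record representation are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of flatMapValues semantics (hypothetical helper, not Kafka Streams).
final class FlatMapValuesSketch {

    // Applies the mapper to each value and emits one record per result,
    // always reusing the key of the input record.
    static List<Map.Entry<String, String>> flatMapValues(
            List<Map.Entry<String, String>> records,
            Function<String, List<String>> mapper) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            for (String value : mapper.apply(record.getValue())) {
                out.add(Map.entry(record.getKey(), value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("k1", "hello kafka"),
                Map.entry("k2", "streams"));
        System.out.println(flatMapValues(input, v -> Arrays.asList(v.split(" "))));
    }
}
```

Because the key never changes, an operator of this shape does not by itself force the data to be repartitioned, which is the practical reason to prefer it over flatMap when only the value needs splitting.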
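1.6 foreach
Terminal operation: performs a stateless action on every record and returns nothing, so no further operators can be chained after it.
.foreach((k, v) -> System.out.println(k + "\t" + v));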
1.7 groupByKey | groupBy
groupByKey: groups records by their existing key.
groupBy: groups records by a key computed from each record by a user-supplied function.
kStream
    .flatMap((k, v) -> {
        String[] words = v.split(" ");
        List<KeyValue<String, String>> keyValues = new ArrayList<>();
        for (String word : words) {
            keyValues.add(new KeyValue<String, String>(word, word));
        }
        return keyValues;
    })
    .groupByKey()
    .count()
    .toStream()
    .print(Printed.toSysOut());  // print in the standard output format
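The difference between the two grouping operators is only where the grouping key comes from. The sketch below models that in plain Java with java.util.stream collectors; it is not the Kafka Streams API, and GroupingSketch, countByKey, and countBy are hypothetical names chosen for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Plain-Java model of groupByKey().count() versus groupBy(...).count().
final class GroupingSketch {

    // Like groupByKey().count(): group on the key each record already has.
    static Map<String, Long> countByKey(List<Map.Entry<String, String>> records) {
        return countBy(records, (k, v) -> k);
    }

    // Like groupBy(selector).count(): derive the grouping key from each record first.
    static Map<String, Long> countBy(List<Map.Entry<String, String>> records,
                                     BiFunction<String, String, String> selector) {
        return records.stream().collect(Collectors.groupingBy(
                r -> selector.apply(r.getKey(), r.getValue()),
                TreeMap::new,            // sorted map, only to make printing stable
                Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("a", "x"),
                Map.entry("a", "y"),
                Map.entry("b", "x"));
        System.out.println(countByKey(input));               // grouped by key
        System.out.println(countBy(input, (k, v) -> v));     // grouped by value
    }
}
```

In this model countByKey is just countBy with a selector that returns the existing key, which mirrors how groupBy can be read as "select a new key, then group by key".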
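1.8 map | mapValues
Maps one record to exactly one other record. map may change both key and value; mapValues changes only the value and keeps the key.
kStream.map((k, v) -> new KeyValue<String, Long>(k, (long) v.length()))
       .foreach((k, v) -> System.out.println(k + "\t" + v));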
1.9 merge
Merges two streams into one.
KStream<byte[], String> stream1 = ...;
KStream<byte[], String> stream2 = ...;
KStream<byte[], String> merged = stream1.merge(stream2);
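1.10 peek
Acts as a probe into the running program and is mostly used for debugging: peek runs an action on each record and passes the stream through unchanged, so it has no effect on downstream data.
KStream<byte[], String> unmodifiedStream = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value));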
1.11 print
Terminal operation: prints every record.
stream.print(Printed.toSysOut());
stream.print(Printed.toFile("streams.out").withLabel("streams"));
1.12 selectKey
Replaces the key of each record and keeps the value: (k, v) ----> (newKey, v)
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
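2. Stateful transformations
Stateful transformation operators: while handling a record, the processor (Processor) has to update state or restore data from previously stored state.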
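2.1 aggregate
Aggregation: a stateful transformation operator that folds the records of each group into a single result, whose type may differ from the value type.
KTable<String, Long> kTable = kStream
    .flatMapValues(value -> Arrays.asList(value.split(" ")))
    .groupBy((k, v) -> v)
    // 1st argument: initial value of the aggregate
    // 2nd argument: aggregation logic
    // 3rd argument: [required] the key/value types of the state store
    .aggregate(
        () -> 0L,
        (k, v, agg) -> 1L + agg,
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("c152")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));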
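The roles of the initializer and the aggregation lambda can be seen in a small plain-Java model, where a HashMap plays the part of the state store. This is only a sketch under that assumption, not the Kafka Streams API; AggregateSketch and its nested Aggregator interface are local stand-ins invented for this example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of aggregate semantics (hypothetical helper, not Kafka Streams).
final class AggregateSketch {

    // Local stand-in with the same shape as the (key, value, aggregate) lambda.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         Aggregator<K, V, A> aggregator) {
        Map<K, A> store = new HashMap<>(); // plays the role of the state store
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // The initializer is used only the first time a key is seen.
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return store;
    }

    public static void main(String[] args) {
        // Records already grouped by word, as after groupBy((k, v) -> v).
        List<Map.Entry<String, String>> words = List.of(
                Map.entry("kafka", "kafka"),
                Map.entry("streams", "streams"),
                Map.entry("kafka", "kafka"));
        Map<String, Long> counts = aggregate(words, () -> 0L, (k, v, agg) -> 1L + agg);
        System.out.println(counts);
    }
}
```

Note that the aggregate type (Long) differs from the value type (String) here, which is why the real operator needs to be told how to serialize the stored values.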
2.2 count
Counts how many records share the same key.
// specify the key/value types of the state store
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("c158")
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long()));
2.3 reduce
Reduction: a stateful transformation operator that combines the values of each group pairwise into a single value of the same type.
// reduce: sum the 1L emitted for each word
KTable<String, Long> kTable = kStream
    .flatMapValues(value -> Arrays.asList(value.split(" ")))
    .map((String k, String v) -> new KeyValue<String, Long>(v, 1L))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
    .reduce((v1, v2) -> v1 + v2,
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("c152")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));
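Unlike aggregate, reduce takes no initial value: the first value seen for a key becomes the running result, and every later value is combined with it. A minimal plain-Java sketch of that behavior follows, again with a HashMap standing in for the state store. It is not the Kafka Streams API, and ReduceSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of reduce semantics (hypothetical helper, not Kafka Streams).
final class ReduceSketch {

    static <K, V> Map<K, V> reduce(List<Map.Entry<K, V>> records,
                                   BinaryOperator<V> reducer) {
        Map<K, V> store = new HashMap<>(); // plays the role of the state store
        for (Map.Entry<K, V> record : records) {
            // merge stores the first value as-is and applies the reducer afterwards.
            store.merge(record.getKey(), record.getValue(), reducer);
        }
        return store;
    }

    public static void main(String[] args) {
        // One (word, 1L) record per word, as after the map step above.
        List<Map.Entry<String, Long>> ones = List.of(
                Map.entry("kafka", 1L),
                Map.entry("streams", 1L),
                Map.entry("kafka", 1L));
        System.out.println(reduce(ones, (v1, v2) -> v1 + v2));
    }
}
```

Since the result has the same type as the input values, the word count above has to map each word to 1L before reducing, whereas the aggregate version could count String values directly.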