(1)map:输入一个元素,输出一个元素,可以用来做一些清洗,转换工作。DataStream → DataStream
val streamMap = stream.map { x => x * 2 }
(2)flatMap:和Map相似,可以理解为将输入的元素压平,从而对输出结果的数量不做要求,可以为0、1或者多个,多用于拆分操作。DataStream → DataStream
val streamFlatMap = stream.flatMap{ x => x.split(" ") }
(3)filter:过滤筛选,将所有符合判断条件的结果集输出,DataStream → DataStream
val streamFilter = stream.filter{ x => x > 1 }
(4)KeyBy:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的,返回KeyedStream。DataStream -> KeyedStream
注意:以下类型无法作为key①POJO类,且没有实现hashCode函数②任意形式的数组类型
dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple
(5)滚动聚合算子(Rolling Aggregation)
对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果,常见的有sum(),min(),max(),minBy(),maxBy()等,KeyedStream → DataStream
min(),max(), minBy(),maxBy()这些算子可以针对KeyedStream的每一个支流做聚合
keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key")
min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含的最小值的元素(同样元原理适用于max和maxBy)
(6)fold:用一个初始的一个值,与其每个元素进行滚动合并操作。KeyedStream → DataStream
val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })
当应用于序列(1,2,3,4,5)时,发出序列“start-1”、“start-1-2”、“start-1-2”,…
(6)reduce:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。KeyedStream → DataStream
case class WC(val word: String, val count: Int)
val wordCounts = stream.groupBy("word").reduce { (w1, w2) => new WC(w1.word, w1.count + w2.count) }
(7) Split 和 Select
Split :根据某些特征把一个DataStream拆分成两个或者多个DataStream。DataStream → SplitStream
Select:从一个SplitStream中获取一个或者多个DataStream。SplitStream→DataStream
val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } )
val even = split select "even" val odd = split select "odd" val all = split.select("even","odd")
(8) Connect和 CoMap、CoFlatMap
Connect:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据形式不发生任何变化,两个流相互独立。DataStream,DataStream → ConnectedStreams
CoMap,CoFlatMap:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。ConnectedStreams → DataStream
someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream)
connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false )
Connect与 Union 区别:
①Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
② Connect只能操作两个流,Union可以操作多个。
(9)iterate
在流程中创建一个反馈循环,将一个操作的输出重定向到之前的操作。DataStream --> IterativeStream --> DataStream
initialStream.iterate { iteration => { val iterationBody = iteration.map {/do something/} (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } }
(10)extract timestamps
提取记录中的时间戳来跟需要事件时间的window一起发挥作用。DataStream --> DataStream
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。