(1) Map explained
Applies a user-defined MapFunction to every element of a DataStream and produces a new DataStream. The element type may change in the process, so Map is commonly used to clean and transform the records in a dataset.
Map[DataStream -> DataStream]
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
(2) FlatMap explained
Handles computations in which a single input element produces one or more output elements.
FlatMap[DataStream -> DataStream]
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
(3) Filter explained
Evaluates a boolean predicate for every input element and keeps only the elements for which the predicate returns true.
Filter[DataStream -> DataStream]
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
(4) KeyBy explained
Places records with the same key value into the same partition; in other words, it performs a partition operation on the dataset.
KeyBy[DataStream -> KeyedStream]
Note: keyBy cannot be used to repartition a dataset in two cases:
- the elements are POJO types that do not override hashCode()
- the elements are arrays of any type
dataStream.keyBy(value -> value.getSomeKey()); // Key by field "someKey"
dataStream.keyBy(value -> value.f0);           // Key by the first element of a Tuple
(5) Reduce explained
Performs a rolling aggregation over the keyed stream; the ReduceFunction you define must be associative and commutative.
Reduce[KeyedStream -> DataStream]
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
(6) Aggregations explained
Aggregations are the built-in aggregation operators of the DataStream API. They aggregate on a specified field and produce a rolling series of aggregated results.
Aggregations[KeyedStream -> DataStream]
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
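One detail that is easy to miss: max/min only update the aggregated field, while maxBy/minBy emit the whole element that currently holds the extreme value. The following minimal sketch illustrates the difference; the class name, sample data, and field layout are invented for this example.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AggregationsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (key, name, value) - made-up sample data
        DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
                new Tuple3<>("spark", "a", 1),
                new Tuple3<>("spark", "b", 5),
                new Tuple3<>("spark", "c", 3));

        // max(2): rolling maximum of field 2; the other non-key fields are not
        // guaranteed to come from the element that holds the maximum
        source.keyBy(value -> value.f0).max(2).print("max");

        // maxBy(2): the complete element whose field 2 is currently the maximum
        source.keyBy(value -> value.f0).maxBy(2).print("maxBy");

        env.execute("aggregations");
    }
}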
Here is a single WordCount example that combines the transformations above:
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class Transform {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStreamSource = env.socketTextStream("bigdata-pro-m07", 9999);

        /**
         * map(): split each line into (first word, second word)
         */
        DataStream<Tuple2<String, String>> mapResult = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                String[] word = line.split(" ");
                return new Tuple2<>(word[0], word[1]);
            }
        });

        /**
         * flatmap(): emit (word, 1) for every word in the line
         */
        DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * filter() -> keyBy() -> reduce(): drop "spark", key by word, sum the counts
         */
        DataStream<Tuple2<String, Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                String spark = "spark";
                return !stringIntegerTuple2.f0.equals(spark);
            }
        })
        .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        })
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        });

        result.print();

        env.execute("stream");
    }
}
(7) Union explained
Union merges two or more datasets into a single dataset; the datasets being merged must have the same element type.
Union[DataStream -> DataStream]
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class UnionJava {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStreamSource1 = env.socketTextStream("bigdata-pro-m07", 9999);
        DataStream<String> dataStreamSource2 = env.socketTextStream("bigdata-pro-m07", 9998);

        // union(): merge the two socket streams into one stream of the same type
        DataStream<String> dataStreamSource = dataStreamSource1.union(dataStreamSource2);

        /**
         * flatmap(): emit (word, 1) for every word in the line
         */
        DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * filter() -> keyBy() -> reduce(): drop "spark", key by word, sum the counts
         */
        DataStream<Tuple2<String, Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                String spark = "spark";
                return !stringIntegerTuple2.f0.equals(spark);
            }
        })
        .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        })
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        });

        result.print();

        env.execute("stream");
    }
}
(8) Connect explained
Connect merges two datasets that may have different element types; after connecting, each stream keeps its original element type.
Connect[DataStream, DataStream -> ConnectedStreams -> DataStream]
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class ConnectJava {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStreamSource1 =
                env.fromElements(new Tuple2<>("spark", 1), new Tuple2<>("java", 3));
        DataStream<Tuple2<String, Integer>> dataStreamSource2 =
                env.fromElements(new Tuple2<>("hive", 2), new Tuple2<>("hadoop", 5));

        /**
         * connect(): both streams keep their own types; keyBy(0, 0) keys both streams on the first field
         */
        ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = dataStreamSource1
                .connect(dataStreamSource2).keyBy(0, 0);

        // map1 handles elements from the first stream, map2 handles elements from the second stream
        DataStream<Tuple2<String, Integer>> mapResult = connectedStreams.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map1(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<>(stringIntegerTuple2.f0, stringIntegerTuple2.f1 + 10);
            }

            @Override
            public Tuple2<String, Integer> map2(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2;
            }
        });

        mapResult.print();

        env.execute("stream");
    }
}
(9) Side Output explained
Side Outputs split a DataStream into multiple streams based on a condition. This used to be done with the Split operator, which has been deprecated in later Flink versions.
SideOut[DataStream -> SingleOutputStreamOperator -> DataStream]
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class SideOut {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("side-output"){};

        DataStream<Tuple2<String, Integer>> dataStreamSource = env.fromElements(
                new Tuple2<>("alex", 11000),
                new Tuple2<>("lili", 3200),
                new Tuple2<>("lucy", 3400),
                new Tuple2<>("pony", 13000),
                new Tuple2<>("tony", 33000),
                new Tuple2<>("herry", 4500),
                new Tuple2<>("cherry", 9000),
                new Tuple2<>("jack", 13450)
        );

        /**
         * mainDataStream holds the records with a salary of 10000 or less
         */
        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = dataStreamSource
                .process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> stringIntegerTuple2, Context context,
                                               Collector<Tuple2<String, Integer>> collector) throws Exception {
                        if (stringIntegerTuple2.f1 > 10000) {
                            // salaries above 10000 go to the side output
                            context.output(outputTag, stringIntegerTuple2);
                        } else {
                            // everything else stays in the main stream
                            collector.collect(stringIntegerTuple2);
                        }
                    }
                });

        /**
         * sideOutputStream holds the records with a salary greater than 10000
         */
        DataStream<Tuple2<String, Integer>> sideOutputStream = mainDataStream.getSideOutput(outputTag);

        sideOutputStream.print();
        /**
         * 6> (tony,33000)
         * 5> (pony,13000)
         * 2> (alex,11000)
         * 9> (jack,13450)
         */

        mainDataStream.print();
        /**
         * 13> (herry,4500)
         * 10> (lucy,3400)
         * 9> (lili,3200)
         * 14> (cherry,9000)
         */

        env.execute("stream");
    }
}
(10) Iterate explained
The Iterate operator targets iterative computation: the result of each iteration step is fed back as input to the next iteration.
Iterate[DataStream -> IterativeStream -> DataStream]
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
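To make the pattern concrete, here is a minimal runnable sketch (the class name, the sample values, and the decrement step are made up for illustration): every value is decremented by 1 in the iteration body, values still above 0 are fed back into the loop, and values that reach 0 leave the iteration.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keep a single parallel instance so the feedback parallelism matches the iteration head
        env.setParallelism(1);

        DataStream<Long> initialStream = env.fromElements(3L, 7L, 10L);

        // open the iteration
        IterativeStream<Long> iteration = initialStream.iterate();

        // iteration body: decrement every value by 1
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value - 1;
            }
        });

        // values still greater than 0 are fed back into the loop
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) {
                return value > 0;
            }
        });
        iteration.closeWith(feedback);

        // values that have reached 0 leave the loop
        iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) {
                return value <= 0;
            }
        }).print();

        env.execute("iterate");
    }
}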
(11) Partitioning explained
Custom partitioning:
Create a custom partitioning of the dataset by implementing a Partitioner and passing it to partitionCustom().
PartitionCustom[DataStream -> DataStream]
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/12 7:38 PM
 */
public class PartitionCustom {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("bigdata-pro-m07", 9999)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        /**
         * Custom partitioning: route each record by the hash of its word
         */
        dataStream.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                int partition = key.hashCode() % numPartitions;
                System.out.println("key: " + key + " partition: " + partition + " numPartitions: " + numPartitions);
                return partition;
            }
        }, new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });

        env.execute("partition");
    }
}
key: java partition: 2 numPartitions: 16
key: java partition: 2 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hbase partition: 9 numPartitions: 16
Random partitioning:
Distributes records across partitions randomly; the resulting partitions are relatively balanced.
Random partitioning[DataStream -> DataStream]
dataStream.shuffle();
Rebalance partitioning:
Distributes records across partitions in a round-robin fashion; the resulting partitions are relatively balanced.
Rebalance partitioning[DataStream -> DataStream]
dataStream.rebalance();
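To see the difference between the two strategies, the sketch below (the class names RepartitionSketch and SubtaskTagger are made up for illustration) tags every record with the index of the subtask that processed it after shuffle() and after rebalance().

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RepartitionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);

        // shuffle(): each record goes to a randomly chosen downstream subtask
        source.shuffle()
                .map(new SubtaskTagger("shuffle"))
                .print();

        // rebalance(): records are distributed round-robin over the downstream subtasks
        source.rebalance()
                .map(new SubtaskTagger("rebalance"))
                .print();

        env.execute("repartition");
    }

    /** Tags each record with the index of the subtask that processed it. */
    public static class SubtaskTagger extends RichMapFunction<Long, String> {
        private final String strategy;

        public SubtaskTagger(String strategy) {
            this.strategy = strategy;
        }

        @Override
        public String map(Long value) {
            return strategy + " -> value " + value
                    + " on subtask " + getRuntimeContext().getIndexOfThisSubtask();
        }
    }
}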