(1) Map in Detail
Performs the Map-side transformation of a data set: each record is converted, in parallel, into exactly one new record, and the partitioning of the data does not change.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 9:58 AM
 */
public class MapJava {
    public static void main(String[] args) throws Exception {

        // Prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: convert every line to upper case
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        mapSource.print();

        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object MapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .print()
  }
}
(2) FlatMap in Detail
Converts each incoming record into any number of output records, including none at all, as in the line-splitting example covered earlier.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class FlatMapJava {
    public static void main(String[] args) throws Exception {

        // Prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: convert every line to upper case
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatMap: split every line into (word, 1) tuples
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        flatmapSource.print();

        /**
         * (JAVA,1)
         * (JAVA,1)
         * (SPARK,1)
         * (HIVE,1)
         * ...
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object FlatMapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) => {
        // equivalently: for (word <- line.split(" ")) collector.collect((word, 1))
        line.split(" ").foreach(word => collector.collect((word, 1)))
      })
      .print()
  }
}
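The point that a flatMap call may emit no output at all for some records is easy to miss in the word-count example, since every line produces words. A minimal Java sketch of the zero-output case (the class name FlatMapEmptyJava and the blank-line input are illustrative additions, not part of the original code):
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapEmptyJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> lines = env.fromElements("java spark", "   ", "hive");

        DataSet<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) {
                if (line.trim().isEmpty()) {
                    return; // a blank line produces no output records at all
                }
                for (String word : line.trim().split(" ")) {
                    out.collect(word);
                }
            }
        });

        words.print();

        /**
         * java
         * spark
         * hive
         * (output order may vary)
         */
    }
}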
(3) MapPartition in Detail: An Optimization of Map
Functionally similar to Map, except that MapPartition processes the DataSet one partition at a time: the function is invoked once per partition, receives that partition's records as an Iterator, and may return any number of result values. This makes it a small optimization over Map.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:48 PM
 */
public class MapPartitionJava {
    public static void main(String[] args) throws Exception {

        // Prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * mapPartition: called once per partition, upper-casing every line in it
         */
        DataSet<String> mapPartition = dataSource.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
                for (String line : iterable) {
                    collector.collect(line.toUpperCase());
                }
            }
        });

        mapPartition.print();

        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object MapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .mapPartition((iterator: Iterator[String], collector: Collector[String]) => {
        for (line <- iterator) {
          collector.collect(line.toUpperCase)
        }
      })
      .print()
  }
}
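Why per-partition processing counts as an optimization: any setup work can be paid once per partition instead of once per record. A minimal Java sketch of that pattern (the class name MapPartitionSetupJava and the compiled-regex setup are illustrative assumptions; in practice the per-partition resource is more often a database or HTTP connection):
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.regex.Pattern;

public class MapPartitionSetupJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> lines = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop");

        DataSet<Integer> wordsPerLine = lines.mapPartition(
                new MapPartitionFunction<String, Integer>() {
            @Override
            public void mapPartition(Iterable<String> partition, Collector<Integer> out) {
                // Setup that runs once per partition, not once per record.
                // A map() would have to repeat this work for every single line.
                Pattern whitespace = Pattern.compile("\\s+");
                for (String line : partition) {
                    out.collect(whitespace.split(line).length);
                }
            }
        });

        wordsPerLine.print();
        // Expected values: 4, 4, 3 (output order may vary)
    }
}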
(4) Filter in Detail
Filters the incoming records against a condition: only the elements for which the condition evaluates to true are passed on to the downstream DataSet.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class FilterJava {
    public static void main(String[] args) throws Exception {

        // Prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: convert every line to upper case
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatMap: split every line into (word, 1) tuples
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * filter: keep only the tuples whose key is "SPARK"
         */
        DataSet<Tuple2<String, Integer>> filterSource = flatmapSource.filter(
                new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                return "SPARK".equals(tuple2.f0);
            }
        });

        filterSource.print();

        /**
         * (SPARK,1)
         * (SPARK,1)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object FilterScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) => {
        line.split(" ").foreach(word => collector.collect((word, 1)))
      })
      .filter(tuple2 => tuple2._1.equals("SPARK"))
      .print()
  }
}
(5) Reduce in Detail
Merges the elements of a data set into a single element by combining them two at a time; it can also be applied to the entire data set.
reduce aggregates incrementally, folding in one element at a time.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class ReduceJava {
    public static void main(String[] args) throws Exception {

        // Prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: convert every line to upper case
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatMap: split every line into (word, 1) tuples
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * reduce: sum the counts for each word, two tuples at a time
         */
        DataSet<Tuple2<String, Integer>> reduceSource = flatmapSource.groupBy(0).reduce(
                new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1,
                                                  Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        });

        reduceSource.print();

        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object ReduceScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) => {
        line.split(" ").foreach(word => collector.collect((word, 1)))
      })
      .groupBy(0)
      .reduce((x, y) => (x._1, x._2 + y._2))
      .print()
  }
}
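As noted above, reduce can also run over the entire data set without a groupBy, pairwise-merging everything down to a single element. A minimal Java sketch (the class name ReduceAllJava and the sample tuples are illustrative additions, chosen to mirror the word counts above):
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ReduceAllJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
                new Tuple2<>("JAVA", 6),
                new Tuple2<>("SPARK", 2),
                new Tuple2<>("HIVE", 2),
                new Tuple2<>("HADOOP", 1));

        // Reduce over the entire DataSet (no groupBy): elements are merged
        // pairwise until a single element remains.
        wordCounts.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1,
                                                  Tuple2<String, Integer> t2) {
                return new Tuple2<>("TOTAL", t1.f1 + t2.f1);
            }
        }).print();

        /**
         * (TOTAL,11)
         */
    }
}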
(6) ReduceGroup in Detail
The GroupReduce operator is applied to a grouped DataSet and calls the user-defined group-reduce function once for each group. It differs from Reduce in that the user-defined function receives the whole group at once.
It merges a group of elements into one or more elements and can also be used on the entire data set. This is a small optimization over the reduce program.
reduceGroup reads a batch of elements in a single call, say 10 at a time, and then aggregates them together.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class ReduceGroupJava {
    public static void main(String[] args) throws Exception {

        // Prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: convert every line to upper case
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatMap: split every line into (word, 1) tuples
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * reduceGroup: receive the whole group for a key at once and sum its counts
         */
        DataSet<Tuple2<String, Integer>> reduceSource = flatmapSource.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable,
                               Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key, count));
            }
        });

        reduceSource.print();

        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object ReduceGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) => {
        line.split(" ").foreach(word => collector.collect((word, 1)))
      })
      .groupBy(0)
      .reduceGroup(iterator => iterator.reduce((t1, t2) => (t1._1, t1._2 + t2._2)))
      .print()
  }
}
(7) CombineGroup in Detail: An Optimization of ReduceGroup
With CombineGroup we can pre-aggregate on each machine first, and then use ReduceGroup to aggregate the output of each machine's CombineGroup. That leaves ReduceGroup with far less data to gather, which speeds up the computation.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class CombineGroupJava {
    public static void main(String[] args) throws Exception {

        // Prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: convert every line to upper case
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatMap: split every line into (word, 1) tuples
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * combineGroup: pre-aggregate the counts within each partition
         */
        DataSet<Tuple2<String, Integer>> combineGroupSource = flatmapSource.groupBy(0).combineGroup(
                new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void combine(Iterable<Tuple2<String, Integer>> iterable,
                                Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key, count));
            }
        });

        /**
         * reduceGroup: merge the pre-aggregated counts into the final result
         */
        DataSet<Tuple2<String, Integer>> reduceSource = combineGroupSource.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable,
                               Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key, count));
            }
        });

        reduceSource.print();

        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object CombineGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) => {
        line.split(" ").foreach(word => collector.collect((word, 1)))
      })
      .groupBy(0)
      .combineGroup((iterator: Iterator[(String, Int)], collector: Collector[(String, Int)]) => {
        collector.collect(iterator.reduce((t1, t2) => (t1._1, t1._2 + t2._2)))
      })
      .groupBy(0)
      .reduceGroup(iterator => iterator.reduce((t1, t2) => (t1._1, t1._2 + t2._2)))
      .print()
  }
}
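To see why the pipeline still ends with groupBy(0).reduceGroup(...) after the combine, it can help to print the combineGroup output on its own: the combine step works within each partition and does not shuffle data, so when records with the same key sit in different partitions it emits partial counts for that key. A minimal Java sketch (the class name CombineGroupPartialJava, the pre-built tuples, and the rebalance() call are illustrative additions; the exact output depends on parallelism and record distribution):
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombineGroupPartialJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> words = env.fromElements(
                new Tuple2<>("JAVA", 1), new Tuple2<>("JAVA", 1), new Tuple2<>("SPARK", 1),
                new Tuple2<>("HIVE", 1), new Tuple2<>("JAVA", 1), new Tuple2<>("HADOOP", 1))
                .rebalance(); // spread the records across the parallel subtasks

        // combineGroup alone: runs per partition, so the same key can appear
        // in the output more than once, as a partial count per partition.
        words.groupBy(0)
             .combineGroup(new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                 @Override
                 public void combine(Iterable<Tuple2<String, Integer>> values,
                                     Collector<Tuple2<String, Integer>> out) {
                     String key = null;
                     int count = 0;
                     for (Tuple2<String, Integer> t : values) {
                         key = t.f0;
                         count += t.f1;
                     }
                     out.collect(new Tuple2<>(key, count));
                 }
             })
             .print();

        // Possible output with parallelism > 1 (the exact split varies):
        // (JAVA,2) (JAVA,1) (SPARK,1) (HIVE,1) (HADOOP,1)
        // The final groupBy(0).reduceGroup(...) in the original example is what
        // merges these partial counts into the complete result.
    }
}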