(8) Aggregate in Detail
An Aggregate Function merges a group of element values into a single value; it can be applied across an entire DataSet.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 下午
 */
public class AggregateJava {
    public static void main(String[] args) throws Exception {
        // prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatmap
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * aggregate
         */
        DataSet<Tuple2<String, Integer>> aggregateSource = flatmapSource.groupBy(0).aggregate(Aggregations.SUM, 1);

        aggregateSource.print();

        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
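Besides aggregate(), a grouped DataSet also offers shorthand aggregation methods. A minimal sketch, assuming it runs inside the same main() as above and that the sum()/max() shorthands of the Java DataSet grouping API are available (variable names are illustrative, not from the original example):

// Shorthand for aggregate(Aggregations.SUM, 1): per-word sum of the count field
DataSet<Tuple2<String, Integer>> wordCounts = flatmapSource
        .groupBy(0)
        .sum(1);

// Per-word maximum of the count field (every count is 1 here, so this only illustrates the API)
DataSet<Tuple2<String, Integer>> maxPerWord = flatmapSource
        .groupBy(0)
        .max(1);

wordCounts.print();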
(9) Join in Detail
Ways to use Join:
Joins two DataSets on the specified condition and builds a new DataSet from the selected fields. The join key can be specified by a key expression, a key-selector function, field positions, or Case Class fields.
1. Two Tuple DataSets can be joined by field position: the key field of the left DataSet is specified with the where() method and the key field of the right DataSet with the equalTo() method.
dataSet_1.join(dataSet_2).where(0).equalTo(0)
// where(key field of the left DataSet).equalTo(key field of the right DataSet)
2. A custom Join Function can be applied during the join (the snippet below uses the Scala closure syntax; a Java sketch follows this list).
dataSet_1.join(dataSet_2).where(0).equalTo(0){ (left,right) => (left.id , right.name) }
3. Use joinWithTiny or joinWithHuge to tell Flink how large the second DataSet is.
dataSet_1.joinWithTiny(dataSet_2).where(0).equalTo(0)   // hints to Flink that the second DataSet is small
dataSet_1.joinWithHuge(dataSet_2).where(0).equalTo(0)   // hints to Flink that the second DataSet is large
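In the Java DataSet API, the custom join function from item 2 is supplied with the with() method. A minimal sketch, assuming dataSet_1 and dataSet_2 are DataSet<Tuple2<Integer, String>> with an id in field 0 and a name in field 1 (illustrative types and names, not from the original example):

// requires: import org.apache.flink.api.common.functions.JoinFunction;
DataSet<Tuple2<Integer, String>> joined = dataSet_1
        .join(dataSet_2)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> join(Tuple2<Integer, String> left, Tuple2<Integer, String> right) throws Exception {
                // keep the id of the left element and the name of the right element
                return new Tuple2<>(left.f0, right.f1);
            }
        });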
Join optimization:
Flink provides join algorithm hints that let Flink execute a join more flexibly and efficiently.
1. Broadcast the first DataSet and store it as a hash table. Suitable when the first DataSet is very small.
dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0)
2. Broadcast the second DataSet and store it as a hash table. Suitable when the second DataSet is very small.
dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0)
3. Equivalent to giving no hint: leave the choice of join strategy to the optimizer.
dataSet_1.join(dataSet_2, JoinHint.OPTIMIZER_CHOOSES).where(0).equalTo(0)
4. Repartition both DataSets and build a hash table from the first one. Suitable when the first DataSet is smaller than the second but both are relatively large.
dataSet_1.join(dataSet_2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
5. Repartition both DataSets and build a hash table from the second one. Suitable when the second DataSet is smaller than the first but both are relatively large.
dataSet_1.join(dataSet_2, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0)
6. Repartition both DataSets and sort each partition. Suitable when both DataSets are already sorted.
dataSet_1.join(dataSet_2, JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0)
Join implementation:
Data sources:
dept.csv
100,技术部
200,市场部
300,营销部
400,采购部
employee.csv
100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800
Java implementation:
POJO class:
package com.aikfk.flink.base;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 3:23 下午
 */
public class EmployeePOJO {

    public String deptId;
    public String name;
    public int salary;

    public EmployeePOJO() {
    }

    public EmployeePOJO(String deptId, String name, int salary) {
        this.deptId = deptId;
        this.name = name;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "EmployeePOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.dataset.transform;

import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 3:13 下午
 */
public class JoinJava {
    public static void main(String[] args) throws Exception {
        // prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read CSV data (option 2: map to a Tuple type with two fields)
        DataSet<Tuple2<String, String>> deptSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .types(String.class, String.class);

        deptSource.print();

        /**
         * (200,市场部)
         * (100,技术部)
         * (400,采购部)
         * (300,营销部)
         */

        // read CSV data (option 1: map to a POJO class)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        employeeSource.print();

        /**
         * EmployeePOJO{deptId='100', name='alex', salary=15000}
         * EmployeePOJO{deptId='200', name='jone', salary=6700}
         * EmployeePOJO{deptId='100', name='lili', salary=8000}
         * EmployeePOJO{deptId='400', name='lucy', salary=7800}
         * EmployeePOJO{deptId='300', name='cherry', salary=12000}
         * EmployeePOJO{deptId='200', name='tony', salary=5000}
         * EmployeePOJO{deptId='100', name='jack', salary=34000}
         */

        /**
         * join() -> map()
         */
        DataSet<Tuple3<String, String, String>> joinResult = deptSource.join(employeeSource).where(0).equalTo("deptId")
                .map(new MapFunction<Tuple2<Tuple2<String, String>, EmployeePOJO>, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(Tuple2<Tuple2<String, String>, EmployeePOJO> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0.f0, tuple2.f0.f1, tuple2.f1.name);
                    }
                });

        joinResult.print();

        /**
         * (100,技术部,jack)
         * (100,技术部,alex)
         * (400,采购部,lucy)
         * (100,技术部,lili)
         * (300,营销部,cherry)
         * (200,市场部,jone)
         * (200,市场部,tony)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinScala {

  case class employee(deptId:String,name:String,salary:Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read CSV data (option 2: map to a Tuple type with two fields)
    val deptSource = env.readCsvFile[(String,String)]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")

    // read CSV data (option 1: map to a case class)
    val employeeSource = env.readCsvFile[employee]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")

    // join -> map
    val joinResult = deptSource.join(employeeSource).where(0).equalTo("deptId")
      .map(tuple2 => (tuple2._1._1, tuple2._1._2, tuple2._2.name))

    joinResult.print()

    /**
     * (400,采购部,lucy)
     * (100,技术部,jack)
     * (100,技术部,alex)
     * (100,技术部,lili)
     * (300,营销部,cherry)
     * (200,市场部,tony)
     * (200,市场部,jone)
     */
  }
}
(10) Union in Detail
Union merges two DataSets whose elements must have the same type; multiple DataSets can be merged by chaining union calls.
dataSet_1.union(dataSet_2)
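Since union() returns a DataSet, calls can be chained to merge more than two DataSets of the same element type. A minimal sketch (dataSet_3 and the String element type are illustrative, not from the original example):

// merge three DataSets of the same element type into one
DataSet<String> merged = dataSet_1
        .union(dataSet_2)
        .union(dataSet_3);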
Union implementation:
Data sources:
employee.csv
100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800
employee2.csv
100,zhang,1400
100,li,3500
200,liu,6000
200,cai,6800
300,wang,13000
100,cao,8900
400,peng,7800
POJO classes:
EmployeePOJO is the same class shown in the Join section above.
package com.aikfk.flink.base;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 7:40 下午
 */
public class DeptSalaryPOJO {

    public String deptId;
    public int salary;

    public DeptSalaryPOJO() {
    }

    public DeptSalaryPOJO(String deptId, int salary) {
        this.deptId = deptId;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "DeptSalaryPOJO{" +
                "deptId='" + deptId + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.base;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 7:57 下午
 */
public class DeptPOJO {

    public String deptId;
    public String name;

    public DeptPOJO() {
    }

    public DeptPOJO(String deptId, String name) {
        this.deptId = deptId;
        this.name = name;
    }

    @Override
    public String toString() {
        return "DeptPOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
Java implementation:
package com.aikfk.flink.dataset.transform;

import com.aikfk.flink.base.DeptPOJO;
import com.aikfk.flink.base.DeptSalaryPOJO;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 7:36 下午
 */
public class UnionJava {
    public static void main(String[] args) throws Exception {
        // prepare the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read CSV data (option 1: map to a POJO class)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        // read CSV data (option 1: map to a POJO class)
        DataSet<EmployeePOJO> employee2Source = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        // read CSV data (option 1: map to a POJO class)
        DataSet<DeptPOJO> deptSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .pojoType(DeptPOJO.class, "deptId", "name");

        /**
         * union() -> map() -> groupBy() -> reduce()
         */
        DataSet<DeptSalaryPOJO> unionResult = employeeSource.union(employee2Source)
                .map(new MapFunction<EmployeePOJO, DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO map(EmployeePOJO employeePOJO) throws Exception {
                        return new DeptSalaryPOJO(employeePOJO.deptId, employeePOJO.salary);
                    }
                })
                .groupBy("deptId")
                .reduce(new ReduceFunction<DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO reduce(DeptSalaryPOJO t1, DeptSalaryPOJO t2) throws Exception {
                        return new DeptSalaryPOJO(t1.deptId, t1.salary + t2.salary);
                    }
                });

        unionResult.print();

        /**
         * DeptSalaryPOJO{deptId='100', salary=70800}
         * DeptSalaryPOJO{deptId='400', salary=15600}
         * DeptSalaryPOJO{deptId='300', salary=25000}
         * DeptSalaryPOJO{deptId='200', salary=24500}
         */

        /**
         * join() -> map()
         */
        DataSet<Tuple3<String, String, Integer>> joinResult = unionResult.join(deptSource).where("deptId").equalTo("deptId")
                .map(new MapFunction<Tuple2<DeptSalaryPOJO, DeptPOJO>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<DeptSalaryPOJO, DeptPOJO> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0.deptId, tuple2.f1.name, tuple2.f0.salary);
                    }
                });

        joinResult.print();

        /**
         * (100,技术部,70800)
         * (400,采购部,15600)
         * (300,营销部,25000)
         * (200,市场部,24500)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object UnionScala {

  case class employee(deptId:String,name:String,salary:Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read CSV data (option 1: map to a case class)
    val employeeSource = env.readCsvFile[employee]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")

    // read CSV data (option 1: map to a case class)
    val employeeSource2 = env.readCsvFile[employee]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")

    // read CSV data (option 2: map to a Tuple type with two fields)
    val deptSource = env.readCsvFile[(String,String)]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")

    /**
     * union() -> map() -> groupBy() -> reduce()
     */
    val unionResult = employeeSource.union(employeeSource2)
      .map(emp => (emp.deptId, emp.salary))
      .groupBy(0)
      .reduce((x, y) => (x._1, x._2 + y._2))

    unionResult.print()

    /**
     * (100,70800)
     * (400,15600)
     * (300,25000)
     * (200,24500)
     */

    /**
     * join() -> map()
     */
    val joinResult = unionResult.join(deptSource).where(0).equalTo(0)
      .map(tuple => (tuple._1._1, tuple._2._2, tuple._1._2))

    joinResult.print()

    /**
     * (400,采购部,15600)
     * (100,技术部,70800)
     * (300,营销部,25000)
     * (200,市场部,24500)
     */
  }
}