Flink之DataSet转换操作(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(8)Aggregate详解


通过Aggregate Function将一组元素值合并成单个值,可以在整个DataSet数据集上使用。

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 下午
 */
public class AggregateJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * aggregate
         */
        DataSet<Tuple2<String,Integer>> aggregateSource =  flatmapSource.groupBy(0).aggregate(Aggregations.SUM,1);
        aggregateSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}


(9)Join详解


Join的几种方式:


根据指定的条件关联两个数据集,然后根据选择的字段形成一个数据集。关联的key可以通过Key表达式、Key-selector函数、字段位置以及Case Class字段指定。


1.对于两个Tuple类型的数据集可以通过字段位置进行关联,左边数据集的字段通过where方法指定,右边数据集的字段通过equalTo()方式指定

dataSet_1.join(dataSet_2).where(0).equalTo(0)
where(左边的关联的字段).equal(右边关联的字段)
  1. 关联过程中指定自定义Join Funtion
dataSet_1.join(dataSet_2).where(0).equalTo(0){
  (left,right) => (left.id , right.name)
}


3.通过JoinWithTiny或者JoinWithHuge标识数据集的大小

dataSet_1.joinWithTiny(dataSet_2).where(0).equalTo(0)提示F1ink第二个数据集是小数据集
dataSet_1.joinWithHuge(dataSet_2).where(0).equalTo(0)提示Flink第二个数据集是大数据集

Join的优化:

Flink提供了Join算法提示,可以让F1ink更加灵活高效地执行Join操作


1.将第一个数据集广播出去,并转换成HashTable存储,该策略适用于第一个数据集非常小的情况

dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0)
  1. 将第二个数据集广播出去,并转换成HashTable存储,该策略适用于第一个数据集非常小的情况
dataSet_1.join(dataSet_2, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0)
  1. 和不设定Hint相同,将优化工作交给系统自动处理
dataSet_1.join(dataSet_2, JoinHint.OPTIMIZER_CHOOSES).where(0).equalTo(0)

4.将两个数据集重新分区,并将第一个数据集转换成HashTable存储,该策略适用于第一个数据集比第二个数据集小,但两个数据集相对都比较大的情况

dataSet_1.join(dataSet_2, JoinHint.PARTITION_HASH_FIRST).where(0).equalTo(0)
  1. 将两个数据集重新分区,并将第二个数据集转换成HashTable存储,该策略适用于第二个数据集比第一个数据集小,但两个数据集相对都比较大的情况
dataSet_1.join(dataSet_2, JoinHint.PARTITION_HASH_SECOND).where(0).equalTo(0)
  1. 将两个数据集重新分区,并将每个分区排序,该策略适用于两个数据集已经排好序的情况
dataSet_1.join(dataSet_2, JoinHint.PARTITION_SORT_MEGER).where(0).equalTo(0)

join代码实现

数据源:

dept.csv

100,技术部
200,市场部
300,营销部
400,采购部

employee.csv

100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800

Java代码实现:

POJO类

package com.aikfk.flink.base;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 3:23 下午
 */
public class EmployeePOJO {
    public String deptId;
    public String name;
    public int salary;
    public EmployeePOJO() {
    }
    public EmployeePOJO(String deptId, String name, int salary) {
        this.deptId = deptId;
        this.name = name;
        this.salary = salary;
    }
    @Override
    public String toString() {
        return "EmployeePOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.dataset.transform;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 3:13 下午
 */
public class JoinJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式二:映射成Tuple类,带有两个个字段)
        DataSet<Tuple2<String,String>> deptSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .types(String.class,String.class);
        deptSource.print();
        /**
         * (200,市场部)
         * (100,技术部)
         * (400,采购部)
         * (300,营销部)
         */
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        employeeSource.print();
        /**
         * EmployeePOJO{deptId='100', name='alex', salary=15000}
         * EmployeePOJO{deptId='200', name='jone', salary=6700}
         * EmployeePOJO{deptId='100', name='lili', salary=8000}
         * EmployeePOJO{deptId='400', name='lucy', salary=7800}
         * EmployeePOJO{deptId='300', name='cherry', salary=12000}
         * EmployeePOJO{deptId='200', name='tony', salary=5000}
         * EmployeePOJO{deptId='100', name='jack', salary=34000}
         */
        /**
         * join() -> map()
         */
        DataSet<Tuple3<String,String,String>> joinResult = deptSource.join(employeeSource).where(0).equalTo("deptId")
                .map(new MapFunction<Tuple2<Tuple2<String, String>, EmployeePOJO>, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(Tuple2<Tuple2<String, String>, EmployeePOJO> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0.f0,tuple2.f0.f1,tuple2.f1.name);
                    }
                });
        joinResult.print();
        /**
         * (100,技术部,jack)
         * (100,技术部,alex)
         * (400,采购部,lucy)
         * (100,技术部,lili)
         * (300,营销部,cherry)
         * (200,市场部,jone)
         * (200,市场部,tony)
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object JoinScala {
  case class employee(deptId:String,name:String,salary:Int)
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    // 读取csv数据(方式二:映射成Tuple类,带有两个个字段)
    val deptSource = env.readCsvFile[(String,String)]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
    // 读取csv数据(方式一:映射POJO类对象)
    val employeeSource = env.readCsvFile[employee]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
    // join -> map
    val joinResult = deptSource.join(employeeSource).where(0).equalTo("deptId")
      .map(tuple2 => (tuple2._1._1,tuple2._1._2,tuple2._2.name))
    joinResult.print()
    /**
     * (400,采购部,lucy)
     * (100,技术部,jack)
     * (100,技术部,alex)
     * (100,技术部,lili)
     * (300,营销部,cherry)
     * (200,市场部,tony)
     * (200,市场部,jone)
     */
  }
}


(10)Union详解


合并两个DataSet数据集,两个数据集的数据元素格式必须相同,多个数据集可以连接合并。

dataSet_1.union(dataSet_2)

Union代码实现:

数据源:

employee.csv

100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
100,lili,8000
400,lucy,7800

employee2.csv

100,zhang,1400
100,li,3500
200,liu,6000
200,cai,6800
300,wang,13000
100,cao,8900
400,peng,7800

POJO类:

package com.aikfk.flink.base;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 3:23 下午
 */
public class EmployeePOJO {
    public String deptId;
    public String name;
    public int salary;
    public EmployeePOJO() {
    }
    public EmployeePOJO(String deptId, String name, int salary) {
        this.deptId = deptId;
        this.name = name;
        this.salary = salary;
    }
    @Override
    public String toString() {
        return "EmployeePOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.base;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 7:40 下午
 */
public class DeptSalaryPOJO {
    public String deptId;
    public int salary;
    public DeptSalaryPOJO() {
    }
    public DeptSalaryPOJO(String deptId, int salary) {
        this.deptId = deptId;
        this.salary = salary;
    }
    @Override
    public String toString() {
        return "DeptSalaryPOJO{" +
                "deptId='" + deptId + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.aikfk.flink.base;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 7:57 下午
 */
public class DeptPOJO {
    public String deptId;
    public String name;
    public DeptPOJO() {
    }
    public DeptPOJO(String deptId, String name) {
        this.deptId = deptId;
        this.name = name;
    }
    @Override
    public String toString() {
        return "DeptPOJO{" +
                "deptId='" + deptId + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}

Java代码实现:

package com.aikfk.flink.dataset.transform;
import com.aikfk.flink.base.DeptPOJO;
import com.aikfk.flink.base.DeptSalaryPOJO;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 7:36 下午
 */
public class UnionJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> employeeSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> employee2Source = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<DeptPOJO> deptSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
                .pojoType(DeptPOJO.class,"deptId","name");
        /**
         * union() -> map() -> groupBy() -> reduce()
         */
        DataSet<DeptSalaryPOJO> unionResult = employeeSource.union(employee2Source)
                .map(new MapFunction<EmployeePOJO, DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO map(EmployeePOJO employeePOJO) throws Exception {
                        return new DeptSalaryPOJO(employeePOJO.deptId,employeePOJO.salary);
                    }
                })
                .groupBy("deptId")
                .reduce(new ReduceFunction<DeptSalaryPOJO>() {
                    @Override
                    public DeptSalaryPOJO reduce(DeptSalaryPOJO t1, DeptSalaryPOJO t2) throws Exception {
                        return new DeptSalaryPOJO(t1.deptId,t1.salary + t2.salary);
                    }
                });
        unionResult.print();
        /**
         * DeptSalaryPOJO{deptId='100', salary=70800}
         * DeptSalaryPOJO{deptId='400', salary=15600}
         * DeptSalaryPOJO{deptId='300', salary=25000}
         * DeptSalaryPOJO{deptId='200', salary=24500}
         */
        /**
         * join() -> map()
         */
        DataSet<Tuple3<String, String, Integer>> joinResult = unionResult.join(deptSource).where("deptId").equalTo("deptId")
            .map(new MapFunction<Tuple2<DeptSalaryPOJO, DeptPOJO>, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(Tuple2<DeptSalaryPOJO, DeptPOJO> tuple2) throws Exception {
                    return new Tuple3<>(tuple2.f0.deptId,tuple2.f1.name,tuple2.f0.salary);
                }
            });
        joinResult.print();
        /**
         * (100,技术部,70800)
         * (400,采购部,15600)
         * (300,营销部,25000)
         * (200,市场部,24500)
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object UnionScala {
  case class employee(deptId:String,name:String,salary:Int)
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    // 读取csv数据(方式一:映射POJO类对象)
    val employeeSource = env.readCsvFile[employee]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
    // 读取csv数据(方式一:映射POJO类对象)
    val employeeSource2 = env.readCsvFile[employee]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee2.csv")
    // 读取csv数据(方式二:映射成Tuple类,带有两个个字段)
    val deptSource = env.readCsvFile[(String,String)]("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv")
    /**
     * union() -> map() -> groupBy() -> reduce()
     */
    val unionResult = employeeSource.union(employeeSource2)
      .map(emp => (emp.deptId,emp.salary))
      .groupBy(0)
      .reduce((x ,y) => (x._1,x._2 + y._2))
    unionResult.print()
    /**
     * (100,70800)
     * (400,15600)
     * (300,25000)
     * (200,24500)
     */
    /**
     * join() -> map()
     */
    val joinResult = unionResult.join(deptSource).where(0).equalTo(0)
      .map(tuple => (tuple._1._1,tuple._2._2,tuple._1._2) )
    joinResult.print()
    /**
     * (400,采购部,15600)
     * (100,技术部,70800)
     * (300,营销部,25000)
     * (200,市场部,24500)
     */
  }
}




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
23 3
|
3月前
|
流计算
在Flink中,如果需要进行split和where操作
【2月更文挑战第6天】在Flink中,如果需要进行split和where操作
19 1
|
5月前
|
数据处理 数据库 流计算
Flink 操作mapper、sink解析
Flink 操作mapper、sink解析
27 0
|
7月前
|
分布式计算 Hadoop 大数据
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
|
3天前
|
机器学习/深度学习 人工智能 流计算
人工智能平台PAI 操作报错合集之在集群上提交了包含alink相关功能的flink任务,但是却报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
7天前
|
Apache 流计算 开发者
[AIGC] Flink中的Max和Reduce操作:区别及使用场景
[AIGC] Flink中的Max和Reduce操作:区别及使用场景
|
3月前
|
Oracle 关系型数据库 MySQL
Flink CDC数据同步问题之丢失update操作如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
4月前
|
SQL Java 数据库连接
这个问题是由于Flink在执行SQL语句时,无法找到合适的表工厂来处理JOIN操作。
【1月更文挑战第17天】【1月更文挑战第85篇】这个问题是由于Flink在执行SQL语句时,无法找到合适的表工厂来处理JOIN操作。
23 8
|
4月前
|
消息中间件 Java Kafka
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
36 0
|
4月前
|
监控 Java 流计算
Flink中的窗口操作是什么?请解释其作用和使用场景。
Flink中的窗口操作是什么?请解释其作用和使用场景。
27 0