Flink DataSet Transformation Operations (Part 1)


(1) Map in Detail


Map performs a one-to-one transformation on the dataset: each record is converted, in parallel, into exactly one new record, and the data partitioning does not change.

Java implementation:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 9:58 AM
 */
public class MapJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        mapSource.print();
        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}

Scala implementation:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment,_}
object MapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase())
      .print()
  }
}


(2) FlatMap in Detail


FlatMap turns each incoming record into zero or more output records (an input may also produce no output at all), for example the line splitting described earlier.

Java implementation:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class FlatMapJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        flatmapSource.print();
        /**
         * (JAVA,1)
         * (JAVA,1)
         * (SPARK,1)
         * (HIVE,1)
         * ..
         */
    }
}
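
The anonymous FlatMapFunction above can also be written as a Java 8 lambda. With a lambda, however, Flink cannot infer the Tuple2 output type because of type erasure, so a type hint via returns(...) is needed. The snippet below is a minimal sketch of that variant; it reuses mapSource from the example above, and the Types helper comes from org.apache.flink.api.common.typeinfo.

// Sketch: the same flatMap as a lambda; returns(...) supplies the output type
// that is erased from the lambda signature at compile time.
DataSet<Tuple2<String, Integer>> flatmapLambda = mapSource
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT));
flatmapLambda.print();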

Scala implementation:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object FlatMapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .print()
  }
}


(3) MapPartition: An Optimization of Map


MapPartition is functionally similar to Map, but it processes the DataSet one partition at a time: the records of each partition are handed to the function as an Iterable, and the function may return any number of result values. This makes it a small optimization over Map when there is setup work that can be done once per partition instead of once per record; a sketch of that pattern follows the Scala example below.

Java implementation:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:48 PM
 */
public class MapPartitionJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * mapPartition
         */
        DataSet<String> mapPartition = dateSource.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
                for (String line : iterable){
                    collector.collect(line.toUpperCase());
                }
            }
        });
        mapPartition.print();
        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}

Scala implementation:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object MapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    )
      .mapPartition((iterator:Iterator[String] , collector : Collector[String]) => {
        for (line <- iterator){
          collector.collect(line.toUpperCase)
        }
      })
      .print()
  }
}
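
The example above does purely per-record work, so it gains little over a plain map. The usual reason to reach for mapPartition is to pay an expensive setup cost once per partition rather than once per record. The sketch below (reusing dateSource from the Java example above) illustrates the pattern; DictionaryClient and its methods are hypothetical placeholders used only for illustration, not a real API.

// Sketch of the per-partition setup pattern (DictionaryClient is hypothetical).
DataSet<String> enriched = dateSource.mapPartition(
        new MapPartitionFunction<String, String>() {
    @Override
    public void mapPartition(Iterable<String> lines, Collector<String> out) throws Exception {
        DictionaryClient client = DictionaryClient.connect();   // opened once per partition
        try {
            for (String line : lines) {
                out.collect(client.normalize(line));             // reused for every record
            }
        } finally {
            client.close();                                      // released once per partition
        }
    }
});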


(4) Filter in Detail


Filter evaluates a condition on every input record; only the records for which the condition returns true are forwarded to the downstream DataSet.

Java implementation:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class FilterJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * filter
         */
        DataSet<Tuple2<String,Integer>> filterSource = flatmapSource.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return "SPARK".equals(stringIntegerTuple2.f0);
            }
        });
        filterSource.print();
        /**
         * (SPARK,1)
         * (SPARK,1)
         */
    }
}

Scala implementation:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object FilterScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .filter(tuple2 => (tuple2._1.equals("SPARK")))
      .print()
  }
}


(5) Reduce in Detail


Reduce merges the elements of a dataset into a single element by combining them two at a time; it can be applied to an entire dataset or, as here, to each group of a grouped dataset.

reduce consumes the records one at a time, folding each record into the running result. (For simple tuple aggregations such as this word count, the built-in field aggregation shown after the Scala example below is an equivalent alternative.)

Java implementation:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class ReduceJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * reduce
         */
        DataSet<Tuple2<String,Integer>> reduceSource =  flatmapSource.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
            }
        });
        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}

Scala implementation:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object ReduceScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .groupBy(0)
      .reduce((x,y) => (x._1,x._2+y._2))
      .print()
  }
}
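
For a tuple word count like this, the DataSet API also provides built-in field aggregations, so the hand-written ReduceFunction can be replaced by sum on the grouped field. A minimal sketch, reusing flatmapSource from the Java example above:

// Equivalent word count using the built-in sum aggregation on tuple field 1.
DataSet<Tuple2<String, Integer>> counts = flatmapSource
        .groupBy(0)   // group by the word (field 0)
        .sum(1);      // sum the 1s (field 1) within each group
counts.print();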


(6) ReduceGroup in Detail


The GroupReduce operator is applied to a grouped DataSet and calls the user-defined group-reduce function once for every group. Its difference from Reduce is that the user-defined function receives the entire group at once.

It merges a group of elements into one or more result elements and can also be applied to a whole dataset; in many cases this is a small optimization over a plain reduce. Because results are written to a Collector, a group may even produce no output at all (see the sketch after the Scala example below).

Whereas reduce folds records in one at a time, reduceGroup reads a whole batch in one go, say 10 records, and then aggregates them together.

Java implementation:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class ReduceGroupJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * ReduceGroup
         */
        DataSet<Tuple2<String,Integer>> reduceSource = flatmapSource.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String,Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });
        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}

Scala implementation:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object ReduceGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .groupBy(0)
      .reduceGroup(iter => iter.reduce((t1, t2) => (t1._1, t1._2 + t2._2)))
      .print()
  }
}
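
Because the group-reduce function sees the whole group and writes to a Collector, it can emit zero, one, or several records per group, which a pairwise ReduceFunction cannot do. As a minimal sketch (reusing flatmapSource from the Java example above), the following "group filter" keeps only the words that occur more than once:

// Sketch: a group filter that needs whole-group access --
// groups whose total count is 1 produce no output at all.
DataSet<Tuple2<String, Integer>> repeatedWords = flatmapSource.groupBy(0).reduceGroup(
        new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) throws Exception {
        String key = null;
        int count = 0;
        for (Tuple2<String, Integer> t : values) {
            key = t.f0;
            count += t.f1;
        }
        if (count > 1) {               // emit nothing for words seen only once
            out.collect(new Tuple2<>(key, count));
        }
    }
});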

(7) CombineGroup: An Optimization of ReduceGroup


With CombineGroup we can pre-aggregate the data on each machine first and then use ReduceGroup to merge the per-machine CombineGroup outputs. ReduceGroup then has far less data to shuffle and aggregate, which speeds up the computation. (A sketch of the combinable-GroupReduceFunction variant of this pattern follows the Scala example below.)

Java implementation:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 PM
 */
public class CombineGroupJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * CombineGroup
         */
        DataSet<Tuple2<String,Integer>> combineGroupSource = flatmapSource.groupBy(0).combineGroup(
                new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void combine(Iterable<Tuple2<String, Integer>> iterable,
                                Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String,Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });
        /**
         * ReduceGroup
         */
        DataSet<Tuple2<String,Integer>> reduceSource = combineGroupSource.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable,
                               Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String,Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });
        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}

Scala implementation:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object CombineGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .groupBy(0)
      .combineGroup((iterator,combine_collector : Collector[(String,Int)]) => {
        combine_collector.collect(iterator reduce((t1,t2) => (t1._1,t1._2 + t2._2)))
      })
      .groupBy(0)
      .reduceGroup(iter => iter.reduce((t1, t2) => (t1._1, t1._2 + t2._2)))
      .print()
  }
}
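
An alternative to wiring combineGroup and reduceGroup together by hand is to let a single GroupReduceFunction also implement GroupCombineFunction; Flink can then use it as a combiner before the shuffle and as the final group reduce after it. A minimal sketch of this pattern with the same word-count logic as above (to be declared inside the job class and used via flatmapSource.groupBy(0).reduceGroup(new WordCountReducer()).print()):

// Sketch: a combinable group reduce -- one class acts as the combiner
// (pre-aggregation before the shuffle) and as the final group reduce.
public static class WordCountReducer
        implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
                   GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int count = 0;
        for (Tuple2<String, Integer> t : values) {
            key = t.f0;
            count += t.f1;
        }
        out.collect(new Tuple2<>(key, count));
    }

    @Override
    public void combine(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
        reduce(values, out);   // partial counts combine the same way as full counts
    }
}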


