Flink之DataSet转换操作(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Map详解


完成对数据集Map端的转换,并行将每一条数据转换成新的一条数据,数据分区不发生变化

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 9:58 上午
 */
public class MapJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        mapSource.print();
        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment,_}
object MapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase())
      .print()
  }
}


(2)FlatMap详解


将接入的每一条数据转换成多条数据输出,包括空值,比如我们前面所讲的行数据切割

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 下午
 */
public class FlatMapJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        flatmapSource.print();
        /**
         * (JAVA,1)
         * (JAVA,1)
         * (SPARK,1)
         * (HIVE,1)
         * ..
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object FlatMapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .print()
  }
}


(3)Map优化之MapPartition详解


功能与Map相似,只是MapPartition操作是在DataSet中基于分区对数据进行处理,函数调用中会按照分区将数据通过Iterator的形式传入并返回任意数量的结果值,这是对Map的一个小优化.

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:48 下午
 */
public class MapPartitionJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * mapPartition
         */
        DataSet<String> mapPartition = dateSource.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
                for (String line : iterable){
                    collector.collect(line.toUpperCase());
                }
            }
        });
        mapPartition.print();
        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object MapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    )
      .mapPartition((iterator:Iterator[String] , collector : Collector[String]) => {
        for (line <- iterator){
          collector.collect(line.toUpperCase)
        }
      })
      .print()
  }
}


(4)Filter详解


根据条件对出入数据进行过滤,当条件为True后,数据元素才会传输到下游的DataSet数据集中.

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 下午
 */
public class FilterJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * filter
         */
        DataSet<Tuple2<String,Integer>> filterSource = flatmapSource.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return "SPARK".equals(stringIntegerTuple2.f0);
            }
        });
        filterSource.print();
        /**
         * (SPARK,1)
         * (SPARK,1)
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object FilterScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource: Unit = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .filter(tuple2 => (tuple2._1.equals("SPARK")))
      .print()
  }
}


(5)Reduce详解


通过两两合并,将数据集中的元素合并成一个元素,可以在整个数据集上使用。

reduce是读取一条一条的聚合

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 下午
 */
public class ReduceJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * reduce
         */
        DataSet<Tuple2<String,Integer>> reduceSource =  flatmapSource.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
            }
        });
        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object ReduceScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .groupBy(0)
      .reduce((x,y) => (x._1,x._2+y._2))
      .print()
  }
}


(6)ReduceGroup详解


GroupReduce算子应用在一个已经分组了的DataSet上,其会对每个分组都调用到用户定义的group-reduce函数。它与Reduce的区别在于用户定义的函数会立即获得整个组。


将一组元素合并成一个或者多个元素,可以在整个数据集上使用。这是对reduce程序的一个小优化。


reduceGroup是一次性读取,比如一次读取10个,然后统一的去做聚合

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 下午
 */
public class ReduceGroupJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * ReduceGroup
         */
        DataSet<Tuple2<String,Integer>> reduceSource = flatmapSource.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String,Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });
        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object ReduceGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .groupBy(0)
      .reduceGroup((x => x reduce((x,y) => (x._1,x._2 + y._2))))
      .print()
  }
}

(7)ReduceGroup优化之CombineGroup详解


我们可以通过CombineGroup事先在每一台机器上进行聚合操作,再通过ReduceGroup将每台机器CombineGroup输出的结果进行聚合,这样的话,ReduceGroup需要汇总的数据量就少很多,从而加快计算的速度。

Java代码实现:

package com.aikfk.flink.dataset.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 8:26 下午
 */
public class CombineGroupJava {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );
        /**
         * map
         */
        DataSet<String> mapSource = dateSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });
        /**
         * flatmap
         */
        DataSet<Tuple2<String,Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });
        /**
         * CombineGroup
         */
        DataSet<Tuple2<String,Integer>> combineGroupSource = flatmapSource.groupBy(0).combineGroup(
                new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void combine(Iterable<Tuple2<String, Integer>> iterable,
                                Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String,Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });
        /**
         * ReduceGroup
         */
        DataSet<Tuple2<String,Integer>> reduceSource = combineGroupSource.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable,
                               Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String,Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });
        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}

Scala代码实现:

package com.aikfk.flink.dataset.transform
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object CombineGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment;
    val dataSource = env.fromElements(
      "java java spark hive",
      "hive java java spark",
      "java java hadoop"
    ).map(line => line.toUpperCase)
      .flatMap((line : String,collector : Collector[(String,Int)]) => {
//        for (word <- line.split(" ")){
//          collector.collect(word,1)
//        }
        (line.split(" ").foreach(word => collector.collect(word,1)))
      })
      .groupBy(0)
      .combineGroup((iterator,combine_collector : Collector[(String,Int)]) => {
        combine_collector.collect(iterator reduce((t1,t2) => (t1._1,t1._2 + t2._2)))
      })
      .groupBy(0)
      .reduceGroup((x => x reduce((x,y) => (x._1,x._2 + y._2))))
      .print()
  }
}



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
12
分享
相关文章
|
3月前
|
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
89 0
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
37 1
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
118 3
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
60 1
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之怎么向一个未定义列的表中写入数据
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之perjob提交给yarn,报错显示无法连接yarn- Connecting to ResourceManager,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之cdc postgres数据库,当表行记录修改后报错,该如何修改
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之整库同步mysql到starRock提交任务异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等