Flink / Scala - DataSource 之 DataSet 获取数据总结

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。

一.引言

image.gif编辑

数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。创建数据集的机制一般抽象在 InputFormat 后面,这里有点类似 spark 的 sparkContext,Flink 的 ExecutionEnvironment 也提供了很多快捷的方法。主要分为下面几大类,基于文件的和基于集合的 :

File-Based 基于文件        
readTextFile(path) TextInputFormat -读取文件行并返回字符串。
readTextFileWithValue(path)  TextValueInputFormat -读取文件行并返回StringValues。StringValues是可变字符串。
readCsvFile(path) CsvInputFormat -解析逗号(或其他char)分隔字段的文件。返回由元组或pojo组成的数据集。支持基本java类型及其对应值作为字段类型。
readFileOfPrimitives(path, Class) 

PrimitiveInputFormat—解析以新行(或其他字符序列)分隔的原始数据类型的文件,如String或Integer。

readFileOfPrimitives(path, delimiter, Class) PrimitiveInputFormat—使用给定的分隔符,解析以新行(或其他字符序列)分隔的原始数据类型(如String或Integer)的文件。
Collection-Based 基于集合
fromCollection(Collection) 从Java.util.Collection创建一个数据集。集合中的所有元素必须具有相同的类型,当然也可以是 scala 的。
fromCollection(Iterator, Class) 从迭代器创建一个数据集。该类指定迭代器返回的元素的数据类型。
fromElements(T…)

根据给定的对象序列创建一个数据集。所有对象必须是相同的类型。

fromParallelCollection(SplittableIterator, Class) 

从一个迭代器中并行创建一个数据集。该类指定迭代器返回的元素的数据类型。

generateSequence(from, to) 在给定的间隔内并行生成数字序列。
Generic 泛型

readFile(inputFormat, path) 

FileInputFormat - 接受文件输入格式。

createInput(inputFormat) / inputFormat  接受通用的输入格式。

Tips:

介绍前这里先初始化好执行的 ExecutionEnvironment ,后面的示例都将基于改 env 实现。注意最下面 import 的隐式转换,Flink 基于 Scala 时很多方法都需要隐式转换,否则 api 执行会报错。

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._

image.gif

二.FileBased 基于文件

1.readTextFile

Api readTextFile 和 spark 的 textFile 很像,它可以读取本地的文件,也可以读取 HDFS 或者集群上的数据,并支持自动识别一些压缩格式文件,就像 textFile 可以直接读取 gz 一样。

val textLinesFromLocalFile: DataSet[String] = env.readTextFile("./myfile")
    val textLinesFromHdfs: DataSet[String] = env.raedTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
    //触发程序执行
    textLines.print()

image.gif

对于本地和集群的文件都可以直接调用执行,截止到 Flink v1.14.3 该接口支持以下压缩格式:

Compressed Method 压缩方法 File Extensions 文件扩展名 Parallelizable 可压缩
DEFLATE .deflate no
GZip .gz, .gzip no
Bzip2 .bz2 no
XZ .xz no
ZStandart .zst no

2.readTextFileWithValue

从文本文件中读取数据并以StringValue类型返回,StringValue类型为可变字符串。此方法和readTextFile方法类似,只不过是制定了数据类型,返回的不是 DataSet[String] 而是 DataSet[StringValue]

val textLines = env.readTextFileWithValue("./yourfile")
    //触发程序执行
    textLines.print()

image.gif

3.readCsvFile

csv 文件内容如下:

1,2,3,4,5
2,3,4,5,6
3,4,5,6,7
4,5,6,7,8
5,6,7,8,9

image.gif

A.基础读法

[(隐式转换)] 中指定了 csv 文件中各个元素的数据类型

val csvInput = env.readCsvFile[(String,String,String,String,String)]("./info.csv")
    csvInput.print()

image.gif

B.读取指定行

includedFields 使用数组,其中数字代表的含义为要保留的数据列对应的列数,这里还支持 ignoreFirstLine = true 参数可以去除带表头的 csv 文件。

val csvInput2 = env.readCsvFile[(String, Double)]("./info.csv", 
        includedFields = Array(0, 3))
    csvInput2.print()

image.gif

C.读取生成指定类

scala 支持 caseClass 快速定义数据类,这里 [] 内代表返回的数据类型,pojoFields 指定对应列的 colName, 由于只给出了三列而原始数据有五列,所以只返回对应三列的数据

case class Person(a: String, b: String, c: String)
    val csvInput3 = env.readCsvFile[Person]("./info.csv",
          pojoFields = Array("name", "age", "gender"))
    csvInput3.print()

image.gif

Person(4,5,6)
Person(1,2,3)
Person(5,6,7)
Person(2,3,4)
Person(3,4,5)

image.gif

4.readFileOfPrimitives

读取一个原始数据类型(如String,Integer)的文件,返回一个对应的原始类型的DataSet集合。这里第一个参数为对应文件 path,第二个参数为分割符,以上面的 csv 文件数据为例,读取文件时会自动分割原始数据,得到类似的 DateSet[1,2,3,4,5,6,......],原始方法中还有一个 class 参数指定输出数据类型,这里隐式方法 env.readFileOfPrimitives[String] 的 [Class] 已经实现了该功能,所以 readFileOfPrimitives(path, delimiter, Class) 可以看作是 readFileOfPrimitives(path, Class) 的一个扩展。

val textLinesOfPrimitives = env.readFileOfPrimitives[String]("./info.csv",
     delimiter = ",")
    textLinesOfPrimitives.print()

image.gif

Line: 1,2,3,4,5
DataSet.print():
1
2
3
4
5

image.gif

三. Collection-Based 基于集合

1.fromCollection

该方法有两个参数类型,一种是直接从 collection 中初始化,还有一种是从 iterator 中初始化,两者基本类似。这里如果是 java 则对应 java.util.Collection ,scala 则对应 scala.collection

//1.用Array创建DataSet
    val dataSet1: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    dataSet1.print()
    //2.用Iterable创建DataSet
    val dataSet2: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    dataSet2.print()

image.gif

由于 collection 中包含多种数据结构,写法相同,下面给出一些可以用于初始化的常见数据结构 :

Array,ArrayBuffer,List,ListBuffer,Vector,mutable.Queue,mutable.Stack,Stream,Seq,Set,Iteratable, Iterator,mutable.ArraySeq,mutable.ArrayStack,Map,Range。

还有一个特殊的 generateSequence 可以生成 DataSet :

val numbers = env.generateSequence(1, 10000000)

image.gif

2.fromElements

根据给定的对象序列创建数据集。所有对象必须是相同的类型。这个就比较好理解了,直接给出相同类型的元素即可,fromCollection 和 fromElements 身上都可以看到一丝 spark.parallelize 序列化函数的影子

//1.用element创建DataSet(fromElements)
    val dataSet1: DataSet[String] = env.fromElements("spark", "flink")
    dataSet1.print()
    //2.用Tuple创建DataSet(fromElements)
    val dataSet2: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    dataSet2.print()

image.gif

3. fromParallelCollection

package org.apache.flink.util;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.annotation.Public;
@Public
public abstract class SplittableIterator<T> implements Iterator<T>, Serializable {
    private static final long serialVersionUID = 200377674313072307L;
    public SplittableIterator() {
    }
    public abstract Iterator<T>[] split(int var1);
    public Iterator<T> getSplit(int num, int numPartitions) {
        if (numPartitions >= 1 && num >= 0 && num < numPartitions) {
            return this.split(numPartitions)[num];
        } else {
            throw new IllegalArgumentException();
        }
    }
    public abstract int getMaximumNumberOfSplits();
}

image.gif

fromParallelCollection 的参数为 SplittableIterator, SplittableIterator是个抽象类,它定义了抽象方法 split 以及 getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator。两个实现类实现了常用 number 的迭代器实现和 Long 的迭代器实现,有兴趣的小伙伴可以去看下 SplittableIterator 和各自实现类的源码,没兴趣的话你就只需要知道该方法可以并行读取迭代器并返回指定元素的数据类型。

val start = System.currentTimeMillis()
    val it = (0 to 100).iterator
    val dataSetSingle: DataSet[Int] = env.fromCollection(it)
    dataSetSingle.print()
    println("Single thread Cost: ", (System.currentTimeMillis() - start))
    val start1 = System.currentTimeMillis()
    val itSequence = new NumberSequenceIterator(0, 100)
    val dataSetParellel = env.fromParallelCollection(itSequence)
    dataSetParellel.print()
    println("Parallel thread Cost: ", (System.currentTimeMillis() - start1))

image.gif

二者主要体现在并行的效率上 :

(Single thread Cost: 3886)
(Parallel thread Cost: 939)

image.gif

四.Generic 泛型

1.readFile

A.ExecutionEnvironment

该方法接受文件输入格式,指定 inputFormat 和 path 即可输出文件内容

val data = env.readFile(new TextInputFormat(null), "./info.csv")
    data.print()

image.gif

1,2,3,4,5
3,4,5,6,7
5,6,7,8,9
4,5,6,7,8
2,3,4,5,6

image.gif

B.StreamExecutionEnvironment

上述 env 采用 ExecutionEnvironment.getExecutionEnvironment,可以看作是 sparkContext 处理离线任务,还有一种 StreamExecutionEnvironment 可以看作是 StreamingContext 处理流式任务,该 env 也拥有 readFile api :

val envStreaming = StreamExecutionEnvironment.getExecutionEnvironment
    val dataSource = envStreaming.readFile(new TextInputFormat(null), "./info.csv", 
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      5000L)
    dataSource.print()
    envStreaming.execute()

image.gif

ExecutionEnvironment 执行时只读取文件一次,StreamingExecutionEnvironment 在 PROCESS_CONTINUOUSLY 模式下会根据 interval = 5000L ms 持续扫描文件,如果文件发生修改则重新读取文件内容,这里 interval 可以自定义。如果选择 PROCESS_ONE 模式,则会退化为 ExecutionEnvironment 的 readFIle 即只读一次。

2.createInput

该方法下接受通用输入格式。该方法和 spark.HadoopRDD 接口比较类似了,自定义的部分比较大。

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file"))

image.gif

五.总结

ExecutionEnvironment 模型下主要以静态 DateSet 为 DataSource 并进行后续处理,很多接口的含义和执行与 spark 很类似,其主要思想为批处理,后续介绍 DataSet 常用的 transform 函数与批处理方法。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错之往GREENPLUM 6 写数据,用postgresql-42.2.9.jar 报 ON CONFLICT (uuid) DO UPDATE SET 语法有问题。怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
217 3
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
50 3
|
1月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
54 0
|
1月前
|
Java Shell 流计算
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
24 1
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
|
1月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
76 3
|
1月前
|
存储 Java 数据处理
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
36 1
|
1月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
41 0
|
4月前
|
SQL Java 数据处理
实时计算 Flink版产品使用问题之使用MavenShadePlugin进行relocation并遇到只包含了Java代码而未包含Scala代码,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
流计算 Windows
【极数系列】Flink集成DataSource读取Socket请求数据(09)
【极数系列】Flink集成DataSource读取Socket请求数据(09)
125 3