Flink / Scala - DataSource 之 DataSet 获取数据总结

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。

一.引言

image.gif编辑

数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。创建数据集的机制一般抽象在 InputFormat 后面,这里有点类似 spark 的 sparkContext,Flink 的 ExecutionEnvironment 也提供了很多快捷的方法。主要分为下面几大类,基于文件的和基于集合的 :

File-Based 基于文件        
readTextFile(path) TextInputFormat -读取文件行并返回字符串。
readTextFileWithValue(path)  TextValueInputFormat -读取文件行并返回StringValues。StringValues是可变字符串。
readCsvFile(path) CsvInputFormat -解析逗号(或其他char)分隔字段的文件。返回由元组或pojo组成的数据集。支持基本java类型及其对应值作为字段类型。
readFileOfPrimitives(path, Class) 

PrimitiveInputFormat—解析以新行(或其他字符序列)分隔的原始数据类型的文件,如String或Integer。

readFileOfPrimitives(path, delimiter, Class) PrimitiveInputFormat—使用给定的分隔符,解析以新行(或其他字符序列)分隔的原始数据类型(如String或Integer)的文件。
Collection-Based 基于集合
fromCollection(Collection) 从Java.util.Collection创建一个数据集。集合中的所有元素必须具有相同的类型,当然也可以是 scala 的。
fromCollection(Iterator, Class) 从迭代器创建一个数据集。该类指定迭代器返回的元素的数据类型。
fromElements(T…)

根据给定的对象序列创建一个数据集。所有对象必须是相同的类型。

fromParallelCollection(SplittableIterator, Class) 

从一个迭代器中并行创建一个数据集。该类指定迭代器返回的元素的数据类型。

generateSequence(from, to) 在给定的间隔内并行生成数字序列。
Generic 泛型

readFile(inputFormat, path) 

FileInputFormat - 接受文件输入格式。

createInput(inputFormat) / inputFormat  接受通用的输入格式。

Tips:

介绍前这里先初始化好执行的 ExecutionEnvironment ,后面的示例都将基于改 env 实现。注意最下面 import 的隐式转换,Flink 基于 Scala 时很多方法都需要隐式转换,否则 api 执行会报错。

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._

image.gif

二.FileBased 基于文件

1.readTextFile

Api readTextFile 和 spark 的 textFile 很像,它可以读取本地的文件,也可以读取 HDFS 或者集群上的数据,并支持自动识别一些压缩格式文件,就像 textFile 可以直接读取 gz 一样。

val textLinesFromLocalFile: DataSet[String] = env.readTextFile("./myfile")
    val textLinesFromHdfs: DataSet[String] = env.raedTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
    //触发程序执行
    textLines.print()

image.gif

对于本地和集群的文件都可以直接调用执行,截止到 Flink v1.14.3 该接口支持以下压缩格式:

Compressed Method 压缩方法 File Extensions 文件扩展名 Parallelizable 可压缩
DEFLATE .deflate no
GZip .gz, .gzip no
Bzip2 .bz2 no
XZ .xz no
ZStandart .zst no

2.readTextFileWithValue

从文本文件中读取数据并以StringValue类型返回,StringValue类型为可变字符串。此方法和readTextFile方法类似,只不过是制定了数据类型,返回的不是 DataSet[String] 而是 DataSet[StringValue]

val textLines = env.readTextFileWithValue("./yourfile")
    //触发程序执行
    textLines.print()

image.gif

3.readCsvFile

csv 文件内容如下:

1,2,3,4,5
2,3,4,5,6
3,4,5,6,7
4,5,6,7,8
5,6,7,8,9

image.gif

A.基础读法

[(隐式转换)] 中指定了 csv 文件中各个元素的数据类型

val csvInput = env.readCsvFile[(String,String,String,String,String)]("./info.csv")
    csvInput.print()

image.gif

B.读取指定行

includedFields 使用数组,其中数字代表的含义为要保留的数据列对应的列数,这里还支持 ignoreFirstLine = true 参数可以去除带表头的 csv 文件。

val csvInput2 = env.readCsvFile[(String, Double)]("./info.csv", 
        includedFields = Array(0, 3))
    csvInput2.print()

image.gif

C.读取生成指定类

scala 支持 caseClass 快速定义数据类,这里 [] 内代表返回的数据类型,pojoFields 指定对应列的 colName, 由于只给出了三列而原始数据有五列,所以只返回对应三列的数据

case class Person(a: String, b: String, c: String)
    val csvInput3 = env.readCsvFile[Person]("./info.csv",
          pojoFields = Array("name", "age", "gender"))
    csvInput3.print()

image.gif

Person(4,5,6)
Person(1,2,3)
Person(5,6,7)
Person(2,3,4)
Person(3,4,5)

image.gif

4.readFileOfPrimitives

读取一个原始数据类型(如String,Integer)的文件,返回一个对应的原始类型的DataSet集合。这里第一个参数为对应文件 path,第二个参数为分割符,以上面的 csv 文件数据为例,读取文件时会自动分割原始数据,得到类似的 DateSet[1,2,3,4,5,6,......],原始方法中还有一个 class 参数指定输出数据类型,这里隐式方法 env.readFileOfPrimitives[String] 的 [Class] 已经实现了该功能,所以 readFileOfPrimitives(path, delimiter, Class) 可以看作是 readFileOfPrimitives(path, Class) 的一个扩展。

val textLinesOfPrimitives = env.readFileOfPrimitives[String]("./info.csv",
     delimiter = ",")
    textLinesOfPrimitives.print()

image.gif

Line: 1,2,3,4,5
DataSet.print():
1
2
3
4
5

image.gif

三. Collection-Based 基于集合

1.fromCollection

该方法有两个参数类型,一种是直接从 collection 中初始化,还有一种是从 iterator 中初始化,两者基本类似。这里如果是 java 则对应 java.util.Collection ,scala 则对应 scala.collection

//1.用Array创建DataSet
    val dataSet1: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    dataSet1.print()
    //2.用Iterable创建DataSet
    val dataSet2: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    dataSet2.print()

image.gif

由于 collection 中包含多种数据结构,写法相同,下面给出一些可以用于初始化的常见数据结构 :

Array,ArrayBuffer,List,ListBuffer,Vector,mutable.Queue,mutable.Stack,Stream,Seq,Set,Iteratable, Iterator,mutable.ArraySeq,mutable.ArrayStack,Map,Range。

还有一个特殊的 generateSequence 可以生成 DataSet :

val numbers = env.generateSequence(1, 10000000)

image.gif

2.fromElements

根据给定的对象序列创建数据集。所有对象必须是相同的类型。这个就比较好理解了,直接给出相同类型的元素即可,fromCollection 和 fromElements 身上都可以看到一丝 spark.parallelize 序列化函数的影子

//1.用element创建DataSet(fromElements)
    val dataSet1: DataSet[String] = env.fromElements("spark", "flink")
    dataSet1.print()
    //2.用Tuple创建DataSet(fromElements)
    val dataSet2: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    dataSet2.print()

image.gif

3. fromParallelCollection

package org.apache.flink.util;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.annotation.Public;
@Public
public abstract class SplittableIterator<T> implements Iterator<T>, Serializable {
    private static final long serialVersionUID = 200377674313072307L;
    public SplittableIterator() {
    }
    public abstract Iterator<T>[] split(int var1);
    public Iterator<T> getSplit(int num, int numPartitions) {
        if (numPartitions >= 1 && num >= 0 && num < numPartitions) {
            return this.split(numPartitions)[num];
        } else {
            throw new IllegalArgumentException();
        }
    }
    public abstract int getMaximumNumberOfSplits();
}

image.gif

fromParallelCollection 的参数为 SplittableIterator, SplittableIterator是个抽象类,它定义了抽象方法 split 以及 getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator。两个实现类实现了常用 number 的迭代器实现和 Long 的迭代器实现,有兴趣的小伙伴可以去看下 SplittableIterator 和各自实现类的源码,没兴趣的话你就只需要知道该方法可以并行读取迭代器并返回指定元素的数据类型。

val start = System.currentTimeMillis()
    val it = (0 to 100).iterator
    val dataSetSingle: DataSet[Int] = env.fromCollection(it)
    dataSetSingle.print()
    println("Single thread Cost: ", (System.currentTimeMillis() - start))
    val start1 = System.currentTimeMillis()
    val itSequence = new NumberSequenceIterator(0, 100)
    val dataSetParellel = env.fromParallelCollection(itSequence)
    dataSetParellel.print()
    println("Parallel thread Cost: ", (System.currentTimeMillis() - start1))

image.gif

二者主要体现在并行的效率上 :

(Single thread Cost: 3886)
(Parallel thread Cost: 939)

image.gif

四.Generic 泛型

1.readFile

A.ExecutionEnvironment

该方法接受文件输入格式,指定 inputFormat 和 path 即可输出文件内容

val data = env.readFile(new TextInputFormat(null), "./info.csv")
    data.print()

image.gif

1,2,3,4,5
3,4,5,6,7
5,6,7,8,9
4,5,6,7,8
2,3,4,5,6

image.gif

B.StreamExecutionEnvironment

上述 env 采用 ExecutionEnvironment.getExecutionEnvironment,可以看作是 sparkContext 处理离线任务,还有一种 StreamExecutionEnvironment 可以看作是 StreamingContext 处理流式任务,该 env 也拥有 readFile api :

val envStreaming = StreamExecutionEnvironment.getExecutionEnvironment
    val dataSource = envStreaming.readFile(new TextInputFormat(null), "./info.csv", 
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      5000L)
    dataSource.print()
    envStreaming.execute()

image.gif

ExecutionEnvironment 执行时只读取文件一次,StreamingExecutionEnvironment 在 PROCESS_CONTINUOUSLY 模式下会根据 interval = 5000L ms 持续扫描文件,如果文件发生修改则重新读取文件内容,这里 interval 可以自定义。如果选择 PROCESS_ONE 模式,则会退化为 ExecutionEnvironment 的 readFIle 即只读一次。

2.createInput

该方法下接受通用输入格式。该方法和 spark.HadoopRDD 接口比较类似了,自定义的部分比较大。

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file"))

image.gif

五.总结

ExecutionEnvironment 模型下主要以静态 DateSet 为 DataSource 并进行后续处理,很多接口的含义和执行与 spark 很类似,其主要思想为批处理,后续介绍 DataSet 常用的 transform 函数与批处理方法。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5天前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
18 2
|
1天前
|
资源调度 Oracle Java
实时计算 Flink版产品使用问题之在YARN集群上运行时,如何查看每个并行度的详细处理数据情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
SQL Oracle Java
实时计算 Flink版产品使用问题之采集Oracle数据时,为什么无法采集到其他TABLESPACE的表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
SQL Oracle 数据处理
实时计算 Flink版产品使用问题之如何优化数据读取速度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
分布式计算 Oracle 关系型数据库
实时计算 Flink版产品使用问题之获取Oracle的数据时无法获取clob类型的数据,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
Kubernetes 关系型数据库 API
实时计算 Flink版产品使用问题之连接的PG表长时间无数据写入,WAL日志持续增长,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
存储 Java 关系型数据库
实时计算 Flink版产品使用问题之以jar包方式同步数据是否需要定义存储oss的位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5天前
|
消息中间件 分布式计算 Kafka
流计算引擎数据问题之MillWheel 和 Flink 实现数据流的同步处理如何解决
流计算引擎数据问题之MillWheel 和 Flink 实现数据流的同步处理如何解决
12 0
|
5天前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
14 0