Flink / Scala - DataSource 之 DataSet 获取数据总结

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。

一.引言

image.gif编辑

数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。创建数据集的机制一般抽象在 InputFormat 后面,这里有点类似 spark 的 sparkContext,Flink 的 ExecutionEnvironment 也提供了很多快捷的方法。主要分为下面几大类,基于文件的和基于集合的 :

File-Based 基于文件        
readTextFile(path) TextInputFormat -读取文件行并返回字符串。
readTextFileWithValue(path)  TextValueInputFormat -读取文件行并返回StringValues。StringValues是可变字符串。
readCsvFile(path) CsvInputFormat -解析逗号(或其他char)分隔字段的文件。返回由元组或pojo组成的数据集。支持基本java类型及其对应值作为字段类型。
readFileOfPrimitives(path, Class) 

PrimitiveInputFormat—解析以新行(或其他字符序列)分隔的原始数据类型的文件,如String或Integer。

readFileOfPrimitives(path, delimiter, Class) PrimitiveInputFormat—使用给定的分隔符,解析以新行(或其他字符序列)分隔的原始数据类型(如String或Integer)的文件。
Collection-Based 基于集合
fromCollection(Collection) 从Java.util.Collection创建一个数据集。集合中的所有元素必须具有相同的类型,当然也可以是 scala 的。
fromCollection(Iterator, Class) 从迭代器创建一个数据集。该类指定迭代器返回的元素的数据类型。
fromElements(T…)

根据给定的对象序列创建一个数据集。所有对象必须是相同的类型。

fromParallelCollection(SplittableIterator, Class) 

从一个迭代器中并行创建一个数据集。该类指定迭代器返回的元素的数据类型。

generateSequence(from, to) 在给定的间隔内并行生成数字序列。
Generic 泛型

readFile(inputFormat, path) 

FileInputFormat - 接受文件输入格式。

createInput(inputFormat) / inputFormat  接受通用的输入格式。

Tips:

介绍前这里先初始化好执行的 ExecutionEnvironment ,后面的示例都将基于改 env 实现。注意最下面 import 的隐式转换,Flink 基于 Scala 时很多方法都需要隐式转换,否则 api 执行会报错。

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._

image.gif

二.FileBased 基于文件

1.readTextFile

Api readTextFile 和 spark 的 textFile 很像,它可以读取本地的文件,也可以读取 HDFS 或者集群上的数据,并支持自动识别一些压缩格式文件,就像 textFile 可以直接读取 gz 一样。

val textLinesFromLocalFile: DataSet[String] = env.readTextFile("./myfile")
    val textLinesFromHdfs: DataSet[String] = env.raedTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
    //触发程序执行
    textLines.print()

image.gif

对于本地和集群的文件都可以直接调用执行,截止到 Flink v1.14.3 该接口支持以下压缩格式:

Compressed Method 压缩方法 File Extensions 文件扩展名 Parallelizable 可压缩
DEFLATE .deflate no
GZip .gz, .gzip no
Bzip2 .bz2 no
XZ .xz no
ZStandart .zst no

2.readTextFileWithValue

从文本文件中读取数据并以StringValue类型返回,StringValue类型为可变字符串。此方法和readTextFile方法类似,只不过是制定了数据类型,返回的不是 DataSet[String] 而是 DataSet[StringValue]

val textLines = env.readTextFileWithValue("./yourfile")
    //触发程序执行
    textLines.print()

image.gif

3.readCsvFile

csv 文件内容如下:

1,2,3,4,5
2,3,4,5,6
3,4,5,6,7
4,5,6,7,8
5,6,7,8,9

image.gif

A.基础读法

[(隐式转换)] 中指定了 csv 文件中各个元素的数据类型

val csvInput = env.readCsvFile[(String,String,String,String,String)]("./info.csv")
    csvInput.print()

image.gif

B.读取指定行

includedFields 使用数组,其中数字代表的含义为要保留的数据列对应的列数,这里还支持 ignoreFirstLine = true 参数可以去除带表头的 csv 文件。

val csvInput2 = env.readCsvFile[(String, Double)]("./info.csv", 
        includedFields = Array(0, 3))
    csvInput2.print()

image.gif

C.读取生成指定类

scala 支持 caseClass 快速定义数据类,这里 [] 内代表返回的数据类型,pojoFields 指定对应列的 colName, 由于只给出了三列而原始数据有五列,所以只返回对应三列的数据

case class Person(a: String, b: String, c: String)
    val csvInput3 = env.readCsvFile[Person]("./info.csv",
          pojoFields = Array("name", "age", "gender"))
    csvInput3.print()

image.gif

Person(4,5,6)
Person(1,2,3)
Person(5,6,7)
Person(2,3,4)
Person(3,4,5)

image.gif

4.readFileOfPrimitives

读取一个原始数据类型(如String,Integer)的文件,返回一个对应的原始类型的DataSet集合。这里第一个参数为对应文件 path,第二个参数为分割符,以上面的 csv 文件数据为例,读取文件时会自动分割原始数据,得到类似的 DateSet[1,2,3,4,5,6,......],原始方法中还有一个 class 参数指定输出数据类型,这里隐式方法 env.readFileOfPrimitives[String] 的 [Class] 已经实现了该功能,所以 readFileOfPrimitives(path, delimiter, Class) 可以看作是 readFileOfPrimitives(path, Class) 的一个扩展。

val textLinesOfPrimitives = env.readFileOfPrimitives[String]("./info.csv",
     delimiter = ",")
    textLinesOfPrimitives.print()

image.gif

Line: 1,2,3,4,5
DataSet.print():
1
2
3
4
5

image.gif

三. Collection-Based 基于集合

1.fromCollection

该方法有两个参数类型,一种是直接从 collection 中初始化,还有一种是从 iterator 中初始化,两者基本类似。这里如果是 java 则对应 java.util.Collection ,scala 则对应 scala.collection

//1.用Array创建DataSet
    val dataSet1: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    dataSet1.print()
    //2.用Iterable创建DataSet
    val dataSet2: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    dataSet2.print()

image.gif

由于 collection 中包含多种数据结构,写法相同,下面给出一些可以用于初始化的常见数据结构 :

Array,ArrayBuffer,List,ListBuffer,Vector,mutable.Queue,mutable.Stack,Stream,Seq,Set,Iteratable, Iterator,mutable.ArraySeq,mutable.ArrayStack,Map,Range。

还有一个特殊的 generateSequence 可以生成 DataSet :

val numbers = env.generateSequence(1, 10000000)

image.gif

2.fromElements

根据给定的对象序列创建数据集。所有对象必须是相同的类型。这个就比较好理解了,直接给出相同类型的元素即可,fromCollection 和 fromElements 身上都可以看到一丝 spark.parallelize 序列化函数的影子

//1.用element创建DataSet(fromElements)
    val dataSet1: DataSet[String] = env.fromElements("spark", "flink")
    dataSet1.print()
    //2.用Tuple创建DataSet(fromElements)
    val dataSet2: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    dataSet2.print()

image.gif

3. fromParallelCollection

package org.apache.flink.util;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.annotation.Public;
@Public
public abstract class SplittableIterator<T> implements Iterator<T>, Serializable {
    private static final long serialVersionUID = 200377674313072307L;
    public SplittableIterator() {
    }
    public abstract Iterator<T>[] split(int var1);
    public Iterator<T> getSplit(int num, int numPartitions) {
        if (numPartitions >= 1 && num >= 0 && num < numPartitions) {
            return this.split(numPartitions)[num];
        } else {
            throw new IllegalArgumentException();
        }
    }
    public abstract int getMaximumNumberOfSplits();
}

image.gif

fromParallelCollection 的参数为 SplittableIterator, SplittableIterator是个抽象类,它定义了抽象方法 split 以及 getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator。两个实现类实现了常用 number 的迭代器实现和 Long 的迭代器实现,有兴趣的小伙伴可以去看下 SplittableIterator 和各自实现类的源码,没兴趣的话你就只需要知道该方法可以并行读取迭代器并返回指定元素的数据类型。

val start = System.currentTimeMillis()
    val it = (0 to 100).iterator
    val dataSetSingle: DataSet[Int] = env.fromCollection(it)
    dataSetSingle.print()
    println("Single thread Cost: ", (System.currentTimeMillis() - start))
    val start1 = System.currentTimeMillis()
    val itSequence = new NumberSequenceIterator(0, 100)
    val dataSetParellel = env.fromParallelCollection(itSequence)
    dataSetParellel.print()
    println("Parallel thread Cost: ", (System.currentTimeMillis() - start1))

image.gif

二者主要体现在并行的效率上 :

(Single thread Cost: 3886)
(Parallel thread Cost: 939)

image.gif

四.Generic 泛型

1.readFile

A.ExecutionEnvironment

该方法接受文件输入格式,指定 inputFormat 和 path 即可输出文件内容

val data = env.readFile(new TextInputFormat(null), "./info.csv")
    data.print()

image.gif

1,2,3,4,5
3,4,5,6,7
5,6,7,8,9
4,5,6,7,8
2,3,4,5,6

image.gif

B.StreamExecutionEnvironment

上述 env 采用 ExecutionEnvironment.getExecutionEnvironment,可以看作是 sparkContext 处理离线任务,还有一种 StreamExecutionEnvironment 可以看作是 StreamingContext 处理流式任务,该 env 也拥有 readFile api :

val envStreaming = StreamExecutionEnvironment.getExecutionEnvironment
    val dataSource = envStreaming.readFile(new TextInputFormat(null), "./info.csv", 
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      5000L)
    dataSource.print()
    envStreaming.execute()

image.gif

ExecutionEnvironment 执行时只读取文件一次,StreamingExecutionEnvironment 在 PROCESS_CONTINUOUSLY 模式下会根据 interval = 5000L ms 持续扫描文件,如果文件发生修改则重新读取文件内容,这里 interval 可以自定义。如果选择 PROCESS_ONE 模式,则会退化为 ExecutionEnvironment 的 readFIle 即只读一次。

2.createInput

该方法下接受通用输入格式。该方法和 spark.HadoopRDD 接口比较类似了,自定义的部分比较大。

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file"))

image.gif

五.总结

ExecutionEnvironment 模型下主要以静态 DateSet 为 DataSource 并进行后续处理,很多接口的含义和执行与 spark 很类似,其主要思想为批处理,后续介绍 DataSet 常用的 transform 函数与批处理方法。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 关系型数据库 Kafka
flink cdc 数据问题之数据丢失如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
109 0
|
1月前
|
关系型数据库 MySQL Java
flink cdc 同步问题之多表数据如何同步
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
100 0
|
1月前
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之flink-cdc任务抓取全量的mysql数据不生效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 SQL JSON
Flink问题之source并行度不同导致任务没有数据落地如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
332 0
|
1月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
52 3
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之flink Oraclecdc 捕获19C数据时报错错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
消息中间件 关系型数据库 Kafka
Flink CDC产品常见问题之 Oraclecdc JdbcIncrementalSource 捕获不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
72 2
|
1月前
|
分布式计算 Hadoop Java
Flink CDC产品常见问题之tidb cdc 数据量大了就疯狂报空指针如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。