一.引言
编辑
数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。创建数据集的机制一般抽象在 InputFormat 后面,这里有点类似 spark 的 sparkContext,Flink 的 ExecutionEnvironment 也提供了很多快捷的方法。主要分为下面几大类,基于文件的和基于集合的 :
File-Based 基于文件 | |
readTextFile(path) | TextInputFormat -读取文件行并返回字符串。 |
readTextFileWithValue(path) | TextValueInputFormat -读取文件行并返回StringValues。StringValues是可变字符串。 |
readCsvFile(path) | CsvInputFormat -解析逗号(或其他char)分隔字段的文件。返回由元组或pojo组成的数据集。支持基本java类型及其对应值作为字段类型。 |
readFileOfPrimitives(path, Class) | PrimitiveInputFormat—解析以新行(或其他字符序列)分隔的原始数据类型的文件,如String或Integer。 |
readFileOfPrimitives(path, delimiter, Class) | PrimitiveInputFormat—使用给定的分隔符,解析以新行(或其他字符序列)分隔的原始数据类型(如String或Integer)的文件。 |
Collection-Based 基于集合 | |
fromCollection(Collection) | 从Java.util.Collection创建一个数据集。集合中的所有元素必须具有相同的类型,当然也可以是 scala 的。 |
fromCollection(Iterator, Class) | 从迭代器创建一个数据集。该类指定迭代器返回的元素的数据类型。 |
fromElements(T…) | 根据给定的对象序列创建一个数据集。所有对象必须是相同的类型。 |
fromParallelCollection(SplittableIterator, Class) |
从一个迭代器中并行创建一个数据集。该类指定迭代器返回的元素的数据类型。 |
generateSequence(from, to) | 在给定的间隔内并行生成数字序列。 |
Generic 泛型 | |
readFile(inputFormat, path) |
FileInputFormat - 接受文件输入格式。 |
createInput(inputFormat) / inputFormat | 接受通用的输入格式。 |
Tips:
介绍前这里先初始化好执行的 ExecutionEnvironment ,后面的示例都将基于改 env 实现。注意最下面 import 的隐式转换,Flink 基于 Scala 时很多方法都需要隐式转换,否则 api 执行会报错。
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._
二.FileBased 基于文件
1.readTextFile
Api readTextFile 和 spark 的 textFile 很像,它可以读取本地的文件,也可以读取 HDFS 或者集群上的数据,并支持自动识别一些压缩格式文件,就像 textFile 可以直接读取 gz 一样。
val textLinesFromLocalFile: DataSet[String] = env.readTextFile("./myfile") val textLinesFromHdfs: DataSet[String] = env.raedTextFile("hdfs://nnHost:nnPort/path/to/my/textfile") //触发程序执行 textLines.print()
对于本地和集群的文件都可以直接调用执行,截止到 Flink v1.14.3 该接口支持以下压缩格式:
Compressed Method 压缩方法 | File Extensions 文件扩展名 | Parallelizable 可压缩 |
DEFLATE | .deflate | no |
GZip | .gz, .gzip | no |
Bzip2 | .bz2 | no |
XZ | .xz | no |
ZStandart | .zst | no |
2.readTextFileWithValue
从文本文件中读取数据并以StringValue类型返回,StringValue类型为可变字符串。此方法和readTextFile方法类似,只不过是制定了数据类型,返回的不是 DataSet[String] 而是 DataSet[StringValue]
val textLines = env.readTextFileWithValue("./yourfile") //触发程序执行 textLines.print()
3.readCsvFile
csv 文件内容如下:
1,2,3,4,5 2,3,4,5,6 3,4,5,6,7 4,5,6,7,8 5,6,7,8,9
A.基础读法
[(隐式转换)] 中指定了 csv 文件中各个元素的数据类型
val csvInput = env.readCsvFile[(String,String,String,String,String)]("./info.csv") csvInput.print()
B.读取指定行
includedFields 使用数组,其中数字代表的含义为要保留的数据列对应的列数,这里还支持 ignoreFirstLine = true 参数可以去除带表头的 csv 文件。
val csvInput2 = env.readCsvFile[(String, Double)]("./info.csv", includedFields = Array(0, 3)) csvInput2.print()
C.读取生成指定类
scala 支持 caseClass 快速定义数据类,这里 [] 内代表返回的数据类型,pojoFields 指定对应列的 colName, 由于只给出了三列而原始数据有五列,所以只返回对应三列的数据
case class Person(a: String, b: String, c: String) val csvInput3 = env.readCsvFile[Person]("./info.csv", pojoFields = Array("name", "age", "gender")) csvInput3.print()
Person(4,5,6) Person(1,2,3) Person(5,6,7) Person(2,3,4) Person(3,4,5)
4.readFileOfPrimitives
读取一个原始数据类型(如String,Integer)的文件,返回一个对应的原始类型的DataSet集合。这里第一个参数为对应文件 path,第二个参数为分割符,以上面的 csv 文件数据为例,读取文件时会自动分割原始数据,得到类似的 DateSet[1,2,3,4,5,6,......],原始方法中还有一个 class 参数指定输出数据类型,这里隐式方法 env.readFileOfPrimitives[String] 的 [Class] 已经实现了该功能,所以 readFileOfPrimitives(path, delimiter, Class) 可以看作是 readFileOfPrimitives(path, Class) 的一个扩展。
val textLinesOfPrimitives = env.readFileOfPrimitives[String]("./info.csv", delimiter = ",") textLinesOfPrimitives.print()
Line: 1,2,3,4,5 DataSet.print(): 1 2 3 4 5
三. Collection-Based 基于集合
1.fromCollection
该方法有两个参数类型,一种是直接从 collection 中初始化,还有一种是从 iterator 中初始化,两者基本类似。这里如果是 java 则对应 java.util.Collection ,scala 则对应 scala.collection
//1.用Array创建DataSet val dataSet1: DataSet[String] = env.fromCollection(Array("spark", "flink")) dataSet1.print() //2.用Iterable创建DataSet val dataSet2: DataSet[String] = env.fromCollection(Iterable("spark", "flink")) dataSet2.print()
由于 collection 中包含多种数据结构,写法相同,下面给出一些可以用于初始化的常见数据结构 :
Array,ArrayBuffer,List,ListBuffer,Vector,mutable.Queue,mutable.Stack,Stream,Seq,Set,Iteratable, Iterator,mutable.ArraySeq,mutable.ArrayStack,Map,Range。
还有一个特殊的 generateSequence 可以生成 DataSet :
val numbers = env.generateSequence(1, 10000000)
2.fromElements
根据给定的对象序列创建数据集。所有对象必须是相同的类型。这个就比较好理解了,直接给出相同类型的元素即可,fromCollection 和 fromElements 身上都可以看到一丝 spark.parallelize 序列化函数的影子
//1.用element创建DataSet(fromElements) val dataSet1: DataSet[String] = env.fromElements("spark", "flink") dataSet1.print() //2.用Tuple创建DataSet(fromElements) val dataSet2: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink")) dataSet2.print()
3. fromParallelCollection
package org.apache.flink.util; import java.io.Serializable; import java.util.Iterator; import org.apache.flink.annotation.Public; @Public public abstract class SplittableIterator<T> implements Iterator<T>, Serializable { private static final long serialVersionUID = 200377674313072307L; public SplittableIterator() { } public abstract Iterator<T>[] split(int var1); public Iterator<T> getSplit(int num, int numPartitions) { if (numPartitions >= 1 && num >= 0 && num < numPartitions) { return this.split(numPartitions)[num]; } else { throw new IllegalArgumentException(); } } public abstract int getMaximumNumberOfSplits(); }
fromParallelCollection 的参数为 SplittableIterator, SplittableIterator是个抽象类,它定义了抽象方法 split 以及 getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator。两个实现类实现了常用 number 的迭代器实现和 Long 的迭代器实现,有兴趣的小伙伴可以去看下 SplittableIterator 和各自实现类的源码,没兴趣的话你就只需要知道该方法可以并行读取迭代器并返回指定元素的数据类型。
val start = System.currentTimeMillis() val it = (0 to 100).iterator val dataSetSingle: DataSet[Int] = env.fromCollection(it) dataSetSingle.print() println("Single thread Cost: ", (System.currentTimeMillis() - start)) val start1 = System.currentTimeMillis() val itSequence = new NumberSequenceIterator(0, 100) val dataSetParellel = env.fromParallelCollection(itSequence) dataSetParellel.print() println("Parallel thread Cost: ", (System.currentTimeMillis() - start1))
二者主要体现在并行的效率上 :
(Single thread Cost: 3886) (Parallel thread Cost: 939)
四.Generic 泛型
1.readFile
A.ExecutionEnvironment
该方法接受文件输入格式,指定 inputFormat 和 path 即可输出文件内容
val data = env.readFile(new TextInputFormat(null), "./info.csv") data.print()
1,2,3,4,5 3,4,5,6,7 5,6,7,8,9 4,5,6,7,8 2,3,4,5,6
B.StreamExecutionEnvironment
上述 env 采用 ExecutionEnvironment.getExecutionEnvironment,可以看作是 sparkContext 处理离线任务,还有一种 StreamExecutionEnvironment 可以看作是 StreamingContext 处理流式任务,该 env 也拥有 readFile api :
val envStreaming = StreamExecutionEnvironment.getExecutionEnvironment val dataSource = envStreaming.readFile(new TextInputFormat(null), "./info.csv", FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L) dataSource.print() envStreaming.execute()
ExecutionEnvironment 执行时只读取文件一次,StreamingExecutionEnvironment 在 PROCESS_CONTINUOUSLY 模式下会根据 interval = 5000L ms 持续扫描文件,如果文件发生修改则重新读取文件内容,这里 interval 可以自定义。如果选择 PROCESS_ONE 模式,则会退化为 ExecutionEnvironment 的 readFIle 即只读一次。
2.createInput
该方法下接受通用输入格式。该方法和 spark.HadoopRDD 接口比较类似了,自定义的部分比较大。
// read a file from the specified path of type SequenceFileInputFormat val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file"))
五.总结
ExecutionEnvironment 模型下主要以静态 DateSet 为 DataSource 并进行后续处理,很多接口的含义和执行与 spark 很类似,其主要思想为批处理,后续介绍 DataSet 常用的 transform 函数与批处理方法。