一.引言
DataStream API 得名于特殊的 DataStream
类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。
DataStream
在用法上类似于常规的 Java 集合
,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream
API 操作来处理它们,DataStream
API 操作也叫作转换(transformation)。
你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream
。然后,你可以基于 DataStream
派生新的流,并使用 map、filter 等 API 方法把 DataStream
和派生的流连接在一起。和之前相同,一个 DataStrea 的处理主要包含 Source + Transformation + Sink 的组合:
编辑
Tips:
与之前不同的是,DataSet 的执行环境为:
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
DataStreaming 的执行环境为:
val env = StreamExecutionEnvironment.getExecutionEnvironment
二.FileBased 基于文件
这里大部分接口与 DataSet 类似,由于 env 的不同,得到的最终类型也不同,由 DataSet 变为了 DataStreaming
1.readTextFile(path)
读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
val textLines = env.readTextFile("path")
2.readFile(fileInputFormat, path)
按照指定的文件输入格式读取(一次)文件。
class selfFileInputFormat() extends FileInputFormat[String] { override def reachedEnd(): Boolean = ??? override def nextRecord(ot: String): String = ??? } val dataStream = env.readFile(new selfFileInputFormat(), "")
3.readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
上述两个方法是基于 API 直接调用的,底层调用函数为该函数,该方法它基于给定的 fileInputFormat
读取路径 path
上的文件。根据提供的 watchType
的不同,source 可能定期(每 interval
毫秒)监控路径上的新数据。
Tips:
在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType
),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。
FileProcessingMode.PROCESS_CONTINUOUSLY
当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。
FileProcessingMode.PROCESS_ONCE
source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
val env = StreamExecutionEnvironment.getExecutionEnvironment // 自定义 TextFormat class selfFileInputFormat() extends FileInputFormat[String] { override def reachedEnd(): Boolean = ??? override def nextRecord(ot: String): String = ??? } // 隐函数 typeInfo implicit val typeInfo = TypeInformation.of(classOf[String]) // 读取模式 watchType val watchType = FileProcessingMode.PROCESS_CONTINUOUSLY // 文件过滤 fileFilter 也可以加入过滤正在处理的文件逻辑 val filePathFilter = new FilePathFilter { override def filterPath(filePath: Path): Boolean = { filePath.getPath.endsWith(".txt") } } val dataStream = env.readFile(new selfFileInputFormat(), "", watchType, 60L, filePathFilter)
三.Collection-Based
1.fromCollection(Collection)
从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。当然,使用 scala 的转换后,scala 对应的 collection 也可以使用,这里使用方法和 DataSet 类似。
val dataStream: DataStream[String] = env.fromCollection(Array("spark", "flink"))
2.fromCollection(Iterator, Class)
从迭代器获取,class 参数指定返回值返回元素的数据类型
val dataStream: DataStream[String] = env.fromCollection(Iterator("spark", "flink"))
3.fromElements(T ...)
从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
val dataStream: DataStream[String] = env.fromElements("spark", "flink")
4.fromParellelCollection(SplittableIterator, Class)
从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
val itSequence = new NumberSequenceIterator(0, 100) val dataStream = env.fromParallelCollection(itSequence)
5.generateSequence(from, to)
val numbers = env.generateSequence(1, 10000000)
四.Socket-Based
从 Socket 读取。元素可以由分隔符分隔。
1.启动 Socket
在本地 terminal 执行下列语句并输入一下字符:
nc -lk 9999
编辑
2.读取 Socket
val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("localhost", 9999) val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .keyBy(_._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum(1) counts.print() env.execute("Window Stream WordCount")
上面是使用 keyBy 对 5S 滚动窗口内的单词进行 wordCount 的实例,下面是输出结果:
3> (hello,1) 5> (world,1)
持续输入一些单词程序会每5s统计一个窗口内的 wordCount。
五.AddSource
1.官方 API
上一边文章提到了 Flink 支持的外部 API 以及对应支持的运行方式,下述 Connector 类别中支持 source 的均可以调用官方 API 和 Maven 依赖进行数据读取与加载生成 DataStream。
Connector 类别 | 支持方式 |
Apache Kafka | source/sink |
Apache Cassandra | sink |
Amazon Kinesis Streams | source/sink |
Elasticsearch | sink |
FileSystem | sink |
RabbitMQ | source/sink |
Google PubSub | source/sink |
Hybrid Source | source |
Apache NiFi | source/sink |
Apache Pulsar | source |
Twitter Streaming API | source |
JDBC | sink |
2. Self-Defined
自定义数据源需要继承 RichSourceFunction[T] 并定义数据类型 T, 主要实现 run 方法 - 获取数据与 cancel 方法 - 停止获取数据,这里和 Spark-Streaming 自定义 receiver,Storm 自定义实现 spout 类似,下面例子将以1s为间隔持续从文本中读取新内容并输出:
class SourceFromFile extends RichSourceFunction[String] { private var isRunning = true override def run(ctx: SourceFunction.SourceContext[String]): Unit = { val bufferedReader = new BufferedReader(new FileReader("data.txt")) while ( { isRunning }) { val line = bufferedReader.readLine if (!StringUtils.isBlank(line)) { ctx.collect(line) } TimeUnit.SECONDS.sleep(1) } } override def cancel(): Unit = { isRunning = false } } val dataStream = env.addSource(new SourceFromFile()).print()
六.总结
结合上一篇 Flink / Scala - DataSource 之 DataSet 获取数据总结,Flink 两种数据结构的获取 - DataSet / DataStream 就都介绍完了,作为流式处理引擎,Flink 更擅长于处理 DataStream 流式数据,后续也会介绍更多的流式数据处理方法。