Flink / Scala - DataSource 之 DataStream 获取数据总结

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。下面将介绍 DataStream 的常见初始化方法。...

一.引言

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。

你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。和之前相同,一个 DataStrea 的处理主要包含 Source + Transformation + Sink 的组合:

image.gif编辑

Tips:

与之前不同的是,DataSet 的执行环境为:

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

image.gif

DataStreaming 的执行环境为:

val env = StreamExecutionEnvironment.getExecutionEnvironment

image.gif

 

二.FileBased 基于文件

这里大部分接口与 DataSet 类似,由于 env 的不同,得到的最终类型也不同,由 DataSet 变为了 DataStreaming

1.readTextFile(path)

读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

val textLines = env.readTextFile("path")

image.gif

2.readFile(fileInputFormat, path)

按照指定的文件输入格式读取(一次)文件。

class selfFileInputFormat() extends FileInputFormat[String] {
      override def reachedEnd(): Boolean = ???
      override def nextRecord(ot: String): String = ???
    }
    val dataStream = env.readFile(new selfFileInputFormat(), "")

image.gif

3.readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

上述两个方法是基于 API 直接调用的,底层调用函数为该函数,该方法它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据。

Tips:

在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

FileProcessingMode.PROCESS_CONTINUOUSLY

当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。

FileProcessingMode.PROCESS_ONCE

source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 自定义 TextFormat
    class selfFileInputFormat() extends FileInputFormat[String] {
      override def reachedEnd(): Boolean = ???
      override def nextRecord(ot: String): String = ???
    }
    // 隐函数 typeInfo
    implicit val typeInfo = TypeInformation.of(classOf[String])
    // 读取模式 watchType
    val watchType = FileProcessingMode.PROCESS_CONTINUOUSLY
    // 文件过滤 fileFilter 也可以加入过滤正在处理的文件逻辑
    val filePathFilter = new FilePathFilter {
      override def filterPath(filePath: Path): Boolean = {
        filePath.getPath.endsWith(".txt")
      }
    }
    val dataStream = env.readFile(new selfFileInputFormat(), "", watchType, 60L, filePathFilter)

image.gif

三.Collection-Based

1.fromCollection(Collection)

从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。当然,使用 scala 的转换后,scala 对应的 collection 也可以使用,这里使用方法和 DataSet 类似。

val dataStream: DataStream[String] = env.fromCollection(Array("spark", "flink"))

image.gif

2.fromCollection(Iterator, Class)

从迭代器获取,class 参数指定返回值返回元素的数据类型

val dataStream: DataStream[String] = env.fromCollection(Iterator("spark", "flink"))

image.gif

3.fromElements(T ...)

从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

val dataStream: DataStream[String] = env.fromElements("spark", "flink")

image.gif

4.fromParellelCollection(SplittableIterator, Class)

从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。

val itSequence = new NumberSequenceIterator(0, 100)
    val dataStream = env.fromParallelCollection(itSequence)

image.gif

5.generateSequence(from, to)

val numbers = env.generateSequence(1, 10000000)

image.gif

四.Socket-Based

从 Socket 读取。元素可以由分隔符分隔。

1.启动 Socket

在本地 terminal 执行下列语句并输入一下字符:

nc -lk 9999

image.gif

image.gif编辑

 

2.读取 Socket

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)
      val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
    counts.print()
    env.execute("Window Stream WordCount")

image.gif

上面是使用 keyBy 对 5S 滚动窗口内的单词进行 wordCount 的实例,下面是输出结果:

3> (hello,1)
5> (world,1)

image.gif

持续输入一些单词程序会每5s统计一个窗口内的 wordCount。

五.AddSource

1.官方 API

上一边文章提到了 Flink 支持的外部 API 以及对应支持的运行方式,下述 Connector 类别中支持 source 的均可以调用官方 API 和 Maven 依赖进行数据读取与加载生成 DataStream。

Connector 类别 支持方式
Apache Kafka source/sink
Apache Cassandra sink
Amazon Kinesis Streams source/sink
Elasticsearch sink
FileSystem sink
RabbitMQ source/sink
Google PubSub source/sink
Hybrid Source  source
Apache NiFi  source/sink
Apache Pulsar source
Twitter Streaming API source
JDBC sink

2. Self-Defined

自定义数据源需要继承 RichSourceFunction[T] 并定义数据类型 T, 主要实现 run 方法 - 获取数据与 cancel 方法 - 停止获取数据,这里和 Spark-Streaming 自定义 receiver,Storm 自定义实现 spout 类似,下面例子将以1s为间隔持续从文本中读取新内容并输出:

class SourceFromFile extends RichSourceFunction[String] {
      private var isRunning = true
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val bufferedReader = new BufferedReader(new FileReader("data.txt"))
        while ( {
          isRunning
        }) {
          val line = bufferedReader.readLine
          if (!StringUtils.isBlank(line)) {
            ctx.collect(line)
          }
          TimeUnit.SECONDS.sleep(1)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    val dataStream = env.addSource(new SourceFromFile()).print()

image.gif

 

六.总结

结合上一篇 Flink / Scala - DataSource 之 DataSet 获取数据总结,Flink 两种数据结构的获取 - DataSet / DataStream 就都介绍完了,作为流式处理引擎,Flink 更擅长于处理 DataStream 流式数据,后续也会介绍更多的流式数据处理方法。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
201 0
|
1月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
168 61
|
2月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
82 1
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
199 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
41 0
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
51 0
|
2月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
59 0
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1246 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎