Flink 数据源 DataSource是这个样子的?(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:

为何要使用 Flink

因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:

Question:既然监听 Kafka 消息,为何不建立一个简单的消息消费者,使用简单的代码就能进行消息的消费?


Answer:在普通的消费者逻辑中,只能做到对传送过来的一条消息进行单条处理。而在 Flink 这个优秀的流计算框架中,能够使用窗口进行多样化处理。提供了窗口处理函数,可以对一段时间(例如 5s 内)或者一批(计数大小,例如 5 个一批)的数据进行计数或者 reduce 整合处理

还有 Flink 拥有状态管理,能够保存 checkpoint,如果程序出现错误,也能够之前的检查点恢复,继续程序的处理,于是拥有这些好处的优秀框架,希望小伙伴也加入进来,一起学习~

1、前言

接下来的几篇文章,都会围绕着下面这张图,整体上来说,就是 DataStreamAPI 编程的练习:16.jpg

分别是 SourceTransformationSink 进行逐一学习。


2、DataSource 介绍

直译:数据来源

计算引擎,不管是批出来还是流处理,最重要的是数据来源,根据源源不断的数据进行处理,加工成更有价值的数据。

Flink官方包中提供了如下基于集合、文件、套接字等 API然后第三方例如KafkaRabbitMq等也提供了方便的集成库

由于我们测试时,使用的是 StreamExecutionEnvironment.getExecutionEnvironment() 来获取流执行环境类进行操作,所以我们来看下这个类的返回类型是 DataStreamSource 的方法:

3、集合

集合数据源主要有三种:collectionelementgenerateSequence

  • fromCollection(Collection):接受的参数对象必须是同一类型的集合
  • fromCollection(Iterator<OUT>data, Class<OUT> type):第一个参数是迭代器,第二个参数是指定返回的类型
  • fromElements(Class<OUT> type, OUT... data):第一个参数是指定返回的类型,后面的是不定数量入参,可以输入多个 OUT 类型的对象
  • fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo, String operatorName)从一个可分离的迭代器中创建并行数据源。这个方法是 parallel 并行数据源的底层调用方法,typeInfo 是具体的类型信息,最后一个参数就是操作名字。这个并行数据源并没有测试过,等到之后回来补坑吧。
  • generateSequence(long, long):创建一个包含数字序列的新数据流。例如传进去是 1l 和 10l,那么数据源就是 [1-10]

测试代码如下:

DataSourceFromCollection.java


private static DataStreamSource<Student> collection1(StreamExecutionEnvironment env) {
    List<Student> studentList = Lists.newArrayList(
            new Student(1, "name1", 23, "address1"),
            new Student(2, "name2", 23, "address2"),
            new Student(3, "name3", 23, "address3")
    );
    return env.fromCollection(studentList);
}
private static DataStreamSource<Long> collection2(StreamExecutionEnvironment env) {
    return env.generateSequence(1, 20);
}

4、文件 File

从官方例子中,罗列了以下三个读取文件的方法,第一个返回的文本类型的数据源,第二个数据源是只读取一次文件,第三个方法参数比较多,文档中关于 watchType 观察类型介绍比较多,这里翻译自文档 Flink DataStream API Programming Guide


  • readTextFile(filePath):从 filePath读取文本数据源,文本类型是 TextInputFormat 以及字符串类型是 UTF-8,返回的是文本类型的数据源
  • readFile(fileInputFormat, path):根据指定的文件输入格式读取文件,只读取一次,不随着文本修改重新读取
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):这是前两个内部调用的方法。它根据给定的 fileInputFormat读取路径中的文件。**根据提供的 watchType 对数据源做不同的操作,FileProcessingMode.PROCESS_CONTINUOUSLY 模式下,会定期(每间隔 ms)监视新数据的路径,FileProcessingMode.PROCESS_ONCE 模式下,会一次处理当前路径中的数据并退出。**使用 pathFilter,用户可以进一步从处理文件中排除文件。


4.1、实现 IMPLEMENTATION:


在后台,Flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个都是由单独的实体实现的。

监视由单个非并行(并行度= 1)任务实现,而读取由并行运行的多个任务执行。

后者的并行性等于作业并行性。单个监视任务的作用是扫描目录(根据 watchType 定期或仅扫描一次),查找要处理的文件,将它们划分为多个拆分,然后将这些拆分分配给下游读取器 (reader)。

readers 是实际读取到数据的角色。每一个分片 split 只能由一个 reader读取,但是一个 reader 可以读取多个分片 split


4.2、重要笔记 IMPORTANT NOTES:

  1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。 这可能会破坏“完全一次”的语义,因为在文件末尾附加数据将导致重新处理其所有内容。
  2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,则源将扫描路径一次并退出,而无需等待读取器完成文件内容的读取。 当然,读者将继续阅读,直到读取了所有文件内容。关闭源将导致在该点之后没有更多检查点。这可能导致节点故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

根据上诉两种情况,个人觉得如果用文件数据作为数据源进行测试,那么使用第二种观察模式 FileProcessingMode.PROCESS_ONCE,只扫描一次,避免修改文件后影响之前的计算结果。

DataSourceFromFile.java

// 简单的文字文件输入流
DataStreamSource<String> textFileSource = env.readTextFile(filePath);
// 指定格式和监听类型
Path pa = new Path(filePath);
TextInputFormat inputFormat = new TextInputFormat(pa);
DataStreamSource<String> complexFileSource =
                env.readFile(inputFormat, filePath, 
                FileProcessingMode.PROCESS_CONTINUOUSLY, 
                100L, 
                TypeExtractor.getInputFormatTypes(inputFormat));

5、套接字 Socket

  • socketTextStream:从套接字 socket 读取。元素可以由自定义分隔符 delimiter 进行分隔。

DataSourceFromSocket.java

// 监听端口号
DataStreamSource<String> source = env.socketTextStream("localhost", 9000);
// 定义分隔符
DataStreamSource<String> source = env.socketTextStream("localhost", 9000, "\\W+");

更具体例子可以参考上一篇 Hello World 例子

6、自定义 DataSource

从前面介绍中看到,Flink 提供了一个 addSource(SourceFunction<OUT>) 的方法,其中 SourceFunction 是实现自定义数据源的关键接口,而我们常用来扩展的是它的抽象子类 RichSourceFunction

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
消息中间件 关系型数据库 MySQL
Flink数据源问题之转换异常如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
135 2
|
7月前
|
SQL 消息中间件 关系型数据库
Flink数据源问题之读取mysql报错如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
117 0
|
7月前
|
消息中间件 SQL Kafka
Flink数据源问题之定时扫描key如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
|
7月前
|
存储 Oracle 关系型数据库
Flink CDC 数据源问题之连接释放冲突如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
173 0
|
7月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
144 3
|
7月前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
消息中间件 SQL Kubernetes
实时计算 Flink版产品使用合集之多线程环境中,遇到 env.addSource 添加数据源后没有执行到 env.execut,是为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
45 0
|
5月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之如何对接Oracle数据源
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之是否支持异构数据源之间的数据映射关系
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章