为何要使用 Flink
因为本篇文章中,有个 Kafka
数据源的 Demo
,在一开始解答小伙伴有可能的困惑:
Question:既然监听 Kafka 消息,为何不建立一个简单的消息消费者,使用简单的代码就能进行消息的消费?
Answer:在普通的消费者逻辑中,只能做到对传送过来的一条消息进行单条处理。而在 Flink 这个优秀的流计算框架中,能够使用窗口进行多样化处理。提供了窗口处理函数,可以对一段时间(例如 5s 内)或者一批(计数大小,例如 5 个一批)的数据进行计数或者 reduce 整合处理
还有 Flink
拥有状态管理,能够保存 checkpoint
,如果程序出现错误,也能够之前的检查点恢复,继续程序的处理,于是拥有这些好处的优秀框架,希望小伙伴也加入进来,一起学习~
1、前言
接下来的几篇文章,都会围绕着下面这张图,整体上来说,就是 DataStreamAPI
编程的练习:
分别是 Source
、Transformation
和 Sink
进行逐一学习。
2、DataSource 介绍
直译:数据来源
计算引擎,不管是批出来还是流处理,最重要的是数据来源,根据源源不断的数据进行处理,加工成更有价值的数据。
Flink
官方包中提供了如下基于集合、文件、套接字等 API
,然后第三方例如Kafka
、RabbitMq
等也提供了方便的集成库。
由于我们测试时,使用的是 StreamExecutionEnvironment.getExecutionEnvironment()
来获取流执行环境类进行操作,所以我们来看下这个类的返回类型是 DataStreamSource
的方法:
3、集合
集合数据源主要有三种:collection
、element
和 generateSequence
。
- fromCollection(Collection):接受的参数对象必须是同一类型的集合
- fromCollection(Iterator
<
OUT>
data, Class<
OUT>
type):第一个参数是迭代器,第二个参数是指定返回的类型 - fromElements(Class
<
OUT>
type, OUT... data):第一个参数是指定返回的类型,后面的是不定数量入参,可以输入多个OUT
类型的对象 - fromParallelCollection(SplittableIterator
<
OUT>
iterator, TypeInformation<
OUT>
typeInfo, String operatorName):从一个可分离的迭代器中创建并行数据源。这个方法是parallel
并行数据源的底层调用方法,typeInfo
是具体的类型信息,最后一个参数就是操作名字。这个并行数据源并没有测试过,等到之后回来补坑吧。 - generateSequence(long, long):创建一个包含数字序列的新数据流。例如传进去是 1l 和 10l,那么数据源就是 [1-10]
测试代码如下:
DataSourceFromCollection.java
private static DataStreamSource<Student> collection1(StreamExecutionEnvironment env) { List<Student> studentList = Lists.newArrayList( new Student(1, "name1", 23, "address1"), new Student(2, "name2", 23, "address2"), new Student(3, "name3", 23, "address3") ); return env.fromCollection(studentList); } private static DataStreamSource<Long> collection2(StreamExecutionEnvironment env) { return env.generateSequence(1, 20); }
4、文件 File
从官方例子中,罗列了以下三个读取文件的方法,第一个返回的文本类型的数据源,第二个数据源是只读取一次文件,第三个方法参数比较多,文档中关于 watchType 观察类型介绍比较多,这里翻译自文档 Flink DataStream API Programming Guide
- readTextFile(filePath):从
filePath
读取文本数据源,文本类型是TextInputFormat
以及字符串类型是UTF-8
,返回的是文本类型的数据源 - readFile(fileInputFormat, path):根据指定的文件输入格式读取文件,只读取一次,不随着文本修改重新读取
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):这是前两个内部调用的方法。它根据给定的
fileInputFormat
读取路径中的文件。**根据提供的watchType
对数据源做不同的操作,FileProcessingMode.PROCESS_CONTINUOUSLY
模式下,会定期(每间隔 ms)监视新数据的路径,FileProcessingMode.PROCESS_ONCE
模式下,会一次处理当前路径中的数据并退出。**使用pathFilter
,用户可以进一步从处理文件中排除文件。
4.1、实现 IMPLEMENTATION:
在后台,Flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个都是由单独的实体实现的。
监视由单个非并行(并行度= 1)任务实现,而读取由并行运行的多个任务执行。
后者的并行性等于作业并行性。单个监视任务的作用是扫描目录(根据 watchType
定期或仅扫描一次),查找要处理的文件,将它们划分为多个拆分,然后将这些拆分分配给下游读取器 (reader)。
readers
是实际读取到数据的角色。每一个分片 split
只能由一个 reader
读取,但是一个 reader
可以读取多个分片 split
。
4.2、重要笔记 IMPORTANT NOTES:
- 如果
watchType
设置为FileProcessingMode.PROCESS_CONTINUOUSLY
,则在修改文件时,将完全重新处理其内容。 这可能会破坏“完全一次”的语义,因为在文件末尾附加数据将导致重新处理其所有内容。 - 如果
watchType
设置为FileProcessingMode.PROCESS_ONCE
,则源将扫描路径一次并退出,而无需等待读取器完成文件内容的读取。 当然,读者将继续阅读,直到读取了所有文件内容。关闭源将导致在该点之后没有更多检查点。这可能导致节点故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。
根据上诉两种情况,个人觉得如果用文件数据作为数据源进行测试,那么使用第二种观察模式 FileProcessingMode.PROCESS_ONCE
,只扫描一次,避免修改文件后影响之前的计算结果。
DataSourceFromFile.java
// 简单的文字文件输入流 DataStreamSource<String> textFileSource = env.readTextFile(filePath); // 指定格式和监听类型 Path pa = new Path(filePath); TextInputFormat inputFormat = new TextInputFormat(pa); DataStreamSource<String> complexFileSource = env.readFile(inputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100L, TypeExtractor.getInputFormatTypes(inputFormat));
5、套接字 Socket
- socketTextStream:从套接字
socket
读取。元素可以由自定义分隔符delimiter
进行分隔。
DataSourceFromSocket.java
// 监听端口号 DataStreamSource<String> source = env.socketTextStream("localhost", 9000); // 定义分隔符 DataStreamSource<String> source = env.socketTextStream("localhost", 9000, "\\W+");
更具体例子可以参考上一篇 Hello World
例子
6、自定义 DataSource
从前面介绍中看到,Flink
提供了一个 addSource(SourceFunction<OUT>)
的方法,其中 SourceFunction
是实现自定义数据源的关键接口,而我们常用来扩展的是它的抽象子类 RichSourceFunction