Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
0. 处理无界和有界数据
任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
数据可以被作为 无界 或者 有界 流来处理。
无界流
有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
有界流
有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理
Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
1. Flink程序和数据流图
如图所示,Flink程序分为三大部分,第1部分读取数据源(Source),第2部分对数据做转换操作(Transformation),第3部分将转换结果输出到一个目的地(Sink)。
代码中 sum()、flatMap()、keyBy()、timeWindow()
这些方法,是Flink提供给程序员的接口,程序员需要调用并实现这些函数,对数据进行操作,进而变成特定的业务逻辑。
通常一到多个函数会组成一个算子(Operator)、算子执行对数据的操作(Operation)。在WordCount的例子中,有以下3类算子。
Source
算子读取数据源中的数据,数据源可以是数据流,也可以存储在文件系统中的文件。
Transformation
算子对数据进行必要的计算处理。
Sink
算子将处理结果输出,数据一般被输出到数据库、文件系统或消息队列。
:::info
我们先对这个程序中各个函数做一个简单的介绍,关于这些函数的具体使用方式将在后文中详细说明。
:::
(1)flatMap()
对输入进行处理,生成零到多个输出。本例中它执行一个简单的分词过程,对一行字符串按照空格切分,生成一个(word,1)的Key-Value二元组。
(2) keyBy()
根据某个Key对数据重新分组。本例中是将二元组(word,1)中第一项作为Key进行分组,相同的单词会被分到同一组。
(3)timeWindow()
是时间窗口函数,用来界定对多长时间之内的数据做统计。
(4)sum()
为求和函数。sum(1)表示对二元组中第二个元素求和,因为经过前面的keyBy()算子将所有相同的单词都分到了一组,因此,在这个分组内,将单词出现的次数相加,就得到出现的总次数。
:::info
在程序实际执行前,Flink会将用户编写的代码做一个简单处理,生成一个如图所示的逻辑视图。
:::
上图展示了WordCount程序中,数据在不同算子间流动的情况。
图中,圆圈代表算子,圆圈间的空心箭头代表数据流,数据流在Flink程序中经过不同算子的计算,最终生成结果。
其中,keyBy()
、timeWindow()
和 sum()
共同组成了一个时间窗口上的聚合操作,被归结为一个算子 Window Aggregation
。我们可以在Flink的WebUI中,单击一个作业,查看这个作业的逻辑视图。
对于WordCount程序,逻辑上来讲无非是对数据流中的单词做提取,然后使用一个Key-Value二元组对单词做词频计数,最后输出结果即可。这样的逻辑本可以用几行代码完成,改成使用算子形式,反而让人看得一头雾水。
2. 为什么一定要用算子的形形式来写程序呢?
实际上,算子进化成当前这个形态,就像人类从石块计数,到手指计数、算盘计数,再到计算机计数这样的进化过程一样,尽管更低级的方式可以完成一定的计算任务,但是随着计算规模的增长,古老的计数方式存在着低效的弊端,无法完成更高级别和更大规模的计算需求。
试想,如果我们不使用大数据处理框架提供的算子,而是自己实现一套上述的计算逻辑,尽管我们可以快速完成当前的词频统计的任务,但是当面临一个新计算任务时,我们需要重新编写程序,完成一整套计算任务。我们自己编写代码的横向扩展性可能很差,当输入数据暴增时,我们需要做很大改动,以部署在更多节点上。
:::info
大数据框架的算子对计算做了一些抽象,对于人们来说有一定学习成本,而一旦掌握这门技术,人们所能处理的数据规模将成倍增加。
:::
算子的出现,正是针对大数据场景下,人们需要一种统一的计算描述语言来对数据做计算而进化出的新计算形态。基于Flink的算子,我们可以定义一个数据流的逻辑视图,以此完成对大数据的计算。剩下那些数据交换、横向扩展、故障恢复等问题可交由大数据框架来解决。
3. 从逻辑视图转化为物理执行图
在绝大多数的大数据处理场景下,一个节点无法处理所有数据,数据会被切分到多个节点上。在大数据领域,当数据量大到超过单个节点处理能力时,需要将一份数据切分到多个分区(Partition)上,每个分区分布在一台虚拟机或物理机上。
大数据框架的算子提供了编程接口,我们可以使用算子构建数据流的逻辑视图。考虑到数据分布在多个节点的情况,逻辑视图只是一种抽象,需要将逻辑视图转化为物理执行图,才能在分布式环境下执行。
如图所示为 WordCount
程序的物理执行图,数据流分布在2个分区上。空心箭头部分表示数据流分区,圆圈部分表示算子在分区上的算子子任务(Operator Subtask
)。从逻辑视图变为物理执行图后,FlatMap
算子在每个分区都有一个算子子任务,以处理该分区上的数据:FlatMap[1/2]
算子子任务处理第一个数据流分区上的数据,以此类推。
在分布式计算环境下,执行计算的单个节点(物理机或虚拟机)被称为实例,一个算子在并行执行时,算子子任务会分布到多个节点上,所以算子子任务又被称为算子实例(Instance)。即使输入数据增多,我们也可以通过部署更多的算子子任务来进行横向扩展。从图中可以看到,除去 Sink
外的算子都被分成了2个算子子任务,它们的并行度(Parallelism
)为2,
Sink
算子的并行度为1。并行度是可以被设置的,当设置某个算子的并行度为2时,也就意味着这个算子有2个算子子任务(或者说2个算子实例)并行执行。实际应用中一般根据输入数据量的大小、计算资源的多少等多方面的因素来设置并行度。
4. 数据交换策略
如上图中出现了数据流动的现象,即数据在不同的算子子任务上进行数据交换。无论是Hadoop、Spark还是Flink,都会涉及数据交换策略。常见的数据交换策略有4种,如下图所示。
- 前向传播(Forward):前一个算子子任务将数据直接传递给后一个算子子任务,数据不存在跨分区的交换,也避免了因数据交换产生的各类开销,前面 wordCount 中 Source 和 FlatMap 之间就是这样的情形。
- 按Key分组(Key-Based):数据以 (Key,Value) 二元组形式存在,该策略将所有数据按照Key进行分组,相同Key的数据会被分到一组、发送到同一个分区上。WordCount程序中,keyBy() 将单词作为Key,把相同单词都发送到同一分区,以方便后续算子的聚合统计。
- 广播(Broadcast):将某份数据发送到所有分区上,这种策略涉及了数据在全局的复制,因此非常消耗资源。
- 随机(Random):该策略将所有数据随机均匀地发送到多个分区上,以保证数据平均分配到不同分区上。该策略通常为了防止数据倾斜到某些分区,导致部分分区数据稀疏,另外一些分区数据拥堵的情况发生。
5. Flink分布式架构与核心组件
:::info
为了支持分布式执行,Flink跟其他大数据框架一样,采用了主从(Master-Worker)架构。
:::
Flink执行时主要包括如下两个组件。
Master 是一个 Flink 作业的主进程。它起到了协调管理的作用。
TaskManager,又被称为Worker或Slave,是执行计算任务的进程。它拥有CPU、内存等计算资源。Flink作业需要将计算任务分发到多个TaskManager上并行执行。下面将从作业执行层面来分析Flink各个模块如何工作。
1. Flink作业提交过程
Flink为适应不同的基础环境(Standalone集群、YARN、Kubernetes),在不断迭代开发过程中已经逐渐形成了一个兼容性很强的架构。不同的基础环境对计算资源的管理方式略有不同,不过都大同小异,图所示为以Standalone集群为例,分析作业的提交过程。Standalone模式指Flink独占该集群,集群上无其他任务。
在一个作业提交前,Master和TaskManager等进程需要先被启动。我们可以在Flink主目录中执行脚本来启动这些进程:bin/start-cluster.sh
。Master和TaskManager被启动后,TaskManager
需要将自己注册给 Master中的 ResourceManager。这个初始化和资源注册过程发生在单个作业提交前,我们称之为第0步。
① 用户编写应用程序代码,并通过Flink客户端(Client)提交作业。程序一般为Java或Scala语言,调用Flink API,构建逻辑视图。代码和相关配置文件被编译打包,被提交到Master的Dispatcher,形成一个应用作业(Application)。
② Dispatcher接收到这个作业,启动JobManager,这个JobManager会负责本次作业的各项协调工作。
③ JobManager向ResourceManager申请本次作业所需资源。
④ 由于在第0步中TaskManager已经向ResourceManager中注册了资源,这时闲置的TaskManager会被反馈给JobManager。
⑤ JobManager将用户作业中的逻辑视图转化为并行化的物理执行图,将计算任务分发部署到多个TaskManager上。至此,一个Flink作业就开始执行了。
TaskManager在执行计算任务过程中可能会与其他TaskManager交换数据,会使用一些数据交换策略。同时,TaskManager也会将一些任务状态信息反馈给JobManager,这些信息包括任务启动、执行或终止的状态,快照的元数据等。
6. Flink核心组件
有了这个作业提交流程,读者对各组件的功能应该有了更全面的认识,接下来我们再对涉及的各个组件进行更为详细的介绍。
1. Client
用户一般使用Client提交作业,比如Flink主目录下bin目录中提供的命令行工具。Client会对用户提交的Flink作业进行预处理,并把作业提交到Flink集群上。Client提交作业时需要配置一些必要的参数,比如使用Standalone集群还是YARN集群等。整个作业被打成了JAR包,DataStream API被转换成了JobGraph,JobGraph是一种逻辑视图。
2. Dispatcher
Dispatcher可以接收多个作业,每接收一个作业,Dispatcher都会为这个作业分配一个JobManager。Dispatcher对外提供 restful 接口,以 http 来对外提供服务。
3. JobManager
:::info
JobManager 是单个Flink作业的协调者,一个作业会有一个JobManager来负责。
:::
JobManager会将Client提交的JobGraph转化为ExecutionGraph,ExecutionGraph是并行的物理执行图。JobManager会向ResourceManager申请必要的资源,当获取足够的资源后,JobManager将ExecutionGraph以及具体的计算任务分发部署到多个TaskManager上。同时,JobManager还负责管理多个TaskManager,包括收集作业的状态信息、生成检查点、必要时进行故障恢复等。
早期,Flink Master被命名为JobManager,负责绝大多数Master进程的工作。随着迭代和开发,出现了名为JobMaster的组件,JobMaster负责单个作业的执行。本书中,我们仍然使用JobManager的概念,表示负责单个作业的组件。一些Flink文档也可能使用JobMaster的概念,读者可以将JobMaster等同于JobManager来看待。
4. ResourceManager
Flink现在可以部署在 Standalone、YARN或Kubernetes 等环境上,不同环境中对计算资源的管理模式略有不同,Flink使用一个名为 ResourceManager 的模块来统一处理资源分配上的问题。
:::info
在Flink中,计算资源的基本单位是TaskManager上的任务槽位(Task Slot,简称Slot)。
:::
ResourceManager 的职责主要是从YARN等资源提供方获取计算资源,当JobManager有计算需求时,将空闲的Slot分配给JobManager。当计算任务结束时,ResourceManager 还会重新收回这些Slot。
5.TaskManager
TaskManager 是实际负责执行计算的节点。一般地,一个Flink作业是分布在多个TaskManager上执行的,单个TaskManager上提供一定量的Slot。一个 TaskManager 启动后,相关Slot信息会被注册到 ResourceManager 中。当某个Flink作业提交后,ResourceManager 会将空闲的Slot提供给 JobManager。JobManager 获取到空闲的 Slot 后会将具体的计算任务部署到空闲 Slot 之上,任务开始在这些 Slot 上执行。在执行过程,由于要进行数据交换,TaskManager 还要和其他 TaskManager 进行必要的数据通信。
:::info
总之,TaskManager 负责具体计算任务的执行,启动时它会将 Slot 资源向 ResourceManager 注册。
:::