因为公司用到大数据技术栈的缘故,之前也写过HBase,Spark等文章,公司离线用的是Spark,实时用的是Flink,所以这篇文章是关于Flink的,这篇文章对Flink的相关概念介绍的比较全面,希望对大家学习Flink能有所帮助。
Flink的一些概念和Spark非常像,看这篇文章之前,强烈建议翻看之前的Spark文章,这样学习Flink的时候能够举一反三,有助于理解。
流处理 & 批处理
事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流。在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流,流数据每输入一条数据,就有一次对应的输出。
批处理,也叫作离线处理。针对的是有界数据集,非常适合需要访问海量的全部数据才能完成的计算工作,一般用于离线统计。
流处理主要针对的是数据流,特点是无界、实时,对系统传输的每个数据依次执行操作,一般用于实时统计。
无界流Unbounded streams
无界流有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
有界流Bounded streams
有界流有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。所以在Flink里批计算其实指的就是有界流。
Flink的特点和优势
- 同时支持高吞吐、低延迟、高性能。
- 支持事件时间(Event Time)概念,结合Watermark处理乱序数据
- 支持有状态计算,并且支持多种状态内存、 文件、RocksDB。
- 支持高度灵活的窗口(Window) 操作time、 count、 session。
- 基于轻量级分布式快照(CheckPoint) 实现的容错保证Exactly- Once语义。
- 基于JVM实现独立的内存管理。
- Save Points (保存点)。
Flink VS Spark
Spark 和 Flink 在不同的应用领域上表现会有差别。一般来说,Spark 基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。在低延迟流处理场景,Flink 已经有明显的优势。而在海量数据的批处理领域,Spark 能够处理的吞吐量更大。
Spark Streaming的流计算其实是微批计算,实时性不如Flink,还有一点很重要的是Spark Streaming不适合有状态的计算,得借助一些存储如:Redis,才能实现。而Flink天然支持有状态的计算。
Flink API
Flink 本身提供了多层 API:
- Stateful Stream Processing 最低级的抽象接口是状态化的数据流接口(stateful streaming)。这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册 event time 和 processing time 处理回调函数的方法来实现复杂的计算。
- DataStream/DataSet API DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
- Table API Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁,可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
- SQL Flink 提供的最高层级的抽象是 SQL,这一层抽象在语法与表达能力上与 Table API 类似,SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。
Dataflows数据流图
所有的 Flink 程序都可以归纳为由三部分构成:Source、Transformation 和 Sink。
- Source 表示“源算子”,负责读取数据源。
- Transformation 表示“转换算子”,利用各种算子进行处理加工。
- Sink 表示“下沉算子”,负责数据的输出。
source数据源会源源不断的产生数据,transformation将产生的数据进行各种业务逻辑的数据处理,最终由sink输出到外部(console、kafka、redis、DB......)。
基于Flink开发的程序都能够映射成一个Dataflows。
当source数据源的数量比较大或计算逻辑相对比较复杂的情况下,需要提高并行度来处理数据,采用并行数据流。
通过设置不同算子的并行度, source并行度设置为2 , map也是2。代表会启动2个并行的线程来处理数据:
Flink基本架构
Flink系统架构中包含了两个角色,分别是JobManager和TaskManager,是一个典型的Master-Slave架构。JobManager相当于是Master,TaskManager相当于是Slave。
Job Manager & Task Manager
在Flink中,JobManager负责整个Flink集群任务的调度以及资源的管理。它从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlot资源并命令TaskManager启动从客户端中获取的应用。
TaskManager负责执行作业流的Task,并且缓存和交换数据流。在TaskManager中资源调度的最小单位是Task slot。TaskManager中Task slot的数量表示并发处理Task的数量。一台机器节点可以运行多个TaskManager 。
TaskManager会向JobManager发送心跳保持连接。
集群 & 部署
部署模式
Flink支持多种部署模式,包括本地模式、Standalone模式、YARN模式、Mesos模式和Kubernetes模式。
- 本地模式:本地模式是在单个JVM中启动Flink,主要用于开发和测试。它不需要任何集群管理器,但也不能跨多台机器运行。本地模式的优点是部署简单,缺点是不能利用分布式计算的优势。
- Standalone模式:Standalone模式是在一个独立的集群中运行Flink。它需要手动启动Flink集群,并且需要手动管理资源。Standalone模式的优点是部署简单,可以跨多台机器运行,缺点是需要手动管理资源。
- YARN模式:YARN模式是在Hadoop YARN集群中运行Flink。它可以利用YARN进行资源管理和调度。YARN模式的优点是可以利用现有的Hadoop集群,缺点是需要安装和配置Hadoop YARN,这是在企业中使用最多的方式。
- Mesos模式:Mesos模式是在Apache Mesos集群中运行Flink。它可以利用Mesos进行资源管理和调度。Mesos模式的优点是可以利用现有的Mesos集群,缺点是需要安装和配置Mesos。
- Kubernetes模式:Kubernetes模式是在Kubernetes集群中运行Flink。它可以利用Kubernetes进行资源管理和调度。Kubernetes模式的优点是可以利用现有的Kubernetes集群,缺点是需要安装和配置Kubernetes。
每种部署模式都有其优缺点,选择哪种部署模式取决于具体的应用场景和需求。
Session、Per-Job和Application是Flink在YARN和Kubernetes上运行时的三种不同模式,它们不是独立的部署模式,而是在YARN和Kubernetes部署模式下的子模式。
- Session模式:在Session模式下,Flink集群会一直运行,用户可以在同一个Flink集群中提交多个作业。Session模式的优点是作业提交快,缺点是作业之间可能会相互影响。
- Per-Job模式:在Per-Job模式下,每个作业都会启动一个独立的Flink集群。Per-Job模式的优点是作业之间相互隔离,缺点是作业提交慢。
- Application模式:Application模式是在Flink 1.11版本中引入的一种新模式,它结合了Session模式和Per-Job模式的优点。在Application模式下,每个作业都会启动一个独立的Flink集群,但是作业提交快。
这三种模式都可以在YARN和Kubernetes部署模式下使用。
提交作业流程
- Session 模式:
- 用户启动 Flink 会话,并连接到 Flink 集群。
- 用户使用 CLI 或 Web UI 提交作业,提交的作业被发送到 Flink 集群的 JobManager。
- JobManager 接收作业后,会对作业进行解析和编译,生成作业图(JobGraph)。
- 生成的作业图被发送到 JobManager 的调度器进行调度。
- 调度器将作业图划分为任务并将其分配给 TaskManager 执行。
- TaskManager 在其本地执行环境中运行任务。
- 在 Session 模式下,Flink 运行在交互式会话中,允许用户在一个 Flink 集群上连续地提交和管理多个作业。
- 用户可以通过 Flink 命令行界面(CLI)或 Web UI 进行交互。
- 提交流程如下:
- Per-Job 模式:
- 用户准备好作业程序和所需的配置文件。
- 用户使用 Flink 提供的命令行工具或编程 API 将作业程序和配置文件打包成一个作业 JAR 文件。
- 用户将作业 JAR 文件上传到 Flink 集群所在的环境(例如 Hadoop 分布式文件系统)。
- 用户使用 Flink 提供的命令行工具或编程 API 在指定的 Flink 集群上提交作业。
- JobManager 接收作业 JAR 文件并进行解析、编译和调度。
- 调度器将作业图划分为任务并将其分配给可用的 TaskManager 执行。
- TaskManager 在其本地执行环境中运行任务。
- 在 Per-Job 模式下,每个作业都会启动一个独立的 Flink 集群,用于执行该作业。
- 这种模式适用于独立的批处理或流处理作业,不需要与其他作业共享资源。
- 提交流程如下:
- Application 模式:
- 用户准备好应用程序程序和所需的配置文件。
- 用户使用 Flink 提供的命令行工具或编程 API 将应用程序程序和配置文件打包成一个应用程序 JAR 文件。
- 用户将应用程序 JAR 文件上传到 Flink 集群所在的环境(例如 Hadoop 分布式文件系统)。
- 用户使用 Flink 提供的命令行工具或编程 API 在指定的 Flink 集群上提交应用程序。
- JobManager 接收应用程序 JAR 文件并进行解析、编译和调度。
- 调度器将应用程序图划分为任务并将其分配给可用的 TaskManager 执行。
- TaskManager 在其本地执行环境中运行任务。
- Application 模式是 Flink 1.11 版本引入的一种模式,用于在常驻的 Flink 集群上执行多个应用程序。
- 在 Application 模式下,用户可以在运行中的 Flink 集群上动态提交、更新和停止应用程序。
- 提交流程如下:
配置开发环境
每个 Flink 应用都需要依赖一组 Flink 类库。Flink 应用至少需要依赖 Flink APIs。许多应用还会额外依赖连接器类库(比如 Kafka、Cassandra 等)。 当用户运行 Flink 应用时(无论是在 IDEA 环境下进行测试,还是部署在分布式环境下),运行时类库都必须可用
开发工具:IntelliJ IDEA
配置开发Maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.10.0</version> </dependency>
注意点:
- 如果要将程序打包提交到集群运行,打包的时候不需要包含这些依赖,因为集群环境已经包含了这些依赖,此时依赖的作用域应该设置为provided。
- Flink 应用在 IntelliJ IDEA 中运行,这些 Flink 核心依赖的作用域需要设置为 compile 而不是 provided 。 否则 IntelliJ 不会添加这些依赖到 classpath,会导致应用运行时抛出
NoClassDefFountError
异常。
添加打包插件:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!--不要拷贝 META-INF 目录下的签名, 否则会引起 SecurityExceptions 。 --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>my.programs.main.clazz</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
WordCount流批计算程序
配置好开发环境之后写一个简单的Flink程序。
实现:统计HDFS文件单词出现的次数
读取HDFS数据需要添加Hadoop依赖
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.5</version> </dependency>
批计算:
val env = ExecutionEnvironment.getExecutionEnvironment val initDS: DataSet[String] = env.readTextFile("hdfs://node01:9000/flink/data/wc") val restDS: AggregateDataSet[(String, Int)] = initDS.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) restDS.print()
流计算:
/** 准备环境 * createLocalEnvironment 创建一个本地执行的环境,local * createLocalEnvironmentWithWebUI 创建一个本地执行的环境,同时还开启Web UI的查看端口,8081 * getExecutionEnvironment 根据你执行的环境创建上下文,比如local cluster */ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) /** * DataStream:一组相同类型的元素 组成的数据流 */ val initStream:DataStream[String] = env.socketTextStream("node01",8888) val wordStream = initStream.flatMap(_.split(" ")) val pairStream = wordStream.map((_,1)) val keyByStream = pairStream.keyBy(0) val restStream = keyByStream.sum(1) restStream.print() //启动Flink 任务 env.execute("first flink job")
并行度
特定算子的子任务(subtask)的个数称之为并行度(parallel),并行度是几,这个task内部就有几个subtask。
怎样实现算子并行呢?其实也很简单,我们把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
整个流处理程序的并行度,理论上是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量。
并行度设置
在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
代码中设置
- 我们在代码中,可以很简单地在算子后跟着调用
setParallelism()
方法,来设置当前算子的并行度:stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这种方式设置的并行度,只针对当前算子有效。 - 我们也可以直接调用执行环境的
setParallelism()
方法,全局设定并行度:env.setParallelism(2);
这样代码中所有算子,默认的并行度就都为 2 了。
提交应用时设置
在使用 flink run 命令提交应用时,可以增加 -p
参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置。如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。
配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:parallelism.default: 2(初始值为 1)
这个设置对于整个集群上提交的所有作业有效。
在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数。
并行度生效优先级
- 对于一个算子,首先看在代码中是否单独指定了它的并行度,这个特定的设置优先级最高,会覆盖后面所有的设置。
- 如果没有单独设置,那么采用当前代码中执行环境全局设置的并行度。
- 如果代码中完全没有设置,那么采用提交时-p 参数指定的并行度。
- 如果提交时也未指定-p 参数,那么采用集群配置文件中的默认并行度。
这里需要说明的是,算子的并行度有时会受到自身具体实现的影响。比如读取 socket 文本流的算子 socketTextStream,它本身就是非并行的 Source 算子,所以无论怎么设置,它在运行时的并行度都是 1。