全网最详细4W字Flink入门笔记(上) 1

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 全网最详细4W字Flink入门笔记(上)

因为公司用到大数据技术栈的缘故,之前也写过HBase,Spark等文章,公司离线用的是Spark,实时用的是Flink,所以这篇文章是关于Flink的,这篇文章对Flink的相关概念介绍的比较全面,希望对大家学习Flink能有所帮助。

Flink的一些概念和Spark非常像,看这篇文章之前,强烈建议翻看之前的Spark文章,这样学习Flink的时候能够举一反三,有助于理解。

流处理 & 批处理

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流。在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流,流数据每输入一条数据,就有一次对应的输出

批处理,也叫作离线处理。针对的是有界数据集,非常适合需要访问海量的全部数据才能完成的计算工作,一般用于离线统计。

流处理主要针对的是数据流,特点是无界、实时,对系统传输的每个数据依次执行操作,一般用于实时统计。

无界流Unbounded streams

无界流有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

有界流Bounded streams

有界流有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。所以在Flink里批计算其实指的就是有界流。

Flink的特点和优势

  • 同时支持高吞吐、低延迟、高性能。
  • 支持事件时间(Event Time)概念,结合Watermark处理乱序数据
  • 支持有状态计算,并且支持多种状态内存、 文件、RocksDB。
  • 支持高度灵活的窗口(Window) 操作time、 count、 session。
  • 基于轻量级分布式快照(CheckPoint) 实现的容错保证Exactly- Once语义。
  • 基于JVM实现独立的内存管理。
  • Save Points (保存点)。

Flink VS Spark

Spark 和 Flink 在不同的应用领域上表现会有差别。一般来说,Spark 基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。在低延迟流处理场景,Flink 已经有明显的优势。而在海量数据的批处理领域,Spark 能够处理的吞吐量更大。

Spark Streaming的流计算其实是微批计算,实时性不如Flink,还有一点很重要的是Spark Streaming不适合有状态的计算,得借助一些存储如:Redis,才能实现。而Flink天然支持有状态的计算

Flink API

Flink 本身提供了多层 API:

  • Stateful Stream Processing 最低级的抽象接口是状态化的数据流接口(stateful streaming)。这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册 event time 和 processing time 处理回调函数的方法来实现复杂的计算。
  • DataStream/DataSet API DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
  • Table API  Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁,可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
  • SQL Flink 提供的最高层级的抽象是 SQL,这一层抽象在语法与表达能力上与 Table API 类似,SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

Dataflows数据流图

所有的 Flink 程序都可以归纳为由三部分构成:Source、Transformation 和 Sink。

  • Source 表示“源算子”,负责读取数据源。
  • Transformation 表示“转换算子”,利用各种算子进行处理加工。
  • Sink 表示“下沉算子”,负责数据的输出。

source数据源会源源不断的产生数据,transformation将产生的数据进行各种业务逻辑的数据处理,最终由sink输出到外部(console、kafka、redis、DB......)。

基于Flink开发的程序都能够映射成一个Dataflows。

当source数据源的数量比较大或计算逻辑相对比较复杂的情况下,需要提高并行度来处理数据,采用并行数据流。

通过设置不同算子的并行度, source并行度设置为2 , map也是2。代表会启动2个并行的线程来处理数据:

Flink基本架构

Flink系统架构中包含了两个角色,分别是JobManager和TaskManager,是一个典型的Master-Slave架构。JobManager相当于是Master,TaskManager相当于是Slave。

Job Manager & Task Manager

在Flink中,JobManager负责整个Flink集群任务的调度以及资源的管理。它从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlot资源并命令TaskManager启动从客户端中获取的应用。

TaskManager负责执行作业流的Task,并且缓存和交换数据流。在TaskManager中资源调度的最小单位是Task slot。TaskManager中Task slot的数量表示并发处理Task的数量。一台机器节点可以运行多个TaskManager

TaskManager会向JobManager发送心跳保持连接

集群 & 部署

部署模式

Flink支持多种部署模式,包括本地模式、Standalone模式、YARN模式、Mesos模式和Kubernetes模式。

  • 本地模式:本地模式是在单个JVM中启动Flink,主要用于开发和测试。它不需要任何集群管理器,但也不能跨多台机器运行。本地模式的优点是部署简单,缺点是不能利用分布式计算的优势。
  • Standalone模式:Standalone模式是在一个独立的集群中运行Flink。它需要手动启动Flink集群,并且需要手动管理资源。Standalone模式的优点是部署简单,可以跨多台机器运行,缺点是需要手动管理资源。
  • YARN模式:YARN模式是在Hadoop YARN集群中运行Flink。它可以利用YARN进行资源管理和调度。YARN模式的优点是可以利用现有的Hadoop集群,缺点是需要安装和配置Hadoop YARN,这是在企业中使用最多的方式
  • Mesos模式:Mesos模式是在Apache Mesos集群中运行Flink。它可以利用Mesos进行资源管理和调度。Mesos模式的优点是可以利用现有的Mesos集群,缺点是需要安装和配置Mesos。
  • Kubernetes模式:Kubernetes模式是在Kubernetes集群中运行Flink。它可以利用Kubernetes进行资源管理和调度。Kubernetes模式的优点是可以利用现有的Kubernetes集群,缺点是需要安装和配置Kubernetes。

每种部署模式都有其优缺点,选择哪种部署模式取决于具体的应用场景和需求。

Session、Per-Job和Application是Flink在YARN和Kubernetes上运行时的三种不同模式,它们不是独立的部署模式,而是在YARN和Kubernetes部署模式下的子模式。

  • Session模式:在Session模式下,Flink集群会一直运行,用户可以在同一个Flink集群中提交多个作业。Session模式的优点是作业提交快,缺点是作业之间可能会相互影响。
  • Per-Job模式:在Per-Job模式下,每个作业都会启动一个独立的Flink集群。Per-Job模式的优点是作业之间相互隔离,缺点是作业提交慢。
  • Application模式:Application模式是在Flink 1.11版本中引入的一种新模式,它结合了Session模式和Per-Job模式的优点。在Application模式下,每个作业都会启动一个独立的Flink集群,但是作业提交快。

这三种模式都可以在YARN和Kubernetes部署模式下使用。

提交作业流程

  1. Session 模式:
  • 用户启动 Flink 会话,并连接到 Flink 集群。
  • 用户使用 CLI 或 Web UI 提交作业,提交的作业被发送到 Flink 集群的 JobManager。
  • JobManager 接收作业后,会对作业进行解析和编译,生成作业图(JobGraph)。
  • 生成的作业图被发送到 JobManager 的调度器进行调度。
  • 调度器将作业图划分为任务并将其分配给 TaskManager 执行。
  • TaskManager 在其本地执行环境中运行任务。
  • 在 Session 模式下,Flink 运行在交互式会话中,允许用户在一个 Flink 集群上连续地提交和管理多个作业。
  • 用户可以通过 Flink 命令行界面(CLI)或 Web UI 进行交互。
  • 提交流程如下:
  1. Per-Job 模式:
  • 用户准备好作业程序和所需的配置文件。
  • 用户使用 Flink 提供的命令行工具或编程 API 将作业程序和配置文件打包成一个作业 JAR 文件。
  • 用户将作业 JAR 文件上传到 Flink 集群所在的环境(例如 Hadoop 分布式文件系统)。
  • 用户使用 Flink 提供的命令行工具或编程 API 在指定的 Flink 集群上提交作业。
  • JobManager 接收作业 JAR 文件并进行解析、编译和调度。
  • 调度器将作业图划分为任务并将其分配给可用的 TaskManager 执行。
  • TaskManager 在其本地执行环境中运行任务。
  • 在 Per-Job 模式下,每个作业都会启动一个独立的 Flink 集群,用于执行该作业。
  • 这种模式适用于独立的批处理或流处理作业,不需要与其他作业共享资源。
  • 提交流程如下:
  1. Application 模式:
  • 用户准备好应用程序程序和所需的配置文件。
  • 用户使用 Flink 提供的命令行工具或编程 API 将应用程序程序和配置文件打包成一个应用程序 JAR 文件。
  • 用户将应用程序 JAR 文件上传到 Flink 集群所在的环境(例如 Hadoop 分布式文件系统)。
  • 用户使用 Flink 提供的命令行工具或编程 API 在指定的 Flink 集群上提交应用程序。
  • JobManager 接收应用程序 JAR 文件并进行解析、编译和调度。
  • 调度器将应用程序图划分为任务并将其分配给可用的 TaskManager 执行。
  • TaskManager 在其本地执行环境中运行任务。
  • Application 模式是 Flink 1.11 版本引入的一种模式,用于在常驻的 Flink 集群上执行多个应用程序。
  • 在 Application 模式下,用户可以在运行中的 Flink 集群上动态提交、更新和停止应用程序。
  • 提交流程如下:

配置开发环境

每个 Flink 应用都需要依赖一组 Flink 类库。Flink 应用至少需要依赖 Flink APIs。许多应用还会额外依赖连接器类库(比如 Kafka、Cassandra 等)。 当用户运行 Flink 应用时(无论是在 IDEA 环境下进行测试,还是部署在分布式环境下),运行时类库都必须可用

开发工具:IntelliJ IDEA

配置开发Maven依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

注意点:

  • 如果要将程序打包提交到集群运行,打包的时候不需要包含这些依赖,因为集群环境已经包含了这些依赖,此时依赖的作用域应该设置为provided
  • Flink 应用在 IntelliJ IDEA 中运行,这些 Flink 核心依赖的作用域需要设置为 compile 而不是 provided 。 否则 IntelliJ 不会添加这些依赖到 classpath,会导致应用运行时抛出 NoClassDefFountError 异常。

添加打包插件:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!--不要拷贝 META-INF 目录下的签名,
                                否则会引起 SecurityExceptions 。 -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>my.programs.main.clazz</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

WordCount流批计算程序

配置好开发环境之后写一个简单的Flink程序。

实现:统计HDFS文件单词出现的次数

读取HDFS数据需要添加Hadoop依赖

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.6.5</version>
</dependency>

批计算:

val env = ExecutionEnvironment.getExecutionEnvironment
val initDS: DataSet[String] = env.readTextFile("hdfs://node01:9000/flink/data/wc")
val restDS: AggregateDataSet[(String, Int)] = initDS.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
restDS.print()

流计算:

/** 准备环境
      * createLocalEnvironment 创建一个本地执行的环境,local
      * createLocalEnvironmentWithWebUI 创建一个本地执行的环境,同时还开启Web UI的查看端口,8081
      * getExecutionEnvironment 根据你执行的环境创建上下文,比如local  cluster
      */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    /**
      * DataStream:一组相同类型的元素 组成的数据流
      */
    val initStream:DataStream[String] = env.socketTextStream("node01",8888)
    val wordStream = initStream.flatMap(_.split(" "))
    val pairStream = wordStream.map((_,1))
    val keyByStream = pairStream.keyBy(0)
    val restStream = keyByStream.sum(1)
    restStream.print()
    //启动Flink 任务
    env.execute("first flink job")

并行度

特定算子的子任务(subtask)的个数称之为并行度(parallel),并行度是几,这个task内部就有几个subtask。

怎样实现算子并行呢?其实也很简单,我们把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。

整个流处理程序的并行度,理论上是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量

并行度设置

在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。

代码中设置

  • 我们在代码中,可以很简单地在算子后跟着调用 setParallelism()方法,来设置当前算子的并行度: stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);这种方式设置的并行度,只针对当前算子有效。
  • 我们也可以直接调用执行环境的 setParallelism()方法,全局设定并行度:env.setParallelism(2);这样代码中所有算子,默认的并行度就都为 2 了。

提交应用时设置

在使用 flink run 命令提交应用时,可以增加 -p 参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置。如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。

配置文件中设置

我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:parallelism.default: 2(初始值为 1)

这个设置对于整个集群上提交的所有作业有效。

在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数

并行度生效优先级

  1. 对于一个算子,首先看在代码中是否单独指定了它的并行度,这个特定的设置优先级最高,会覆盖后面所有的设置。
  2. 如果没有单独设置,那么采用当前代码中执行环境全局设置的并行度。
  3. 如果代码中完全没有设置,那么采用提交时-p 参数指定的并行度。
  4. 如果提交时也未指定-p 参数,那么采用集群配置文件中的默认并行度。

这里需要说明的是,算子的并行度有时会受到自身具体实现的影响。比如读取 socket 文本流的算子 socketTextStream,它本身就是非并行的 Source 算子,所以无论怎么设置,它在运行时的并行度都是 1

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
280 3
|
Java Linux API
flink入门-流处理
flink入门-流处理
168 0
|
存储 Java Linux
10分钟入门Flink--安装
本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、Flink Standalone搭建、Flink Standalong HA搭建。
10分钟入门Flink--安装
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
184 0
|
8月前
|
存储 缓存 算法
[尚硅谷flink] 检查点笔记
[尚硅谷flink] 检查点笔记
212 3
|
8月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
8月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
8月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
1458 3
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
129 0
|
SQL 消息中间件 API
Flink教程(02)- Flink入门(上)
Flink教程(02)- Flink入门(上)
225 0