带你读《Flink原理、实战与性能优化》之二:环境准备

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 这是一部以实战为导向,能指导读者零基础掌握Flink并快速完成进阶的著作,从功能、原理、实战和调优等4个维度循序渐进地讲解了如何利用Flink进行分布式流式应用开发。作者是该领域的资深专家,现就职于第四范式,曾就职于明略数据。

点击查看第一章
点击查看第三章

第2章

环境准备
本章主要介绍Flink在使用前的环境安装准备,包括必须依赖的环境以及相应的参数,首先从不同运行环境进行介绍,包括本地调试环境、Standalone集群环境,以及在On Yarn环境上。另外介绍Flink自带的Template模板,如何通过该项目模板本地运行代码环境的直接生成,而不需要用户进行配置进行大量的开发环境配置,节省了开发的时间成本。最后介绍Flink源码编译相关的事项,通过对源码进行编译,从而对整个Flink计算引擎有更深入的理解。

2.1 运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarn或Mesos等不同的资源管理器部署自己的应用。
环境依赖
(1)JDK环境
Flink核心模块均使用Java开发,所以运行环境需要依赖JDK,本书暂不详细介绍JDK安装过程,用户可以根据官方教程自行安装,其中包括Windows和Linux环境安装,需要注意的是JDK版本需要保证在1.8以上。
(2)Scala环境
如果用户选择使用Scala作为Flink应用开发语言,则需要安装Scala执行环境,Scala环境可以通过本地安装Scala执行环境,也可以通过Maven依赖Scala-lib来引入。
(3)Maven编译环境
Flink的源代码目前仅支持通过Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。Maven的具体安装方法这里不再赘述。
需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
(4)Hadoop环境
对于执行在Hadoop Yarn资源管理器的Flink应用,则需要配置对应的Hadoop环境参数。目前Flink官方提供的版本支持hadoop2.4、2.6、2.7、2.8等主要版本,所以用户可以在这些版本的Hadoop Yarn中直接运行自己的Flink应用,而不需要考虑兼容性的问题。

2.2 Flink项目模板

Flink为了对用户使用Flink进行应用开发进行简化,提供了相应的项目模板来创建开发项目,用户不需要自己引入相应的依赖库,就能够轻松搭建开发环境,前提是在JDK(1.8及以上)和Maven(3.0.4及以上)的环境已经安装好且能正常执行。在Flink项目模板中,Flink提供了分别基于Java和Scala实现的模板,下面就两套项目模板分别进行介绍和应用。

2.2.1 基于Java实现的项目模板

1. 创建项目
创建模板项目的方式有两种,一种方式是通过Maven archetype命令进行创建,另一种方式是通过Flink提供的Quickstart Shell脚本进行创建,具体实例说明如下。
□通过Maven Archetype进行创建:

image.png

通过以上Maven命令进行项目创建的过程中,命令会交互式地提示用户对项目的groupId、artifactId、version、package等信息进行定义,且部分选项具有默认值,用户直接回车即可,如图2-1所示。我们创建了实例项目成功之后,客户端会提示用户项目创建成功,且在当前路径中具有相应创建的Maven项目。

image.png

□通过quickstart脚本创建:

image.png

通过以上脚本可以比较简单地创建项目,执行后项目会自动生成,但是项目的名称和一些GAV信息都是自动生成的,用户不能进行交互式重新定义,其中的项目名称为quickstart,gourpid为org.myorg.quickstart,version为0.1。这种方式对于Flink入门相对比较适合,其他有一定基础的情况下,则不建议使用这种方式进行项目创建。
注意:在Maven 3.0以上的版本中,DarchetypeCatalog配置已经从命令行中移除,需要用户在Maven Settings中进行配置,或者直接将该选项移除,否则可能造成不能生成Project的错误。

2. 检查项目
对于使用quickstart curl命令创建的项目,我们可以看到的项目结构如代码清单2-1所示,如果用户使用Maven Archetype,则可以自己定义对应的artifactId等信息。

image.png

从上述项目结构可以看出,该项目已经是一个相对比较完善的Maven项目,其中创建出来对应的Java实例代码,分别是BatchJob.java和Streaming.java两个文件,分别对应Flink批量接口DataSet的实例代码和流式接口DataStream的实例代码。在创建好上述项目后,建议用户将项目导入到IDE进行后续开发,Flink官网推荐使用的是Intellij IDEA或者Eclipse进行项目开发,具体的开发环境配置可以参考下一节中的介绍。
3. 编译项目
项目经过上述步骤创建后,可以使用Maven Command命令mvn clean package对项目进行编译,编译完成后在项目同级目录会生成target/-.jar,则该可执行Jar包就可以通过Flink命令或者Web客户端提交到集群上执行。
注意: 通过Maven创建Java应用,用户可以在Pom中指定Main Class,这样提交执行过程中就具有默认的入口Main Class,否则需要用户在执行的Flink App的Jar应用中指定Main Class。

4. 开发应用
在项目创建和检测完成后,用户可以选择在模板项目中的代码上编写应用,也可以定义Class调用DataSet API或DataStream API进行Flink应用的开发,然后通过编译打包,上传并提交到集群上运行。具体应用的开发读者可以参考后续章节。

2.2.2 基于Scala实现的项目模板

Flink在开发接口中同样提供了Scala的接口,用户可以借助Scala高效简洁的特性进行Flink App的开发。在创建项目的过程中,也可以像上述Java一样创建Scala模板项目,而在Scala项目中唯一的区别就是可以支持使用SBT进行项目的创建和编译,以下实例,将从SBT和Maven两种方式进行介绍。
1. 创建项目
(1)创建Maven项目
1)使用Maven archetype进行项目创建
代码清单2-2是通过Maven archetype命令创建Flink Scala版本的模板项目,其中项目相关的参数同创建Java项目一样,需要通过交互式的方式进行输入,用户可以指定对应的项目名称、groupid、artifactid以及version等信息。

image.png


执行完上述命令之后,会显示如图2-2所示的提示,表示项目创建成功,可以进行后续操作。同时可以在同级目录中看到已经创建好的Scala项目模板,其中包括了两个Scala后缀的文件。
2)使用quickstart curl脚本创建
如上节所述,在创建Scala项目模板的过程中,也可以通过quickstart curl脚本进行创建,这种方式相对比较简单,只要执行以下命令即可:

image.png
image.png


执行上述命令后就能在路径中看到相应的quickstart项目生成,其目录结构和通过Maven archetype创建的一致,只是不支持修改项目的GAV信息。
(2)创建SBT项目
在使用Scala接口开发Flink应用中,不仅可以使用Maven进行项目的编译,也可以使用SBT(Simple Build Tools)进行项目的编译和管理,其项目结构和Maven创建的项目结构有一定的区别。可以通过SBT命令或者quickstart脚本进行创建SBT项目,具体实现方式如下:
1)使用SBT命令创建项目

image.png

执行上述命令后,会在客户端输出创建成功的信息,表示项目创建成功,同时在同级目录中生成创建的项目,其中包含两个Scala的实例代码供用户参考。
2)使用quickstart curl脚本创建项目
可以通过使用以下指令进行项目创建Scala项目:

image.png

注意: 如果项目编译方式选择SBT,则需要在环境中提前安装SBT编译器,同时版本需要在0.13.13以上,否则无法通过上述方式进行模板项目的创建,具体的安装教程可以参考SBT官方网站https://www.scala-sbt.org/download.html 进行下载和安装。

2. 检查项目
对于使用Maven archetype创建的Scala项目模板,其结构和Java类似,在项目中增加了Scala的文件夹,且包含两个Scala实例代码,其中一个是实现DataSet接口的批量应用实例BatchJob,另外一个是实现DataStream接口的流式应用实例StreamingJob,如代码清单2-3所示。

image.png

3. 编译项目
1)使用Maven编译
进入到项目路径中,然后通过执行mvn clean package命令对项目进行编译,编译完成后产生target/-.jar。
2)使用Sbt编译
进入到项目路径中,然后通过使用sbt clean assembly对项目进行编译,编译完成后再产生target/scala_your-major-scala-version/project-name-assembly-0.1-SNAPSHOT.jar。
4. 开发应用
在项目创建和检测完成后,用户可以选择在Scala项目模板的代码上编写应用,也可以定义Class调用DataSet API或DataStream API进行Flink应用的开发,然后通过编译打包,上传并提交到集群上运行。

2.3 Flink开发环境配置

我们可以选择IntelliJ IDEA或者Eclipse作为Flink应用的开发IDE,但是由于Eclipse本身对Scala语言支持有限,所以Flink官方还是建议用户能够使用IntelliJ IDEA作为首选开发的IDE,以下将重点介绍使用IntelliJ IDEA进行开发环境的配置。

2.3.1 下载IntelliJ IDEA IDE

用户可以通过IntelliJ IDEA官方地址下载安装程序,根据操作系统选择相应的程序包进行安装。安装方式和安装包请参考https://www.jetbrains.com/idea/download/

2.3.2 安装Scala Plugins

对于已经安装好的IntelliJ IDEA默认是不支持Scala开发环境的,如果用户选择使用Scala作为开发语言,则需要安装Scala插件进行支持。以下说明在IDEA中进行Scala插件的安装:
□打开IDEA IDE后,在IntelliJ IDEA菜单栏中选择 Preferences选项,然后选择Plugins子选项,最后在页面中选择Browser Repositories,在搜索框中输入Scala进行检索;
□在检索出来的选项列表中选择和安装Scala插件,如图2-3所示;
□点击安装后重启IDE,Scala编程环境即可生效。

image.png

2.3.3 导入Flink应用代码

开发环境配置完毕之后,下面就可以将2.3.2节中创建好的项目导入到IDE中,具体步骤如下所示:
□启动IntelliJ IDEA,选择File→Open,在文件选择框中选择创建好的项目(quickstart),点击确定,IDEA将自动进行项目的导入;
□如果项目中提示没有SDK,可以选择File→Project Structure,在SDKS选项中选择安装好的JDK路径,添加Scala SDK路径,如果系统没有安装Scala SDK,也可以通过Maven Dependence将scala-lib引入;
□项目正常导入之后,程序状态显示正常,可以通过mvn clean package或者直接通过IDE中自带的工具Maven编译方式对项目进行编译;
完成了在IntelliJ IDEA中导入Flink项目,接下来用户就可以开发Flink应用了。

2.3.4 项目配置

对于通过项目模板生成的项目,项目中的主要参数配置已被初始化,所以无须额外进行配置,如果用户通过手工进行项目的创建,则需要创建Flink项目并进行相应的基础配置,包括Maven Dependences、Scala的Version等配置信息。
1. Flink基础依赖库
对于Java版本,需要在项目的pom.xml文件中配置如代码清单2-4所示的依赖库,其中flink-java和flink-streaming-java分别是批量计算DataSet API和流式计算DataStream API的依赖库,{flink.version}是官方的发布的版本号,用户可根据自身需要进行选择,本书中所有的实例代码都是基于Flink 1.7版本开发。

image.png

创建Scala版本Flink项目依赖库配置如下,和Java相比需要指定scala的版本信息,目前官方建议的是使用Scala 2.11,如果需要使用特定版本的Scala,则要将源码下载进行指定Scala版本编译,否则Scala各大版本之间兼容性较弱会导致应用程序在实际环境中无法运行的问题。Flink 基于Scala语言项目依赖配置库如代码清单2-5所示:

image.png

另外在上述Maven Dependences配置中,核心的依赖库配置的Scope为provided,主要目的是在编译阶段能够将依赖的Flink基础库排除在项目之外,当用户提交应用到Flink集群的时候,就避免因为引入Flink基础库而导致Jar包太大或类冲突等问题。而对于Scope配置成provided的项目可能出现本地IDE中无法运行的问题,可以在Maven中通过配置Profile的方式,动态指定编译部署包的scope为provided,本地运行过程中的scope为compile,从而解决本地和集群环境编译部署的问题。
注意:由于Flink在最新版本中已经不再支持scala 2.10的版本,建议读者使用scala 2.11,同时Flink将在未来的新版本中逐渐支持Scala 2.12。

2. Flink Connector和Lib依赖库
除了上述Flink项目中应用开发必须依赖的基础库之外,如果用户需要添加其他依赖,例如Flink中內建的Connector,或者其他第三方依赖库,需要在项目中添加相应的Maven Dependences,并将这些Dependence的Scope需要配置成compile。
如果项目中需要引入Hadoop相关依赖包,和基础库一样,在打包编译的时候将Scope注明为provided,因为Flink集群中已经将Hadoop依赖包添加在集群的环境中,用户不需要再将相应的Jar包打入应用中,否则容易造成Jar包冲突。
注意:对于有些常用的依赖库,为了不必每次都要上传依赖包到集群上,用户可以将依赖的包可以直接上传到Flink安装部署路径中的lib目录中,这样在集群启动的时候就能够将依赖库加载到集群的ClassPath中,无须每次在提交任务的时候上传依赖的Jar包。

2.4 运行Scala REPL

和Spark Shell一样,Flink也提供了一套交互式解释器(Scala-Shell),用户能够在客户端命令行交互式编程,执行结果直接交互式地显示在客户端控制台上,不需要每次进行编译打包在集群环境中运行,目前该功能只支持使用Scala语言进行程序开发。另外需要注意的是在开发或者调试程序的过程中可以使用这种方式,但在正式的环境中则不建议使用。

2.4.1 环境支持

用户可以选择在不同的环境中启动Scala Shell,目前支持Local、Remote Cluster 和Yarn Cluster模式,具体命令可以参考以下说明:
□通过start-scala-shell.sh启动本地环境;
bin/start-scala-shell.sh local
□可以启动远程集群环境,指定远程Flink集群的hostname和端口号;
bin/start-scala-shell.sh remote
□启动Yarn集群环境,环境中需要含有hadoop客户端配置文件;
bin/start-scala-shell.sh yarn -n 2

2.4.2 运行程序

启动Scala Shell交互式解释器后,就可以进行Flink流式应用或批量应用的开发。需要注意的是,Flink已经在启动的执行环境中初始化好了相应的Environment,分别使用“benv”和“senv”获取批量计算环境和流式计算环境,然后使用对应环境中的API开发Flink应用。以下代码实例分别是用批量和流式实现WordCount的应用,读者可以直接在启动Flink Scala Shell客户端后执行并输出结果。
□通过Scala-Shell运行批量计算程序,调用benv完成对单词数量的统计。

image.png

□通过Scala-Shell运行流式计算,调用senv完成对单词数量的统计。

image.png

注意:用户在使用交互式解释器方式进行应用开发的过程中,流式作业和批量作业中的一些操作(例如写入文件)并不会立即执行,而是需要用户在程序的最后执行env.execute(“appname”)命令,这样整个程序才能触发运行。

2.5 Flink源码编译

对于想深入了解Flink源码结构和实现原理的读者,可以按照本节的内容进行Flink源码编译环境的搭建,完成Flink源码的编译,具体操作步骤如下所示。
Flink源码可以从官方 Git Repository上通过git clone命令下载:

image.png

读者也可以通过官方镜像库手动下载,下载地址为https://archive.apache.org/dist/flink/
用户根据需要选择需要编译的版本号,下载代码放置在本地路径中,然后通过如下Maven命令进行编译,需要注意的是,Flink源码编译依赖于JDK和Maven的环境,且JDK必须在1.8版本以上,Maven必须在3.0版本以上,否则会导致编译出错。

image.png

(1)Hadoop版本指定
Flink镜像库中已经编译好的安装包通常对应的是Hadoop的主流版本,如果用户需要指定Hadoop版本编译安装包,可以在编译过程中使用-Dhadoop.version参数指定Hadoop版本,目前Flink支持的Hadoop版本需要在2.4以上。

image.png

如果用户使用的是供应商提供的Hadoop平台,如Cloudera的CDH等,则需要根据供应商的系统版本来编译Flink,可以指定-Pvendor-repos参数来激活类似于Cloudera的Maven Repositories,然后在编译过程中下载依赖对应版本的库。

image.png

(2)Scala版本指定
Flink中提供了基于Scala语言的开发接口,包括DataStream API、DataSet API、SQL等,需要在代码编译过程中指定Scala的版本,因为Scala版本兼容性相对较弱,因此不同的版本之间的差异相对较大。目前Flink最近的版本基本已经支持Scala-2.11,不再支持Scala-2.10的版本,在Flink 1.7开始支持Scala-2.12版本,社区则建议用户使用Scala-2.11或者Scala-2.12的Scala环境。

2.6 本章小结

本章介绍了在使用Flink应用程序开发之前必备的环境要求,分别对基础环境依赖例如JDK、Maven等进行说明。通过借助于Flink提供的项目模板创建不同编程语言的Flink 应用项目。在2.3节中介绍了如何构建Flink开发环境,以及如何选择合适的IDE和项目配置。在2.4节中介绍了Flink交互式编程客户端Scala REPL,通过使用交互式客户端编写和执行Flink批量和流式应用代码。2.5节介绍了Flink源码编译和打包,编译中指定不同的Hadoop版本以及Scala版本等。在第3章将重点介绍Flink编程模型,其中包括Flink程序基本构成,以及Flink所支持的数据类型等。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
177 3
|
1月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
114 3
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之环境只能拿到全量数据,无法获取增量数据,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
164 1
|
1月前
|
消息中间件 SQL Kubernetes
实时计算 Flink版产品使用合集之多线程环境中,遇到 env.addSource 添加数据源后没有执行到 env.execut,是为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 数据采集 监控
14个Flink SQL性能优化实践分享
本文档详细列举了Apache Flink SQL的性能调优策略。主要关注点包括:增加数据源读取并行度、优化状态管理(如使用RocksDB状态后端并设置清理策略)、调整窗口操作以减少延迟、避免类型转换和不合理的JOIN操作、使用广播JOIN、注意SQL查询复杂度、控制并发度和资源调度、自定义源码实现、执行计划分析、异常检测与恢复、监控报警、数据预处理与清洗、利用高级特性(如容器化部署和UDF)以及数据压缩与序列化。此外,文档还强调了任务并行化、网络传输优化、系统配置调优、数据倾斜处理和任务调度策略。通过这些方法,可以有效解决性能问题,提升Flink SQL的运行效率。
|
1月前
|
SQL 资源调度 监控
Flink SQL性能优化实践
Apache Flink流处理性能优化指南:探索数据源读取并行度、状态管理、窗口操作的优化策略,包括设置默认并行度、使用RocksDB状态后端、调整窗口大小。调优方法涉及数据源分区、JOIN条件优化、使用Broadcast JOIN。注意SQL复杂度、并发控制与资源调度,如启用动态资源分配。源码层面优化自定义Source和Sink,利用执行计划分析性能瓶颈。异常检测与恢复通过启用检查点,监控任务性能。预处理数据、使用DISTINCT去重,结合UDF提高效率。选择高效序列化框架和启用数据压缩,优化网络传输和系统配置。处理数据倾斜,均衡数据分布,动态调整资源和任务优先级,以提升整体性能。
59 2
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之在oracle cdc2.3 + flink1.7环境下只能初始化同步数据,但后续Oracle的增删改查无法同步出去,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
运维 监控 Java
面经:Storm实时计算框架原理与应用场景
【4月更文挑战第11天】本文是关于Apache Storm实时流处理框架的面试攻略和核心原理解析。文章分享了面试常见主题,包括Storm的架构与核心概念(如Spout、Bolt、Topology、Tuple和Ack机制),编程模型与API,部署与运维,以及应用场景与最佳实践。通过代码示例展示了如何构建一个简单的WordCountTopology,强调理解和运用Storm的关键知识点对于面试和实际工作的重要性。
77 4
面经:Storm实时计算框架原理与应用场景
|
1月前
|
存储 NoSQL 分布式数据库
【Flink】Flink分布式快照的原理是什么?
【4月更文挑战第21天】【Flink】Flink分布式快照的原理是什么?