Flink 集群安装部署和 HA 配置

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 我们在这一课时将讲解 Flink 常见的部署模式:本地模式、Standalone 模式和 Flink On Yarn 模式,然后分别讲解三种模式的使用场景和部署中常见的问题,最后将讲解在生产环境中 Flink 集群的高可用配置。

我们在这一课时将讲解 Flink 常见的部署模式:本地模式、Standalone 模式和 Flink On Yarn 模式,然后分别讲解三种模式的使用场景和部署中常见的问题,最后将讲解在生产环境中 Flink 集群的高可用配置。


Flink 常见的部署模式


环境准备


在绝大多数情况下,我们的 Flink 都是运行在 Unix 环境中的,推荐在 Mac OS 或者 Linux 环境下运行 Flink。如果是集群模式,那么可以在自己电脑上安装虚拟机,保证有一个 master 节点和两个 slave 节点。


同时,要注意在所有的机器上都应该安装 JDK 和 SSH。JDK 是我们运行 JVM 语言程序必须的,而 SSH 是为了在服务器之间进行跳转和执行命令所必须的。关于服务器之间通过 SSH 配置公钥登录,你可以直接搜索安装和配置方法,我们不做过度展开。


Flink 的安装包可以在这里下载。需要注意的是,如果你要和 Hadoop 进行集成,那么我们需要使用到对应的 Hadoop 依赖,下面将会详细讲解。


Local 模式


Local 模式是 Flink 提供的最简单部署模式,一般用来本地测试和演示使用。

我们在这里下载 Apache Flink 1.10.0 for Scala 2.11 版本进行演示,该版本对应 Scala 2.11 版本。


将压缩包下载到本地,并且直接进行解压,使用 Flink 默认的端口配置,直接运行脚本启动:

➜  [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz

微信图片_20220425232726.png


上图则为解压完成后的目录情况。


然后,我们可以直接运行脚本启动 Flink :

复制代码

➜  [flink-1.10.0]# ./bin/start-cluster.sh

微信图片_20220425232732.png


上图显示我们的 Flink 启动成功。


我们直接访问本地的 8081 端口,可以看到 Flink 的后台管理界面,验证 Flink 是否成功启动。

微信图片_20220425232735.png


可以看到 Flink 已经成功启动。当然,我们也可以查看运行日志来确认 Flink 是不是成功启动了,在 log 目录下有程序的启动日志:

微信图片_20220425232738.png


我们尝试提交一个测试任务:

复制代码

./bin/flink run examples/batch/WordCount.jar

微信图片_20220425232742.png


我们在控制台直接看到输出。同样,在 Flink 的后台管理界面 Completed Jobs 一栏可以看到刚才提交执行的程序:

微信图片_20220425232746.png


Standalone 模式


Standalone 模式是集群模式的一种,但是这种模式一般并不运行在生产环境中,原因和 on yarn 模式相比:


  • Standalone 模式的部署相对简单,可以支持小规模,少量的任务运行;


  • Stabdalone 模式缺少系统层面对集群中 Job 的管理,容易遭成资源分配不均匀;


  • 资源隔离相对简单,任务之间资源竞争严重。


我们在 3 台虚拟机之间搭建 standalone 集群:

微信图片_20220425232749.png


在 master 节点,将 Apache Flink 1.10.0 for Scala 2.11 包进行解压:

复制代码

➜  [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz


重点来啦,我们需要修改 Flink 的配置文件,并且将修改好的解压目录完整的拷贝到两个从节点中去。在这里,我强烈建议主节点和从节点的目录要保持一致。

我们修改 conf 目录下的 flink-conf.yaml:

微信图片_20220425232754.png


flink-conf.yaml 文件中有大量的配置参数,我们挑选其中必填的最基本参数进行修改:


复制代码

jobmanager.rpc.address: master
jobmanager.heap.size: 1024m
jobmanager.rpc.port: 6123
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /tmp


它们分别代表:

微信图片_20220425232757.png


如果你对其他的参数有兴趣的话,可以直接参考官网。接下来我们修改 conf 目录下的 master 和 slave 文件。vim master,将内容修改为:

master


vim slave,将内容修改为:

slave01
slave02


然后,将整个修改好的 Flink 解压目录使用 scp 远程拷贝命令发送到从节点:

scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/


在 master、slave01、slave02 上分别配置环境变量,vim /etc/profile,将内容修改为:

export FLINK_HOME=/SoftWare/flink-1.10.0
export PATH=$PATH:$FLINK_HOME/bin


到此为止,我们整个的基础配置已经完成,下面需要启动集群,登录 master 节点执行:

/SoftWare/flink-1.10.0/bin/start-cluster.sh


可以在浏览器访问:http://192.168.2.100:8081/ 检查集群是否启动成功。

集群搭建过程中,可能出现的问题:


  • 端口被占用,我们需要手动杀掉占用端口的程序;


  • 目录找不到或者文件找不到,我们在 flink-conf.yaml 中配置过 io.tmp.dirs ,这个目录需要手动创建。


On Yarn 模式和 HA 配置

微信图片_20220425232801.png


上图是 Flink on Yarn 模式下,Flink 和 Yarn 的交互流程。Yarn 是 Hadoop 三驾马车之一,主要用来做资源管理。我们在 Flink on Yarn 模式中也是借助 Yarn 的资源管理优势,需要在三个节点中配置 YARN_CONF_DIR、HADOOP_CONF_DIR、HADOOP_CONF_PATH 中的任意一个环境变量即可。


本课时中集群的高可用 HA 配置是基于独立的 ZooKeeper 集群。当然,Flink 本身提供了内置 ZooKeeper 插件,可以直接修改 conf/zoo.cfg,并且使用 /bin/start-zookeeper-quorum.sh 直接启动。


环境准备:


  • ZooKeeper-3.x


  • Flink-1.10.0


  • Hadoop-2.6.5


我们使用 5 台虚拟机搭建 on yarn 的高可用集群:

微信图片_20220425232804.png


如果你在使用 Flink 的最新版本 1.10.0 时,那么需要在本地安装 Hadoop 环境并进行下面的操作。


首先,添加环境变量:


vi /etc/profile
# 添加环境变量
export HADOOP_CONF_DIR=/Software/hadoop-2.6.5/etc/hadoop
# 环境变量生效
source /etc/profile


其次,下载对应的的依赖包,并将对应的 Hadoop 依赖复制到 flink 的 lib 目录下,对应的 hadoop 依赖可以在这里下载。

微信图片_20220425232807.png


与 standalone 集群不同的是,我们需要修改 flink-conf.yaml 文件中的一些配置:

high-availability: zookeeper
high-availability.storageDir: hdfs://cluster/flinkha/
high-availability.zookeeper.quorum: slave01:2181,slave02:2181,slave03:2181


它们分别代表:

微信图片_20220425232809.png


然后分别修改 master、slave、zoo.cfg 三个配置文件。


vim master,将内容修改为:


master01:8081
master02:8081


vim slave,将内容修改为:


slave01
slave02
slave03


vim zoo.cfg,将内容修改为:

server.1=slave01:2888:3888
server.2=slave02:2888:3888
server.3=slave03:2888:3888


然后,我们将整个修改好的 Flink 解压目录使用 scp 远程拷贝命令发送到从节点:

scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave03:/SoftWare/


分别启动 Hadoop 和 ZooKeeper,然后在主节点,使用命令启动集群:

/SoftWare/flink-1.10.0/bin/start-cluster.sh


我们同样直接访问 http://192.168.2.100:8081/ 端口,可以看到 Flink 的后台管理界面,验证 Flink 是否成功启动。


在 Flink on yarn 模式下,启动集群的方式有两种:


  • 直接在 yarn 上运行任务


  • yarn session 模式


直接在 yarn 上运行任务相当于将 job 直接提交到 yarn 上,每个任务会根据用户的指定进行资源申请,任务之间互不影响。

./bin/flink run -yjm 1024m -ytm 4096m -ys 2  ./examples/batch/WordCount.jar


更多关于参数的含义,可以参考官网。使用 yarn session 模式,我们需要先启动一个 yarn-session 会话,相当于启动了一个 yarn 任务,这个任务所占用的资源不会变化,并且一直运行。我们在使用 flink run 向这个 session 任务提交作业时,如果 session 的资源不足,那么任务会等待,直到其他资源释放。当这个 yarn-session 被杀死时,所有任务都会停止。


例如我们启动一个 yarn session 任务,该任务拥有 8G 内存、32 个槽位。

./bin/yarn-session.sh -tm 8192 -s 32


我们在 yarn 的界面上可以看到这个任务的 ID,然后向这个 session ID 提交 Flink 任务:

./bin/flink run -m yarn-cluster -yid application_xxxx ./examples/batch/WordCount.jar


其中,application_xxxx 即为上述的 yarn session 任务 ID。


总结


本课时我们讲解了 Flink 的三种部署模式和高可用配置,并且对这三种部署模式的适用场景进行了讲解。在生产上,我们最常用的方式当然是 Flink on Yarn,借助 Yarn 在资源管理上的绝对优势,确保集群和任务的稳定。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
9天前
|
资源调度 监控 数据处理
【Flink】Flink集群有哪些角色?各自有什么作用?
【4月更文挑战第18天】【Flink】Flink集群有哪些角色?各自有什么作用?
|
1月前
|
Kubernetes 流计算 Perl
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
35 7
|
1月前
|
缓存 监控 Java
Flink CDC产品常见问题之flink集群jps命令报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
资源调度 Kubernetes Apache
部署Flink集群后没有资源可能有以下几个原因
【2月更文挑战第23天】 部署Flink集群后没有资源可能有以下几个原因
12 2
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0
|
1月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
60 1
|
1月前
|
分布式计算 网络安全 流计算
Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
【2月更文挑战第15天】Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
76 0
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之从EARLIEST_OFFSET启动就报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
SQL 消息中间件 Java
Flink部署问题之带上savepoint部署任务报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Kafka
Flink部署问题之hive表没有数据如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。