01 引言
在前面的博客,我们已经大概对Flink有一个初步认识了,有兴趣的同学可以参阅下:
如果要学习Flink
必须先搭建好Flink
环境,本文来讲解下Flink
的环境搭建。
在上一篇博客 《Flink教程(01)- Flink知识图谱》里面的物理部署层,我们知道了Flink
有几种部署模式,根据本地或集群分为以下几种:
- Local(本地单机模式):学习测试时使用
- Standalone(独立集群模式):Flink自带集群,开发测试环境使用
- StandaloneHA(独立集群高可用模式):Flink自带集群,开发测试环境使用
- On Yarn(计算资源统一由Hadoop YARN管理):生产环境使用
本文来讲解下。
02 Local本地单机模式
2.1 工作原理
上图流程如下:
Flink
程序由JobClient
进行提交;JobClient
将作业提交给JobManager
;JobManager
负责协调资源分配和作业执行,资源分配完成后,任务将提交给相应的TaskManager
;TaskManager
启动一个线程以开始执行,TaskManager
会向JobManager
报告状态更改,如开始执行,正在进行或已完成;- 作业执行完成后,结果将发送回客户端(
JobClient
)。
2.2 安装部署
step1:下载安装包
step2:上传flink-1.12.0-bin-scala_2.12.tgz
到node1
的指定目录
step3:解压
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
step4:修改权限
chown -R root:root /export/server/flink-1.12.0
step5:改名或创建软链接
mv flink-1.12.0 flink ln -s /export/server/flink-1.12.0 /export/server/flink
2.3 测试验证
1. 准备文件/root/words.txt
vim /root/words.txt • 1
内容如下:
hello me you her
hello me you
hello me
hello
2. 启动Flink
本地“集群”
/export/server/flink/bin/start-cluster.sh
3.使用jps
可以查看到下面两个进程
- TaskManagerRunner - StandaloneSessionClusterEntrypoint
4.访问Flink
的Web UI
: http://node1:8081/#/overview
slot
在Flink
里面可以认为是资源组,Flink
是通过将任务分成子任务并且将这些子任务分配到slot
来并行执行程序。
5. 执行官方示例:
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out
6. 停止Flink:
/export/server/flink/bin/stop-cluster.sh
启动shell
交互式窗口(目前所有Scala 2.12
版本的安装包暂时都不支持Scala Shell
)
/export/server/flink/bin/start-scala-shell.sh local
执行如下命令:
benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
退出shell:
:quit
03 Standalone独立集群模式
3.1 工作原理
工作流程:
client
客户端提交任务给JobManager
;JobManager
负责申请任务运行所需要的资源并管理任务和资源;JobManager
分发任务给TaskManager
执行;TaskManager
定期向JobManager
汇报状态。
3.2 安装部署
step1:集群规划
- 服务器:
node1(Master + Slave)
:JobManager + TaskManager
- 服务器:
node2(Slave)
:TaskManager
- 服务器:
node3(Slave)
:TaskManager
step2:修改flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
内容如下:
jobmanager.rpc.address: node1 taskmanager.numberOfTaskSlots: 2 web.submit.enable: true #历史服务器 jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/ historyserver.web.address: node1 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
step3:修改masters
vim /export/server/flink/conf/masters
内容如下:
node1:8081
step4:修改slaves
vim /export/server/flink/conf/workers
内容如下:
node1 node2 node3
step5:添加HADOOP_CONF_DIR环境变量
vim /etc/profile
新增内容:
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
step6:分发
scp -r /export/server/flink node2:/export/server/flink scp -r /export/server/flink node3:/export/server/flink scp /etc/profile node2:/etc/profile scp /etc/profile node3:/etc/profile
或
for i in {2..3}; do scp -r flink node$i:$PWD; done
step7:source
source /etc/profile
3.3 测试验证
1. 启动集群,在node1上执行如下命令
/export/server/flink/bin/start-cluster.sh
或者单独启动
/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all /export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all
2. 启动历史服务器
/export/server/flink/bin/historyserver.sh start • 1
3. 访问Flink UI界面或使用jps查看
TaskManager
界面:可以查看到当前Flink
集群中有多少个TaskManager
,每个TaskManager
的slots
、内存、CPU
Core
是多少
4. 执行官方测试案例
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input hdfs://node1:8020/wordcount/input/words.txt --output hdfs://node1:8020/wordcount/output/result.txt --parallelism 2
5. 查看历史日志
6. 停止Flink集群
/export/server/flink/bin/stop-cluster.sh
04 Standalone-HA高可用集群模式
4.1 工作原理
从之前的架构中我们可以很明显的发现 JobManager
有明显的单点问题(SPOF,single point of failure
)。JobManager
肩负着任务调度以及资源分配,一旦 JobManager
出现意外,其后果可想而知。
工作原理:
- 在
Zookeeper
的帮助下,一个Standalone
的Flink
集群会同时有多个活着的JobManager
,其中只有一个处于工作状态,其他处于Standby
状态。 - 当工作中的
JobManager
失去连接后(如宕机或Crash
),Zookeeper
会从Standby
中选一个新的JobManager
来接管Flink
集群。
4.2 安装部署
step1:集群规划
- 服务器:
node1(Master + Slave)
:JobManager + TaskManager
- 服务器:
node2(Master + Slave)
:JobManager + TaskManager
- 服务器:
node3(Slave)
:TaskManager
step2:启动ZooKeeper
zkServer.sh status zkServer.sh stop zkServer.sh start
step3:启动HDFS
/export/serves/hadoop/sbin/start-dfs.sh
step4:停止Flink集群
/export/server/flink/bin/stop-cluster.sh
step5:修改flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
增加如下内容:
state.backend: filesystem state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints high-availability: zookeeper high-availability.storageDir: hdfs://node1:8020/flink/ha/ high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
配置解释:
#开启HA,使用文件系统作为快照存储 state.backend: filesystem #启用检查点,可以将快照保存到HDFS state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints #使用zookeeper搭建高可用 high-availability: zookeeper # 存储JobManager的元数据到HDFS high-availability.storageDir: hdfs://node1:8020/flink/ha/ # 配置ZK集群地址 high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
step6:修改masters
vim /export/server/flink/conf/masters
node1:8081
node2:8081
step7:同步
scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/ scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/ scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/ scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
step8:修改node2上的flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
修改内容如下:
jobmanager.rpc.address: node2
step9:重新启动Flink集群,node1上执行
/export/server/flink/bin/stop-cluster.sh /export/server/flink/bin/start-cluster.sh
step10:使用jps命令查看,发现没有Flink相关进程被启动
step11:查看日志
cat /export/server/flink/log/flink-root-standalonesession-0-node1.log
发现如下错误:
因为在Flink1.8
版本后,Flink
官方提供的安装包里没有整合HDFS
的jar
step12:下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作
- 下载地址:https://flink.apache.org/downloads.html
- 放入lib目录(
cd /export/server/flink/lib
) - 分发(
for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done
)
step13:重新启动Flink集群,node1上执行
/export/server/flink/bin/start-cluster.sh
step14:使用jps命令查看,发现三台机器已经ok