Flink教程(03)- Flink环境搭建(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: Flink教程(03)- Flink环境搭建(上)

01 引言

在前面的博客,我们已经大概对Flink有一个初步认识了,有兴趣的同学可以参阅下:

如果要学习Flink必须先搭建好Flink环境,本文来讲解下Flink的环境搭建。

在上一篇博客 《Flink教程(01)- Flink知识图谱》里面的物理部署层,我们知道了Flink有几种部署模式,根据本地或集群分为以下几种:

  • Local(本地单机模式):学习测试时使用
  • Standalone(独立集群模式):Flink自带集群,开发测试环境使用
  • StandaloneHA(独立集群高可用模式):Flink自带集群,开发测试环境使用
  • On Yarn(计算资源统一由Hadoop YARN管理):生产环境使用

本文来讲解下。

02 Local本地单机模式

2.1 工作原理

上图流程如下:

  1. Flink程序由JobClient进行提交;
  2. JobClient将作业提交给JobManager
  3. JobManager负责协调资源分配和作业执行,资源分配完成后,任务将提交给相应的TaskManager
  4. TaskManager启动一个线程以开始执行,TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成;
  5. 作业执行完成后,结果将发送回客户端(JobClient)。

2.2 安装部署

step1:下载安装包

step2:上传flink-1.12.0-bin-scala_2.12.tgznode1的指定目录

step3:解压

tar -zxvf flink-1.12.0-bin-scala_2.12.tgz 

step4:修改权限

chown -R root:root /export/server/flink-1.12.0

step5:改名或创建软链接

mv flink-1.12.0 flink
ln -s /export/server/flink-1.12.0 /export/server/flink

2.3 测试验证

1. 准备文件/root/words.txt

vim /root/words.txt
• 1

内容如下:

hello me you her

hello me you

hello me

hello

2. 启动Flink本地“集群”

/export/server/flink/bin/start-cluster.sh

3.使用jps可以查看到下面两个进程

- TaskManagerRunner
 - StandaloneSessionClusterEntrypoint

4.访问FlinkWeb UIhttp://node1:8081/#/overview

slotFlink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分配到slot来并行执行程序。

5. 执行官方示例:

/export/server/flink/bin/flink run 
/export/server/flink/examples/batch/WordCount.jar --input 
/root/words.txt --output /root/out

6. 停止Flink

/export/server/flink/bin/stop-cluster.sh

启动shell交互式窗口(目前所有Scala 2.12版本的安装包暂时都不支持Scala Shell)

/export/server/flink/bin/start-scala-shell.sh local

执行如下命令:

benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()

退出shell:

:quit

03 Standalone独立集群模式

3.1 工作原理

工作流程:

  1. client客户端提交任务给JobManager
  2. JobManager负责申请任务运行所需要的资源并管理任务和资源;
  3. JobManager分发任务给TaskManager执行;
  4. TaskManager定期向JobManager汇报状态。

3.2 安装部署

step1:集群规划

  • 服务器: node1(Master + Slave): JobManager + TaskManager
  • 服务器: node2(Slave): TaskManager
  • 服务器: node3(Slave): TaskManager

step2:修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

内容如下:

jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
#历史服务器
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/

step3:修改masters

vim /export/server/flink/conf/masters

内容如下:

node1:8081

step4:修改slaves

vim /export/server/flink/conf/workers

内容如下:

node1
node2
node3

step5:添加HADOOP_CONF_DIR环境变量

vim /etc/profile

新增内容:

export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

step6:分发

scp -r /export/server/flink node2:/export/server/flink
scp -r /export/server/flink node3:/export/server/flink
scp  /etc/profile node2:/etc/profile
scp  /etc/profile node3:/etc/profile

for i in {2..3}; do scp -r flink node$i:$PWD; done

step7:source

source /etc/profile

3.3 测试验证

1. 启动集群,在node1上执行如下命令

/export/server/flink/bin/start-cluster.sh

或者单独启动

/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all

2. 启动历史服务器

/export/server/flink/bin/historyserver.sh start
• 1

3. 访问Flink UI界面或使用jps查看

TaskManager界面:可以查看到当前Flink集群中有多少个TaskManager,每个TaskManagerslots、内存、CPUCore是多少

4. 执行官方测试案例

/export/server/flink/bin/flink run  
/export/server/flink/examples/batch/WordCount.jar --input 
hdfs://node1:8020/wordcount/input/words.txt --output 
hdfs://node1:8020/wordcount/output/result.txt  --parallelism 2

5. 查看历史日志

6. 停止Flink集群

/export/server/flink/bin/stop-cluster.sh

04 Standalone-HA高可用集群模式

4.1 工作原理

从之前的架构中我们可以很明显的发现 JobManager有明显的单点问题(SPOF,single point of failure)。JobManager 肩负着任务调度以及资源分配,一旦 JobManager出现意外,其后果可想而知。

工作原理:

  • Zookeeper的帮助下,一个 StandaloneFlink集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于Standby状态。
  • 当工作中的 JobManager 失去连接后(如宕机或Crash),Zookeeper会从 Standby中选一个新的 JobManager 来接管 Flink 集群。

4.2 安装部署

step1:集群规划

  • 服务器: node1(Master + Slave): JobManager + TaskManager
  • 服务器: node2(Master + Slave):JobManager + TaskManager
  • 服务器:node3(Slave): TaskManager

step2:启动ZooKeeper

zkServer.sh status
zkServer.sh stop
zkServer.sh start

step3:启动HDFS

/export/serves/hadoop/sbin/start-dfs.sh

step4:停止Flink集群

/export/server/flink/bin/stop-cluster.sh

step5:修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

增加如下内容:

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

配置解释:

#开启HA,使用文件系统作为快照存储
state.backend: filesystem
#启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
#使用zookeeper搭建高可用
high-availability: zookeeper
# 存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://node1:8020/flink/ha/
# 配置ZK集群地址
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

step6:修改masters

vim /export/server/flink/conf/masters

node1:8081

node2:8081

step7:同步

scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/

step8:修改node2上的flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

修改内容如下:

jobmanager.rpc.address: node2

step9:重新启动Flink集群,node1上执行

/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh

step10:使用jps命令查看,发现没有Flink相关进程被启动

step11:查看日志

cat /export/server/flink/log/flink-root-standalonesession-0-node1.log

发现如下错误:

因为在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar

step12:下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作

  • 下载地址:https://flink.apache.org/downloads.html
  • 放入lib目录(cd /export/server/flink/lib
  • 分发(for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done

step13:重新启动Flink集群,node1上执行

/export/server/flink/bin/start-cluster.sh

step14:使用jps命令查看,发现三台机器已经ok

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL 运维 API
Apache Flink 学习教程----持续更新
Apache Flink 学习教程----持续更新
279 0
|
6月前
|
流计算
JD Flink教程
JD Flink教程
42 0
|
6月前
|
Apache 流计算
Apache Flink教程
Apache Flink教程
259 0
|
3月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
249 2
|
6月前
|
Java Linux 流计算
【极数系列】Flink环境搭建&Docker版本(04)
【极数系列】Flink环境搭建&Docker版本(04)
221 3
|
6月前
|
Java Linux 网络安全
【极数系列】Flink环境搭建&Linux版本 (03)
【极数系列】Flink环境搭建&Linux版本 (03)
99 2
|
6月前
|
分布式计算 网络安全 流计算
Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
【2月更文挑战第15天】Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
442 0
|
6月前
|
Apache 流计算
Apache Flink教程----2.本地开发
Apache Flink教程----2.本地开发
68 0
|
6月前
|
Shell Apache 流计算
Apache Flink教程----1.安装初体验
Apache Flink教程----1.安装初体验
75 0
|
6月前
|
SQL 分布式计算 Java
2021年最新最全Flink系列教程__Flink综合案例(九)
2021年最新最全Flink系列教程__Flink综合案例(九)
60 0