搭建Flink集群、集群HA高可用以及配置历史服务器

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 本文介绍了如何搭建一个Flink集群、Flink集群HA高可用,并配置历史服务器以记录Job任务执行的详细信息和状态。

Flink集群搭建

集群规划

节点 node01 node02 node03
角色 JobManager
TaskManager
TaskManager TaskManager

下载并解压安装包

wget https://repo.huaweicloud.com/apache/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

在node01节点下载flink安装包,同时解压、重命名。

tar  -zxvf flink-1.17.0-bin-scala_2.12.tgz
mv flink-1.17.0 flink

修改集群配置

进入flink的conf目录,修改集群配置

vim /usr/local/program/flink/conf/flink-conf.yaml

1.修改flink-conf.yaml文件

JobManager节点配置

# jobmanager.rpc.address: localhost
# jobmanager.bind-host: localhost
jobmanager.rpc.address: node01
jobmanager.bind-host: 0.0.0.0

# rest.address: localhost
# rest.bind-address: localhost
rest.address: node01
rest.bind-address: 0.0.0.0

TaskManager节点配置

# taskmanager.host: localhost
# taskmanager.bind-host: localhost

taskmanager.host: node01
taskmanager.bind-host: 0.0.0.0

注意:需要在/etc/hosts文件中配置各个节点信息

172.29.234.1    node01    node01
172.29.234.2    node02    node02
172.29.234.3    node03    node03

2.修改workers文件

指定node01、node02、node03等节点为TaskManager

# localhost
node01
node02
node03

3.修改masters文件

# localhost:8081
node01:8081

分发安装目录

node01节点安装、配置好后,将Flink安装目录分发给另外两个节点服务器。

[root@node01 program]# pwd
/usr/local/program
[root@node01 program]# ls
flink                            jdk8

[root@node01 program]# scp -r flink node02:/usr/local/program/flink

[root@node01 program]# scp -r flink node03:/usr/local/program/flink

在node02、node03节点,修改flink-conf.yaml配置

1.node02节点

# taskmanager.host: localhost

taskmanager.host: node02

2.node03节点

# taskmanager.host: localhost

taskmanager.host: node03

启动集群

Flink附带了相关的bash脚本,可以用于启动、停止集群。

# 启动集群
./bin/start-cluster.sh

# 停止集群
./bin/stop-cluster.sh

node01节点服务器上执行start-cluster.sh脚本以启动Flink集群

[root@node01 bin]# cd /usr/local/program/flink/bin

[root@node01 bin]# ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.

查看进程情况

[root@node01 bin]# jps
6788 StandaloneSessionClusterEntrypoint
7256 Jps
7116 TaskManagerRunner
[root@node02 conf]# jps
16884 TaskManagerRunner
16959 Jps
[root@node03 conf]# jps
17139 TaskManagerRunner
17214 Jps

访问Web UI

当如上所示一样后,代表启动成功,此时可以访问http://node01:8081对flink集群和任务进行监控管理。

image.png

注意:关闭防火墙,否则可能无法访问,或者集群的TaskManager数量、Slot数量显示异常

systemctl stop firewalld

Flink集群HA高可用

概述

集群实际上只有一个JobManager,是存在单点故障的,官方提供了Standalone Cluster HA模式来实现集群高可用。

集群可以有多个JobManager,但只有一个处于active状态,其余的则处于备用状态,Flink使用 ZooKeeper来选举出Active JobManager,并依赖其来提供一致性协调服务,所以需要预先安装 ZooKeeper 。

Flink本身提供了内置ZooKeeper插件,可以直接修改conf/zoo.cfg,并且使用/bin/start-zookeeper-quorum.sh直接启动。

集群规划

节点 node01 node02 node03
角色 JobManager
TaskManager
JobManager
TaskManager
TaskManager

配置flink

基于Flink集群的node01节点配置的情况下,修改conf/flink-conf.yaml文件,增加如下配置:

# 配置使用zookeeper来开启高可用模式
high-availability.type: zookeeper

# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181

# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink

# 集群id 放置集群的所有必需协调数据
high-availability.cluster-id: /cluster_one

# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://node01:9000/flink/recovery

配置master、workers

修改conf/masters文件,配置master节点

node01:8081
node02:8081

修改conf/workers文件,配置worker节点

node01
node02
node03

配置ZK

编辑vim zoo.cfg文件

server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

分发安装目录

node01节点安装、配置好后,将Flink安装目录分发给另外两个节点服务器。

[root@node01 program]# pwd
/usr/local/program
[root@node01 program]# ls
flink                            jdk8

[root@node01 program]# scp -r flink node02:/usr/local/program/flink

[root@node01 program]# scp -r flink node03:/usr/local/program/flink

在node02、node03节点,修改flink-conf.yaml配置

1.node02节点

jobmanager.rpc.address: node02

taskmanager.host: node02

2.node03节点

taskmanager.host: node03

启动HA集群

分发Flink相关配置到其他节点,然后确保Hadoop和ZooKeeper已经启动后,使用以下命令来启动集群:

[root@node01 flink]# bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node01.
Starting standalonesession daemon on host node02.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.

访问http://node01:8081
image.png

访问http://node02:8081
image.png

测试

查看ZK:JobManager节点信息
image.png

kill node01节点上的JobManager进程

[root@node01 flink]# jps
2564 DataNode
3508 NodeManager
18741 Jps
7784 QuorumPeerMain
16666 TaskManagerRunner
2363 NameNode
16300 StandaloneSessionClusterEntrypoint
3117 ResourceManager
[root@node01 flink]# kill -9 16300

查看Active JobManager是否变化
image.png

Flink参数配置

flink-conf.yaml文件中有大量的配置参数,基本常见参数如下:

# jobmanager地址    
jobmanager.rpc.address: node01

# JobManager 的 JVM 堆内存大小,默认为 1024m 
jobmanager.heap.size: 1024m

# rpc通信端口
jobmanager.rpc.port: 6123

# 进程使用的全部内存大小,可以根据集群规模进行适当调整
jobmanager.memory.process.size:1600m

# Taskmanager 的 JVM 堆内存大小,默认为 1024m 
taskmanager.heap.size: 1024m

# 进程使用的全部内存大小,可以根据集群规模进行适当调整
taskmanager.memory.process.size: 1728m

# 每个TaskManager能够分配的Slot数量进行配置,默认为1 
# 通常设置为 CPU 核心的数量,或其一半
# Slot就是TaskManager中具体运行一个任务所分配的计算资源
taskmanager.numberOfTaskSlots: 1

# flink任务执行的并行度,默认为1
# 优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量
parallelism.default: 1

# 重启策略
jobmanager.execution.failover-strategy: region

# 存储临时文件的路径,如果没有配置,则默认采用服务器的临时目录,如 LInux 的 /tmp 目录
io.tmp.dirs: /tmp

参考Flink的官方手册:更多配置

配置历史服务器

概述

运行Flink job的集群一旦停止,只能去yarn或本地磁盘上查看日志,对于Job任务信息的查看、异常问题的排查非常不友好。

Flink提供了历史服务器,用来在相应的Flink集群关闭后查询已完成作业的统计信息。通过History Server可以查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

Flink任务停止后,JobManager会将已经完成任务的统计信息进行存档,History Server进程则在任务停止后可以对任务统计信息进行查询。

配置

创建存储目录

[root@node01 flink]# hadoop fs -mkdir -p /logs/flink-job

在flink-config.yaml中添加如下配置

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
jobmanager.archive.fs.dir: hdfs://node01:9000/logs/flink-job

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0
historyserver.web.address: node01

# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082
historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.dir: hdfs://node01:9000/logs/flink-job

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000
historyserver.archive.fs.refresh-interval: 5000

启动、停止历史服务器

启动历史服务器

[root@node01 flink]# bin/historyserver.sh start
Starting historyserver daemon on host node01.

停止历史服务器

[root@node01 flink]# bin/historyserver.sh stop
Stopping historyserver daemon (pid: 30749) on host node01.

提交一个Job任务

[root@node01 flink]# bin/flink run -t yarn-per-job -c com.atguigu.wc.WordCountStreamUnboundedDemo  /root/FlinkTutorial-1.17-1.0-SNAPSHOT.jar

2023-06-12 23:41:00,719 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:41:00,742 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:41:00,761 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:41:00,766 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1686577483648_0012
2023-06-12 23:41:00,792 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1686577483648_0012
2023-06-12 23:41:00,792 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2023-06-12 23:41:00,793 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:41:04,565 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 23:41:04,565 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node02:38887 of application 'application_1686577483648_0012'.
Job has been submitted with JobID cd41d983c93d8eb906c9aa899dcdefd0

访问http://node01:8088/cluster查看Hadoop

image.png

访问Web UI查看提交任务信息
image.png

查看历史Job信息

在浏览器地址栏输入:http://node01:8082 查看已经停止的 job 的统计信息
image.png

停止提交任务

[root@node01 flink]# bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1686577483648_0012 cd41d983c93d8eb906c9aa899dcdefd0

访问http://node01:9870/explorer.html#/logs/flink-job查看HDFS中的归档文件
image.png

等一段时间,几分钟后查看历史服务器
image.png

查看Job具体信息
image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8天前
|
存储 弹性计算 人工智能
阿里云服务器配置选择方法,八大使用场景选择合适的云服务器配置
本文详解阿里云ECS服务器在八大场景(新手入门、网站、数据库、大数据、游戏、视频、AI、高性能计算)中的配置选择策略,涵盖实例性能特点与推荐型号,助力用户精准选型,实现性能与成本的最优平衡。
|
16天前
|
Ubuntu 安全 应用服务中间件
详细指南:配置Nginx服务器在Ubuntu平台上
以上步骤涵盖了基本流程:从软件包管理器获取 Ngnix, 设置系统服务, 调整UFW规则, 创建并激活服务器块(也称作虚拟主机), 并进行了初步优化与加固措施。这些操作都是建立在命令行界面上,并假设用户具有必要权限(通常是root用户)来执行这些命令。每个操作都有其特定原因:例如,设置开机启动确保了即使重启后也能自动运行 Ngnix;而编辑server block则定义了如何处理进入特定域名请求等等。
157 18
|
11天前
|
存储 弹性计算 固态存储
阿里云服务器租用价格参考:最新收费标准与不同实例热门配置活动价格
阿里云服务器租用价格参考:配置最低的1核0.5G云服务器,按量付费价格0.063元/小时,按月租用价格为18元/1月,爆款配置的活动价格目前直降,2核2G配置轻量应用服务器抢购价为38元一年;经济型e实例2核2G3M特惠价99元1年;通用算力型u1实例2核4G5M带宽特惠价199元1年;2核8G配置的活动价格最低为一年652.32元;4核16G配置的活动价格最低为1196.64元;8核16G配置的最低一年租用价格为3815.03元。以下是2025年阿里云服务器最新收费标准与热门配置活动价格的详细内容。
|
12天前
|
存储 缓存 数据挖掘
阿里云轻量应用服务器“CPU优化型”配置介绍、费用价格说明
阿里云轻量应用服务器推出CPU优化型,提供更强计算性能,2核4GB起,最高16核64GB,全系支持200Mbps带宽。适用于企业级应用、数据库、游戏服务器等高算力场景,保障稳定高效运行。
112 1
|
8天前
|
弹性计算 定位技术 数据中心
阿里云服务器选择方法:配置、地域及付费模式全解析
2025阿里云服务器选购指南:就近选择地域以降低延迟,企业用户优选2核4G5M带宽u1实例,仅199元/年;个人用户可选2核2G3M带宽ECS,99元/年起。长期稳定业务选包年包月,短期或波动场景用按量付费,轻松搭建网站首选高性价比配置。
|
资源调度 监控 数据处理
【Flink】Flink集群有哪些角色?各自有什么作用?
【4月更文挑战第18天】【Flink】Flink集群有哪些角色?各自有什么作用?
|
2月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
375 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
11月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3261 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎

热门文章

最新文章