Flink 基础学习(八) 手把手教你搭建伪集群 HA

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
全局流量管理 GTM,标准版 1个月
简介: 前面理论性的知识是不是有点太“干货”,所以来点实战性的内容吧,这次记录了如何搭建高可用的 Flink 集群。在正式配置前,来讲下为何要配置高可用(High Availability)目前越来越多公司的线上应用,都采用的是分布式架构(一主多从),从而避免单点故障引起的服务不可用。而在 Flink 中,同样也有集群保障服务的高可用,任何时候都有一个主 JobManager 和多个备 JobManager,当前主节点宕机后,立刻会有备 JobManager 当选成为主 JobManager ,接管集群,让每个作业继续正常进行。出于这样的考虑,在 Flink 应用上线前,需要有这样一个

1 前言

前面理论性的知识是不是有点太“干货”,所以来点实战性的内容吧,这次记录了如何搭建高可用的 Flink 集群。

在正式配置前,来讲下为何要配置高可用(High Availability

目前越来越多公司的线上应用,都采用的是分布式架构(一主多从),从而避免单点故障引起的服务不可用。

而在 Flink 中,同样也有集群保障服务的高可用,任何时候都有一个主 JobManager 和多个备 JobManager,当前主节点宕机后,立刻会有备 JobManager 当选成为主 JobManager ,接管集群,让每个作业继续正常进行。

出于这样的考虑,在 Flink 应用上线前,需要有这样一个集群来进行保障,下面就来看下该如何进行配置吧。

2 前置工作

2.1 机器准备

在本次搭建前,我通过了本地单机模拟,监听了 8081 和 8082 端口,启动了两个 JobManager 和三个 TaskManager

序号 IP 启动进程 UI PORT
1 127.0.0.1 JobManager、TaskManager 8081
2 127.0.0.1 JobManager、TaskManager 8082
3 127.0.0.1 TaskManager /

一般来说,JobManager 任务调度器所在的机器配置可以低一些,而 TaskManager 任务执行者所在的机器配置会高一些,所以基于上述机器的配置,请各位使用时进行合理分配。

2.1.2 机器之间 SSH 免密登录

本质上,我们只需要修改一份配置,然后将修改后的 Flink 包通过 SCP 命令传送到其它机器上。

接着在单台服务器上启动,它会根据 mastersslaves 文件找到对应的服务器,登录上去,判断启动 JobManagerTaskManager 或者两者都启动。

总结一下机器之间 SSH 免密登录的要点:

1. 每台机器生成公私钥:ssh-keygen 命令

2. 将本机生成的公钥(id_rsa.pub)拷贝到其它机器的认证列表中(authorized_keys)

使用该命令 ssh-copy-id -i id_rsa.pub root@118.25.xxx.xxx

3. 重复上述命令,将本机的公钥添加到集群中的每一台服务器上

4. 免密登录所在的服务器,就是待会我们启动 ./start_cluster.sh 脚本所在的服务器哟

52.jpg

参考上面的图片,将公钥拷贝到远程服务器后,我们就能通过 ssh root@xxx.xxx.xx.xx,免密码登录到远程服务器啦。

2.2 安装 Zookeeper

为了启用 JobManager 高可用这个功能,需要在 flink_conf.yaml 中将 高可用模式 设置为 zookeeper

Flink 利用 Zookeeper 管理 JobManagerTaskManager,进行分布式协调(熟悉 Dubbo 系的小伙伴,应该对我们的 Zookeeper 注册中心感到熟悉)。

Zookeeper 是独立于 Flink 的一项服务,该服务通过节点选举和轻量级一致状态存储提供高度可靠的分布式协调。

待会再具体讲配置文件的参数,这里是提醒大家一定要安装和启动 Zk 服务

具体可以看下 Zookeeper 单机环境和集群环境搭建

2.3 安装 Hadoop

在高可用模式下,需要使用 分布式文件系统 来持久化存储 JobManager 的元数据(meta data),常用的是 HDFS,于是 Hadoop 也得提前安装,在 Hadoop 配置上,我花了半天的功夫,才将它从 0 搭建了起来,这里得好好记录一下。

提前说明一下,下面出现域名的地方,我基本都以 IP 地址的形式展示出来,避免大家忘了在 /etc/hosts 文件中进行 DNS 解析,以及设定的文件目录是 /tmp,也是避免启动前忘了给文件夹赋予读写权限。

(懂我意思了吧,要修改成域名和文件目录地址,记得添加 DNS 解析和  chmod 赋予权限哟)

进入 ${HADOOP_HOME}/etc/hadoop 目录下,修改以下配置文件

1. core-site.xml

<configuration>
    <property>
        <!--指定 namenode 的 hdfs 协议文件系统的通信地址-->
        <name>fs.defaultFS</name>
        <value>hdfs://127.0.0.1:9000</value>
    </property>
    <property>
        <!--指定 hadoop 集群存储临时文件的目录-->
        <name>hadoop.tmp.dir</name>
        <value>/tmp/hadoop/tmp</value>
    </property>
</configuration>

2. hdfs-site.xml

<configuration>
    <property>
      <!--namenode 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔-->
      <name>dfs.namenode.name.dir</name>
      <value>/tmp/hadoop/namenode/data</value>
</property>
<property>
      <!--datanode 节点数据(即数据块)的存放位置-->
      <name>dfs.datanode.data.dir</name>
      <value>/tmp/hadoop/datanode/data</value>
</property>
    <property>
      <!-- 手动设置 DFS 监听端口号 -->
      <name>dfs.http.address</name>
      <value>127.0.0.1:50070</value>
</property>
</configuration>

3. yarn-site.xml

<configuration>
    <property>
        <!--配置 NodeManager 上运行的附属服务。需要配置成 mapreduce_shuffle 后才可以在 Yarn 上运行 MapReduce 程序。-->
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <!--resourcemanager 的主机名-->
        <name>yarn.resourcemanager.hostname</name>
        <value>localhost</value>
    </property>
</configuration>

4. mapred-site.xml

<configuration>
    <property>
        <!--指定 mapreduce 作业运行在 yarn 上-->
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

单机启动,定位到 ${HADOOP_HOME}/sbin 目录,输入以下命令 ./start-all.sh 启动 Hadoop

除了通过 ${HADOOP_HOME}/logs 目录下的日志查看,还可以通过以下三个端口检测 Hadoop 是否成功启动:

  • Resource Manager: http://localhost:50070

53.jpg

  • JobTracker: http://localhost:8088

54.jpg

  • Specific Node Information: http://localhost:8042

55.jpg

2.4 安装 Hadoop 踩坑记

  • localhost port 22 : Connection refused

提示这个错误是因为 Hadoop 启动时需要远程登录 ssh localhost,需要进入 /root/.shh 目录,将公钥加入到认证列表中:

$ cat id_rsa.pub >> authorized_keys

如果是 Mac 用户,你还需要在小齿轮设置中的【共享】模块勾选【远程登录】这一个选项

  • hadoop无法访问50070端口

查看日志将会看到如下提示信息是

java.net.ConnectException: Call From yejingqidembp.lan/192.168.199.232 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused;

实际原因是:Resource Manager 没有正常启动

解决方案:

  1. hdfs-site.xml 中添加 namenode 初始化默认端口
<!-- dfs ip 地址请按实际使用时设置 -->
<property>
  <name>dfs.http.address</name>
  <value>127.0.0.1:50070</value>
</property>
  1. 重新格式化 namenode,然后再启动 hadoop
$ cd ${HADOOP_HOME}
  $ ./bin/hdfs namenode -format
  $ ./sbin/start-all.sh

经过上面的操作,你就能够成功启动单点 Hadoop,如果想要启动集群版的,请参考这篇文章:Hadoop 集群环境搭建

3 Flink Standalone Cluster HA

使用 Zookeeper 作为注册中心,进行资源的分布式协调,实现集群高可用

3.1 Flink 配置

首先 Flink 的配置文件路径在 ${FLINK_HOME}/conf,推荐各位使用 VSCode 这个编辑器打开,可以在左侧查看文件目录树的结构,界面好看,功能强大(这大概是我抛弃了 Sublime 的原因哈哈哈)

56.jpg

上图是本次要修改的三个核心配置文件,关于高可用 Zookeeper 的设定和主从节点的配置。

  • flink-conf.yaml
# High Availability
# 高可用模式设置成 zk
high-availability: zookeeper
# 持久化存储 JobManager 元数据的地址
high-availability.storageDir: hdfs://127.0.0.1:9000/flink/ha/
# zk 仲裁集群地址,实际使用时建议三台以上
high-availability.zookeeper.quorum: 127.0.0.1:2181
  • master

配置了两个 JobManager,由于是本地演示,所以监听了 8081 和 8082 两个端口,启动两个 WEBUI:PORT

127.0.0.1:8081
127.0.0.1:8082
  • slaves

配置了三个 TaskManager

127.0.0.1
127.0.0.1
127.0.0.1

其它更多可选配置,可以参考 Flink 官方配置介绍,关于 JobManager 堆大小分配和 Slot 分配等配置,本次使用的是默认的,使用的是 1G 和 1 Slot

3.2 分发到其它服务器

前面也提到过,集群中的配置,只需要在一台服务器上修改好,打通机器间的免密登录,在分发出去的机器上启动集群就可以了,所以来看下如何将文件夹分发到其它服务器吧

$ scp -r /usr/local/flink-1.9.1 root@127.0.0.2:/usr/local
$ scp -r /usr/local/flink-1.9.1 root@127.0.0.3:/usr/local

假设我们有三台服务器,127.0.0.1 上是我们修改过的包,通过上面的 scp 命令就能将文件夹发送到集群中的服务器。

不过本次演示是单机,不需要分发,我们可以直接进行下一步,但在多机器的情况下,保持路径一致,分发后才进行下一步。

3.3 启动集群

在启动前,请确保 ZookeeperHadoop 都已成功启动,进入 ${FLINK_HOME}/bin 目录

$ ./start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host yejingqideMBP.lan.
[INFO] 1 instance(s) of standalonesession are already running on yejingqideMBP.lan.
Starting standalonesession daemon on host yejingqideMBP.lan.
Starting taskexecutor daemon on host yejingqideMBP.lan.
[INFO] 1 instance(s) of taskexecutor are already running on yejingqideMBP.lan.
Starting taskexecutor daemon on host yejingqideMBP.lan.
[INFO] 1 instance(s) of taskexecutor are already running on yejingqideMBP.lan.
Starting taskexecutor daemon on host yejingqideMBP.lan.

3.4 验证集群成功启动

通过 JPS 命令,查看当前运行的 Java 进程

57.jpg

红框中圈出的是属于 Flink 的进程,包括两个 JobManager 和 三个 TaskManager

同样,我们还可以通过界面 UI 查看:

58.jpg

左边是 JobManager1,端口号是 8081,在 JobManager 菜单中也能看到我们配置的 Zookeeper 信息,右边是 JobManager2,端口号是 8082,在 TaskManager 菜单可以看到启动了三个 Worker,以上结果符合我们在 masterslaves 文件上的配置~

3.5 验证集群功能

前面也提到过,集群帮助我们出现单点故障导致整个服务不可用的情况,通过查看线程信息和 JobManager 菜单下的 logs 日志,发现了 8081 是我们的 Master 节点。

59.jpg

接着就是关闭 8081 进程  kill -9 processID 来一下,刷新 8081 的页面,将会变成灰色不可用,这样我们的主节点就“宕机了”

重新启动 8081,./jobManager.sh start,查看输出日志:

60.jpg

可以发现 8081 已经没有 grant leadership 的标志,我们的 8082 成功当选 master 节点,继续承担主节点的职责,完成资源任务的调度。

3.6 Flink 启动踩坑

在启动过程中,发现 Cluster HA 并没有成功启动,通过查看 ${FLINK_HOME}/log 下的日志,发现了这个问题:

- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
  at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
...
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.

同样,查阅到相关资料后,发现是默认下载的 Flink 缺少 Hadoop 的相关依赖,需要去官网下载相应的 Optional components 依赖,例如我下载的是这个 flink-shaded-hadoop-2-uber-2.8.3-7.0.jar

下载完成后,将它扔入 ${FLINK_HOME}/lib 目录下,然后重新启动就可以了。

4 总结

本篇介绍了如何启用 Flink Standalone Cluster HA,由于踩坑巨多才成功搭建,这里总结一下要点:

  1. 服务器准备
    将前面出现的 ip 地址替换成自己集群中的服务器
  2. 免密登录其它服务器
    参考 ssh-copy-id命令 和 authoried_keys 文件作用
  3. 安装 ZookeeperHadoop
  4. 配置 Hadoop
    这一步是坑最多的,Mac brew 安装的有点坑,所以十分建议下载官网的包,然后修改下载后的包配置,具体看前面的配置
  5. 下载 Flink
    同样也是建议下载官网的包,然后记得下载 Hadoop 依赖包,扔入 ${FLINK_HOME}/lib 目录下
  6. 配置 Flink
    flink-conf.yamlmastersslaves
  7. 启动 Flink 集群和验证

好了,以上就是本次的经验总结,介绍了如何搭建伪集群高可用 HA,跟上面说的,将出现的 IP 换成多台服务器就能够实现真正的高可用部署

如有其它学习建议或文章不对之处,请与我联系~(可在 Github 中提 Issue 或掘金中联系)

5 项目地址

https://github.com/Vip-Augus/flink-learning-note

git clone https://github.com/Vip-Augus/flink-learning-note

6 参考资料

1、生产级别flink HA集群搭建和调优

2、Linux主机之间ssh免密登录配置

3、JobManager High Availability (HA)

4、hadoop无法访问50070端口

5、在Mac下安装Hadoop的坑

6、Flink 官方配置介绍


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
191 56
|
3月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
118 3
|
3月前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
63 0
|
5月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
5月前
|
SQL Kubernetes 数据处理
实时计算 Flink版产品使用问题之如何把集群通过kubernetes进行部署
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
5月前
|
资源调度 算法 Java
Flink四种集群模式原理
Flink四种集群模式原理
189 0
|
5月前
|
资源调度 Oracle Java
实时计算 Flink版产品使用问题之在YARN集群上运行时,如何查看每个并行度的详细处理数据情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Kubernetes Java 数据库连接
实时计算 Flink版产品使用问题之部署到 Kubernetes 集群时,任务过一会儿自动被取消,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之集群重启后,所有的Jobs任务丢失,如何快速恢复
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。