flink standalone 部署模式且不能使用 hdfs 场景下的各种问题及其应对方案

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink standalone 部署模式且不能使用 hdfs 场景下的各种问题及其应对方案

一.背景

笔者所在公司某系统在某证券公司现场部署时,客户出于自己集群使用规划的考量,不允许 flink 访问大数据集群,既不能使用yarn资源管理器,也不能访问hdfs文件系统,而该系统中中 flink 应用程序的数据链路是 flink sql 读取 kafka topic中的数据,进行计算分析后写到es中供下游业务系统使用,计算过程中使用到了 flink 的 table api和sql。

该场景其实是flink standalone 部署模式且不能使用 hdfs 的典型场景,我在这里汇总分析了下该场景下相关的技术问题,并结合flink相关技术点的细节和原理,给出了各个问题可能的解决方案,供大家参考。

问题主要分为三大类:job manager 高可用部署,catalog配置和使用,以及 state backend 配置和使用。

以下是正文。

二.什么是 standalone 部署模式

首先明确下什么是 flink 的standalone 部署模式。

flink的 standalone 部署模式,即standalone deploy mode,是从 flink 集群的resource provider的角度说的,指的是flink直接在操作系统上启动flink 相关服务如client, jobmanager, taskmanager,而不依赖其它资源管理框架如yarn, mesos, k8s进行资源管理。

此时是由 flink直接来进行集群资源管理的,比如监控和重启失败的服务进程,分配和释放资源等等。

需要说明的是,standalone模式下,也可以结合使用docker和k8s (flink将使用k8s作为资源管理框架的模式称为native k8s mode,以跟standalone模式下的k8s进行区分)。

三.standalone 部署模式跟 hdfs 的关系

接着看下 standalone 部署模式跟 hdfs 的关系。

通过以上概念可以看出,flink 的standalone模式,跟flink是否能访问大数据集群中的hdfs没有任何关系。

很多standalone模式的flink,仍然使用的是大数据集群中的hdfs,作为job manager 高可用部署的后端存储(high-availability.storageDir),作为 checkpoint 时状态快照的后端存储(state.checkpoints.dir/state.savepoints.dir)。

四. standalone 部署模式且不能使用 hdfs 场景下各种问题及其应对方案

最后分析下flink standalone 部署模式且不能使用 hdfs 情景下,遇到的各种问题和对应的解决方案。

该问题的背景是,某系统在某证券公司部署时,客户出于自己集群使用规划放方面的考量,不允许 flink 访问大数据集群,即不能使用yarn资源管理器,也不能访问hdfs文件系统。该系统中flink 应用程序的数据链路是 flink sql 读取 kafka topic中的数据,进行计算分析后写到es中供下游业务系统使用,计算过程中使用到了 flink 的 table api和sql。

第一个问题是 flink job manager 的高可用部署问题

flink job manager ha, 即 flink job manager 高可用部署,需要使用到 zookeeper 集群,也需要一个各个 job manager 节点都可以访问的可持久化的存储系统如 hdfs, s3, ceph, nfs等。

具体到我们的场景,因为不能访问大数据集群,也就不能使用大数据集群中的 zookeeper 和 hdfs。 简单起见,我们可以不进行 job manager 的高可用部署(当硬件资源充足,且 flink 集群不是多租户使用时,经过参数调优的 flink 集群运行负载相对稳定的 flink 作业时,job manager 是比较稳定的,做好监控的情况下,不做 Ha 高可用部署问题也不大);或者一定要部署 ha的话,可以在大数据集群之外(如 flink 集群中)单独部署一套 zk 集群供flink ha 使用;同时 high-availability.storageDir 可以配置使用 nfs 等各个 job manager 节点都可以访问的其它持久化存储。

第二个问题是 flink 的 catalog 问题

flink 的 catalog 提供了 flink 访问数据库或其它外部存储系统中存储的数据时所需要的各种元数据,包括库,表,分区,视图,函数等等。

flink内置支持的 catalog 包括 GenericInMemoryCatalog/PostgresCatalog/HiveCatalog 三种。

PostgresCatalog是专门为访问 postgres 数据库中数据使用的,其主要功能是使得 flink 能直接读 posgres databases 里面已有的表,省略了 flink sql 端 ddl 的过程,目前也只支持部分 get/list 等 catalog 方法,所以不适合我们这里访问 kafka 数据的场景。

GenericInMemoryCatalog将库表分区视图函数等flink的元数据存储在内存中,这些元数据信息只在该 session 存活期间有效,不支持持久化。具体到我们的场景,如果我们能改造flink sql 代码,在每次创建 sessions 时首先重新执行 ddl 语句对 kafka topic 建表,是可以采用该方案的。

HiveCatalog,flink 的 hive catalog 跟 spark 的 hive catalog 一样,有两种作用:一是作为 flink 中库表分区视图函数等元数据的持久化层,二是作为读写 Hive 中元数据的接口。但同 spark 不同的是,flink hive catalog 需要一个 existing Hive installation , 也需要启动 hive metastore service.(spark 使用 hive catalog 时是不需要另外安装 hive 的,因为通过指定参数 -Phive 打包获得的 spark 的二进制包中包括了 hive 相关 jar 包; 也不需要启动 hive metastore service, 因为 spark 可以直接访问 hive metastore db,这相当于 hive 的 local 模式)。flink hive catalog 需要的 existing Hive installation,可以是大数据集群中的 hive,也可以是独立于大数据集群之外单独安装的hive。具体到我们的场景,如果要使用 hiveCatalog,需要在 flink 集群中某节点上安装 hadoop, hive,mysql; 需要启动 Mysql,hive metastore service (注意:只需要在某个节点安装 hadoop 和 hive 即可,不需要启动 hadoop! 不需要启动 hiveserver2!只需要启动hive metastore service)。

第三个问题是 flink 作业的 state backend问题

在 Flink 中,State Backend 有两个功能,一是提供状态 state 的存储和访,二是如果开启了 Checkpoint,会周期向远程的 Durable storage 上传 checkpoint snapshot 数据和返回元数据 (meta) 给 Job Manager。

在FLINK 1.13 之前的 Flink 版本中,以上两个功能是混在一起的,即把状态存储和检查点的创建概念笼统的混在一起,导致初学者对此部分感觉很混乱,很难理解。Flink 1.13 中两个概念被拆开 (FLINK-19463),其中 State Backend 的概念变窄,只描述状态访问和存储;另外一个概念是 Checkpoint storage,描述的是 Checkpoint 行为,如 Checkpoint 数据是发回给 JM 内存还是上传到远程。相对应的,配置项也被拆开为 State Backend 和 Checkpoint Storage,可选的选项包括 HashMapStateBackend/EmbeddedRocksDBStateBackend 和 JobManagerCheckpointStorage/FileSystemCheckpointStorage 。

旧式的StateBackend 和 新式的 StateBackend + Checkpoint Storage, 完整对应关系如下:

image.png

  • MemoryStateBackend() <=> HashMapStateBackend() + JobManagerCheckpointStorage;
  • MemoryStateBackend("file://path") <=> HashMapStateBackend() + JobManagerCheckpointStorage("file://path");
  • FsStateBackend() <=> HashMapStateBackend() + FileSystemCheckpointStorage();
  • RocksDBStateBackend(new MemoryStateBackend()) <=> EmbeddedRocksDBStateBackend() + JobManagerCheckpointStorage();
  • RocksDBStateBackend(new FsStateBackend()) <=> EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage();

经常使用的三种 StateBackend ,其具体配置方式如下:

  • MemoryStateBackend() <=> state.backend: jobmanager/hashmap;
  • FsStateBackend() <=> state.backend: filesystem/hashmap,state.checkpoints.dir: file:///checkpoint-dir/;
  • RocksDBStateBackend() <=> state.backend: rocksdb,state.checkpoints.dir: file:///checkpoint-dir/;

经常使用的三种 StateBackend,其详细说明和对比如下:

  • MemoryStateBackend() 在 task manager java 堆空间中以 java object 形式存储 state, 在 checkpoint 时会制作 state snapshot 并发送给 Job Manager;
  • FsStateBackend() 在 task manager java 堆空间中以 java object 形式存储 state,在 checkpoint 时会制作 state snapshot 并存储到指定的文件系统目录下,jobManager只在内存中存储了少量元数据,其性能很好,但 state 大小受限于 taskManager 进程的内存;
  • RocksDBStateBackend() 在 task manager 内置的 RocksDB 中存储 state(其本质是使用 task manager 本地磁盘),在 checkpoint 时会对 RocksDB 制作 snapshot 并存储到指定的文件系统目录下,jobManager 只在内存中存储了少量元数据,其可以存储更大的 state (因为 state 是存储在task manager 所在节点的本地磁盘上而不是 taskManager 堆空间中),但性能相对 FsStateBackend() 低一些 (因为涉及到 java对象的序列化和反序列以及磁盘读写), 可以支持增量快照。
  • 在开发和测试环境,我们经常使用 MemoryStateBackend(),而在生产环境,我们一般推荐使用第二种 FsStateBackend() 或第三种 RocksDBStateBackend()。

对应到我们无法使用 hdfs 也没有其它分布式持久化存储的场景下,在生产环境推荐使用FsStateBackend() 或 RocksDBStateBackend(), 此时需要配置 state.checkpoints.dir 使用本地文件系统。此时可以应对 flink task 的失败和重启,可以应对 task namager 和 jobmanager 的失败和重启,但无法应对物理服务器节点的故障或本地物理磁盘的损坏;(注意:如果有各个节点都可以访问的nfs文件系统,将state.checkpoints.dir 配置为 nfs 从原理上说也行得通,但目前尚未做该部署方案的功能和性能验证)。

相关文章
|
1月前
|
SQL Kubernetes 调度
Flink 流批一体在模型特征场景的使用
本文整理自B站资深开发工程师张杨老师在 Flink Forward Asia 2023 中 AI 特征工程专场中的分享。
77109 4
Flink 流批一体在模型特征场景的使用
|
3月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
24 2
|
12天前
|
存储 消息中间件 运维
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
本文主要分享友盟+ U-App 整体的技术架构,以及在实时和离线计算上面的优化方案。
317 1
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
|
2月前
|
缓存 分布式计算 Apache
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
62 0
|
2月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
36 3
|
3月前
|
Java 流计算
在Flink实时任务中,POJO(Plain Old Java Object)对象的模式演进可能会引起不兼容的问题
【2月更文挑战第6天】在Flink实时任务中,POJO(Plain Old Java Object)对象的模式演进可能会引起不兼容的问题
22 3
|
3月前
|
消息中间件 SQL Java
flink问题之Application 模式下启动失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
53 0
|
3月前
|
资源调度 Kubernetes Java
Flink--day02、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
Flink--day022、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
143 5
|
3月前
|
消息中间件 SQL NoSQL
Flink数据源问题之自定义如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
31 3
|
3月前
|
消息中间件 SQL 关系型数据库
Flink数据源问题合集之设置成批量模式如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
32 2