Demo 示例:如何原生的在 K8s 上运行 Flink?-阿里云开发者社区

开发者社区> 阿里云实时计算> 正文

Demo 示例:如何原生的在 K8s 上运行 Flink?

简介: Kubernetes 相信大家都比较熟悉,近两年大家都在讨论云原生的话题,讨论 Kubernetes。本文由阿里巴巴技术专家王阳(亦祺)分享,社区志愿者翟玥整理主要介绍如何原生的在 Kubernetes 上运行 Flink。

整理:翟玥(Flink 社区志愿者)
校对:温天柱(Flink 社区志愿者)
作者:王阳(亦祺)

摘要:本文由阿里巴巴技术专家王阳(亦祺)分享,社区志愿者翟玥整理主要介绍如何原生的在 Kubernetes 上运行 Flink。主要内容包括:

  1. Kubernetes 简介
  2. Flink on Kubernetes 部署演进
  3. Flink Native Integration 技术细节
  4. Demo 演示

Tips:点击「阅读原文」链接可查看作者原版 PPT 及分享视频~

Kubernetes 简介

什么是 Kubernetes?

Kubernetes 相信大家都比较熟悉,近两年大家都在讨论云原生的话题,讨论 Kubernetes。那么什么是 Kubernetes 呢?

  • K8s 是一个资源管理系统。

如果大家对 Yarn、 Mesos 熟悉,假设给定一批裸的物理机,将资源管理系统部署上去之后,可以在此之上基于它的 API 或者 SDK 开发一些分布式软件或者应用程序。例如可以在 Yarn 上开发传统的 MapReduce,在 K8s 上可以开发一些分布式的 Web Server,或者是大数据计算任务等等。

  • K8s 是一个容器编排系统。

不同于传统的 Yarn,K8s 在所有的进程运行过程中,是全部基于容器化的,但这里的容器并不只是单纯的 Docker 容器,它也包括 Rocket 等其他相关的隔离措施。如果在生产环境中要求比较高的话,可能会有一些安全容器,比如 Kata Containers 等等。K8s 在 Slave 上部署的应用程序,都是用容器化的方式去做分发和管理,同时用容器化的技术做隔离。

  • K8s 是一个自动化运维系统。

它是一个声明式的 API,我们只需要告诉 K8s 集群需要创建一个 Deployment,设置的副本数量,需要达到一个什么样的状态,调度系统也就是 K8s 就会帮助我们维持状态,直到达到设置的状态为止。如果中间发生了一些 failover 或者发生了一些失败,它会自动地将任务迁移到其他的机器上,来满足当前的调度。

  • 云原生。

目前几乎所有的云厂商都已经提供了 K8s 服务支持,包括国内的阿里、国际上的 Amazon、Google 等等,包括传统的微软都已经提供了对于 K8s 的 Managed 服务或者是 Unmanaged 服务。随着目前 Lambda 表达式或者 Function 计算的应用, Serverless 方式也变得更加流行。除了传统的部署小集群以外,通过云产生一个 manager,构建一个大的 Serverless 集群,然后用户按需进行计算资源付费,这也是一种新的模式。

Kubernetes 的架构

1 Kubernetes架构图.png

上图是 K8s 基本的架构,这是一个非常典型的 Master-Slave 的架构。

  1. 在 Master 上,是由 Controller,API Server,Scheduler 以及包括做存储的 Etcd 等构成。Etcd 可以算成 Master,也可以作为独立于 Master 之外的存储来对待。Master 的 Controller、API Server、Scheduler 都是单独的进程模式。这和 Yarn 有一些不同,Yarn 的整个 Master 是一个单进程的模式。K8s 的 Master 还可以在多个 Master 之间完成自发的选举,然后由 active 状态的 Master 对外提供服务。
  2. 在 Slave 上,它主要是包括 Kube proxy、Kubelet,以及 Docker 等相关的组件,每个 Node 上部署的相关组件都是类似的,通过它来管理上面运行的多个 Pod。
  3. 根据不同用户的习惯,可以通过 UI 或者 CLI 的方式向 K8s 提交任务。用户可以通过 K8s 提供的 Dashboard Web UI 的方式将任务进行提交,也可以通过 Kubectl 命令行的方式进行提交。

Kubernetes 的一些概念

  • ConfigMap

ConfigMap 是一个 K-V 数据结构。通常的用法是将 ConfigMap 挂载到 Pod ,作为配置文件提供 Pod 里新的进程使用。在 Flink 中可以将 Log4j 文件或者是 flink-conf 文件写到 ConfigMap 里面,在 JobManager 或者 TaskManger 起来之前将它挂载到 Pod 里,然后 JobManager 去读取相应的 conf 文件,加载其配置,进而再正确地拉起 JobManager 一些相应的组件。

  • Service(简称 SVC )

一种对外暴露服务的方式。如果现在内部有一个服务,需要在 K8s 外部进行访问,此时可以通过 Service,然后用 LoadBalancer 或者 NodePort 的方式将其暴露出去。

如果有一个 Service,不希望或不需要将其对外暴露,可以把它设置为 Cluster IP 或者是 None 这种 Headless 的模式。这个时候,它可以用于服务之间相互连接,例如传统的前端去联后端服务,或者是在 Flink 中非 HA 的情况下,TaskManager 去连 JobManager 等等。

  • Pod

Pod 是 K8s 里最小的调度单元。K8s 都是以 Pod 进行调度的。每个 Pod 可以包含一个或者多个 Container。每个 Container 都会有自己的资源,相互之间资源也是已经隔离的,但是所有 Container 共享同一个网络,这就意味着所有的 Container 可以通过 localhost 直接进行通信。

同时,Container 之间可以通过 Volume 共享一些文件。比如 JobManager 或 TaskManager 的 Pod 里产生了一些日志,在同一个 Pod 里再去起另外一个进程收集不符合 K8s 的原生语义。可以通过 SideCar 的方式去起另外一个 Container,把 JobManager 产生的日志收走。这就是一个 Pod 多个 Container 的具体用途。

  • Deployment

因为 Pod 是可以随时被终止的,所以当 Pod 终止之后,就无法再拉起来去做 failover 等其他相关操作。Deployment 是在 Pod 之上提供了更高一层的抽象。Deployment 可以设置 Pod 的状态,比如需要起 5 个 TaskManager,Deployment 会维持当前状态。当有 TaskManager 挂了以后,它会起新的 TaskManager,来补上。这样可以避免自己汇报 Pod 的状态,可以去做一些更复杂的管理 failover 等等。这也是最基础的概念——运维自动化。

2 目前使用情况.png

目前都有什么样的任务在 K8s 上运行?

除了传统的 Web 以及移动端一些无状态的如 MySQL、Kafka 等存储相关的任务外,有状态的服务也不断地在 K8s 上做适配和运行。除此之外,深度学习框架 Tensorflow 原生即可在 K8s 上运行,包括 Spark、Flink 等等,一些大数据相关的框架也在不断地去兼容,不断地去适配,以便让更多的大数据服务可以更好地在 K8s 上运行。

从这一点我们可以看出, K8s 相比于 Yarn 或传统的 Hadoop 具有更好的包容性,它可以把存储、深度学习、大数据包括 OLAP 分析等多种计算框架、引擎都运行在 K8s 之上。这样就会带来一个很大的好处,整个公司只需要去管理一个调度架构,就可以把所有的存储,实时计算,批量计算,包括深度学习,OLAP 分析等等,都在一个集群里面运行。除了管理更方便以外,也可以达到更好的集群利用率。

Flink On Kubernetes 的部署演进

Flink 在 K8s 上最简单的方式是以 Standalone 方式进行部署。这种方式部署的好处在于不需要对 Flink 做任何改动,同时 Flink 对 K8s 集群是无感知的,通过外部手段即可让 Flink 运行起来。

Standalone Session On K8s

Standalone方式在k8s运行步骤:

3 Standalone Session On K8s方式.png

如图所示:

  • 步骤1, 使用 Kubectl 或者 K8s 的 Dashboard 提交请求到 K8s Master。
  • 步骤2, K8s Master 将创建 Flink Master Deployment、TaskManager Deployment、ConfigMap、SVC 的请求分发给 Slave 去创建这四个角色,创建完成后,这时 Flink Master、TaskManager 启动了。
  • 步骤3, TaskManager 注册到 JobManager。在非 HA 的情况下,是通过内部 Service 注册到 JobManager。
  • 至此,Flink 的 Sesion Cluster 已经创建起来。此时就可以提交任务了。
  • 步骤4,在 Flink Cluster 上提交 Flink run 的命令,通过指定 Flink Master 的地址,将相应任务提交上来,用户的 Jar 和 JobGrapth 会在 Flink Client 生成,通过 SVC 传给 Dispatcher。
  • 步骤5,Dispatcher 会发现有一个新的 Job 提交上来,这时会起一个新的 JobMaster,去运行这个 Job。
  • 步骤6,JobMaster 会向 ResourceManager 申请资源,因为 Standalone 方式并不具备主动申请资源的能力,所以这个时候会直接返回,而且我们已经提前把 TaskManager 起好,并且已经注册回来了。
  • 步骤7-8,这时 JobMaster 会把 Task 部署到相应的 TaskManager 上,整个任务运行的过程就完成了。

Standalone perjob on K8s

现在我们看一下 Perjob 的部署,因为 Session Cluster 和 Perjob 分别都有不同的适用场景,一个 Session 里面可以跑多个任务,但是每个任务之间没有办法达到更好的隔离性。而 Perjob 的方式,每个job都会有一个自己独立的 Flink Cluster 去运行,它们之间相互独立。

■ Perjob 的特点:

  1. 用户的 Jar 和依赖都是在镜像里提前编译好,或者通过 Init Container 方式,在真正 Container 启动之前进行初始化。
  2. 每个 Job 都会启动一个新的 Cluster。
  3. 一步提交,不需要像 Session Cluster 一样先启动集群再提交任务。
  4. 用户的 main 方法是在 Cluster 里运行。在特殊网络环境情况下,main 方法需要在 Cluster 里运行的话,Session 方式是无法做到的,而 Perjob 方式是可以执行的。

4 Standalone perjob on K8s方式.png

■ 执行步骤:

由 Standalone JobCluster EntryPoint 执行,从 classpath 找到用户 Jar,执行它的 main 方法得到 JobGrapth 。再提交到 Dispathcher,这时候走 Recover Job 的逻辑,提交到 JobMaster。JobMaster 向 ResourceManager 申请资源,请求 slot,执行 Job。

Helm Chart 方式

Helm 类似于 Linux 上的 Yum。

K8s 里的 Helm 是一个包管理工具,可以很方便的安装一个包。部署一个 Flink 集群等操作,只需要 helm install 就可以将之前很多步的安装操作,一步去完成。本质上没有什么差别,只是它用 Helm 重新组织,包括一些模板等等,用起来会更加方便。

Flink Kubernetes Operator

5 Flink Kubernetes Operator方式.png

  • 任务生命周期管理

使用 Operator 的方式来管理 Flink,主要是来管理多个 Cluster 的情况,可起到任务生命周期管理的作用。它和 Standalone、Native 的方式,本质上不是在一个层次上,它类似于一个更上层的做任务管理的工具。

  • 基于 K8s Operator,方便创建 Flink Cluster。

之前去创建一个 Perjob Cluster,可能需要部署多次,如果任务要做升级,甚至可能需要把之前的删掉,然后修改配置,再重新部署。

引入 K8s Operator 就只需要做一些简单操作。比如 Operator 中有自己的一套 yaml 描述方式,修改其中某一个字段,如修改 image 的 version 字段,此时后台会自动触发一些重启,包括对目前正在执行的任务做 savepoint,然后把 Cluster 销毁掉,再进行新的定向就可以将集群拉起,等一系列自动化的操作。对 Flink 的配置做修改等也都可以在后台自动化完成。

目前 Operater 有 Lyft 和 Google 两个开源的 operator,他们在功能上类似,而且都是已经经过生产检验,与目前的 Standalone Cluster 结合的比较好的,已经达到生产可用的标准。

参考:
1.lyft/flinkk8soperator
https://github.com/lyft/flinkk8soperator
2.GoogleCloudPlatform/flink-on-k8s-operator
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator

总结

当然,Flink on K8s 当前也存在一些不足:

  • 无论 Operator、Helm Chart 或者是直接使用 Kubectl Yaml 的方式,Flink 都感知不到 K8s 的存在。
  • 目前主要使用静态的资源分配。需要提前确认好需要多少个 TaskManager,如果 Job 的并发需要做一些调整,TaskManager 的资源情况必须相应的跟上,否则任务无法正常执行。
  • 用户需要对一些 Container、Operator 或者 K8s 有一些最基本的认识,这样才能保证顺利将 Flink 运行到 K8s 之上。
  • 对于批处理任务,或者想在一个 Session 里提交多个任务不太友好。无法实时申请资源和释放资源。因为 TaskManager 的资源是固定的,批处理任务可能会分多个阶段去运行,需要去实时地申请资源、释放资源,当前也无法实现。如果需要在一个 Session 里跑多个 Job 并且陆续运行结束当前也无法实现。这时如果维持一个比较大的 Session Cluster,可能会资源浪费。但如果维持的 Session Cluster 比较小,可能会导致 Job 跑得慢或者是跑不起来。

基于这几点,我们在社区推进了一个 Native 的集成方案,这个 Native 类似于 Yarn 这种原生的集成,就是让 Flink 原生的感知到下层 Cluster 的存在。

Navtive Integration 的技术细节

为什么叫 Native 方式?包括如下几个含义。

  • 资源申请方式:Flink 的 Client 内置了一个 K8s Client,可以借助 K8s Client 去创建 JobManager,当 Job 提交之后,如果对资源有需求,JobManager 会向 Flink 自己的 ResourceManager 去申请资源。这个时候 Flink 的 ResourceManager 会直接跟 K8s 的 API Server 通信,将这些请求资源直接下发给 K8s Cluster,告诉它需要多少个 TaskManger,每个 TaskManager 多大。当任务运行完之后,它也会告诉 K8s Cluster释放没有使用的资源。相当于 Flink 用很原生的方式了解到 K8s Cluster 的存在,并知晓何时申请资源,何时释放资源。
  • Native 是相对于 Flink 而言的,借助 Flink 的命令就可以达到自治的一个状态,不需要引入外部工具就可以通过 Flink 完成任务在 K8s 上的运行。

具体如何工作?主要分 Session 和 Perjob 两个方面来给大家介绍。

Native Kubernetes Session 方式

5 Native Kubernetes Session方式.png

首先 Session 的方式。

  • 第一个阶段:启动 Session Cluster。Flink Client 内置了 K8s Client,告诉 K8s Master 创建 Flink Master Deployment,ConfigMap,SVC。创建完成后,Master 就拉起来了。这时,Session 就部署完成了,并没有维护任何 TaskManager。
  • 第二个阶段:当用户提交 Job 时,可以通过 Flink Client 或者 Dashboard 的方式,然后通过 Service 到 Dispatcher,Dispatcher 会产生一个 JobMaster。JobMaster 会向 K8sResourceManager 申请资源。ResourceManager 会发现现在没有任何可用的资源,它就会继续向 K8s 的 Master 去请求资源,请求资源之后将其发送回去,起新的 Taskmanager。Taskmanager 起来之后,再注册回来,此时的 ResourceManager 再向它去申请 slot 提供给 JobMaster,最后由 JobMaster 将相应的 Task 部署到 TaskManager 上。这样整个从 Session 的拉起到用户提交都完成了。
  • 需注意的是,图中 SVC 是一个 External Service。必须要保证 Client 通过 Service 可以访问到 Master。在很多 K8s 集群里,K8s 和 Flink Client 是不在同一个网络环境的,这时候可以通过 LoadBalancer 的方式或者 NodePort 的方式,使 Flink Client 可以访问到 Jobmanager Dispatcher,否则 Jar 包是无法提交的。

Native Kubernetes Perjob 方式

6 Native Kubernetes Perjob方式.png

我们再来看一下 Perjob 的方式,如图所示,Perjob 方式其实和之前是有一些类似,差别在于不需要先去起一个 Session Cluster,再提交任务,而是一步的。

  • 首先创建出了 Service、Master 和 ConfigMap 这几个资源以后,Flink Master Deployment 里面已经带了一个用户 Jar,这个时候 entrypoint 就会从用户 Jar 里面去提取出或者运行用户的 main,然后产生 JobGraph。之后再提交到 Dispatcher,由 Dispatcher 去产生 Master,然后再向 ResourceManager 申请资源,后面的逻辑的就和 Session 的方式是一样的。
  • 它和 Session 最大的差异就在于它是一步提交的。因为没有了两步提交的需求,如果不需要在任务起来以后访问外部 UI,就可以不用外部的 Service。可直接通过一步提交使任务运行。通过本地的 port-forward 或者是用 K8s ApiServer 的一些 proxy 可以访问 Flink 的 Web UI。此时,External Service 就不需要了,意味着不需要再占用一个 LoadBalancer 或者占用 NodePort。这就是 perjob 方式。

Session 与 Perjob 方式的不同

我们来看一下 Session 和 Perjob 方式有哪些不同?

表格.jpg

Demo 演示

Session

  1. 启动 Session

注意:image 需要替换为自己的镜像或者使用 docker hub 上的 Flink 官方镜像库。

./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
  1. 提交 job 到 Session
./bin/flink run -d -p 10 -e kubernetes-session -Dkubernetes.cluster-id=k8s-session-1 examples/streaming/WindowJoin.jar
  1. 停止 Session
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=k8s-session-1 -Dexecution.attached=true

Application

Application 模式是 FLIP-85 引入的一种新的模式,长远来看是用于替换社区目前的 perjob 模式的。目前 application 模式和 perjob 模式最大的区别是用户代码在 client 端还是 jobmanager 端运行。在 K8s 部署上,由于用户的 jar 和依赖都可以提前打在镜像里面,所以支持 application 模式就变得非常容易。

注意:Application 模式是在 Flink 1.11 中才支持的功能,需要使用对应的 Flink client 和镜像。可以参考社区文档来构建自己的镜像。

./bin/flink run-application -p 10 -t kubernetes-application \
-Dkubernetes.cluster-id=k8s-app1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
local:///opt/flink/examples/streaming/WindowJoin.jar

目前功能的状态

  • Native Kubernetes Session模式

FLINK-9953:已经在 Flink 1.10发布:
https://issues.apache.org/jira/browse/FLINK-9953

  • Native Kubernetes Application 模式

FLINK-10934:计划在 Flink 1.11发布
https://issues.apache.org/jira/browse/FLINK-10934

  • Native模式下高可用

FLINK-12884:目前高可用方式是基于 zk 实现的,未来是希望不依赖于外部组件,基于 K8s 的 ConfigMap 去做 Meta 存储和 Leader 选举。目前已经有内部实现,未来将会贡献给社区。
https://issues.apache.org/jira/browse/FLINK-12884

  • 其他功能支持:

FLINK-14460:计划陆续在 Flink 1.11/1.12两个版本中发布和完善
https://issues.apache.org/jira/browse/FLINK-14460

包括内容如下:

  • Label,annotation,node-selector:希望将任务调度到某个集群特定的机器上的应用场景可能需要 node-selector,希望给集群打特定的 Label 并且外部能访问到,可以使用 Label 的功能等等。
  • Sidecar container:帮助完成日志收集等
  • Init container:可以帮助在 JobManager,TaskManager 启动之前,把 Jar 下载好,这样我们可以使用统一的景象,不需要把用户 Jar 打到镜像里
  • 存储优化
  • Pod 模板完成一些不常用的功能等等

目前社区的 Flink on K8s 的 native 方案还在快速发展和完善,希望大家多多试用并且提出反馈意见,如果有兴趣也非常欢迎一起参与进来开发。

文章不够看?点击「阅读原文」可直接回顾作者现场分享的讲解视频~

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:

一套基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理、DataLake计算等场景。

官方博客
链接