本文整理自网易数帆软件工程师潘成,在 ASF CommunityOverCode Asia 2023(北京)的分享。本篇内容主要为:
- Spark 云原生的收益和挑战
- 如何基于 Apache Kyuubi 构建统一 Spark 任务网关
- 如何基于 Apache Celeborn (Incubating) 构建 Shuffle Service
- 网易在其他方面对 Spark on Kubernetes 方案的改进
前言
在过去数年中,网易在大数据云原生领域进行了长足的探索。本文围绕如何基于 Apache Kyuubi & Celeborn 等开源技术,构建企业级 Spark on Kubernetes 云原生离线计算平台展开,包含技术选型、架构设计、经验教训、缺陷改进、降本增效等内容,深入剖析网易在该领域的探索成果。
一、Spark on Kubernetes 的收益与挑战
Apache Spark 作为如今大数据离线计算领域事实标准,被广泛应用于网易内部以及数据中台等商业化产品中。目前,Spark on YARN 是业界最主流、也是最成熟的使用方式,但随着以 Kubernetes 为代表的云原生技术的流行,Spark on K8s 正在受到越来越多用户的青睐。
网易自 2018 年起,就有对 Spark on K8s 技术开始探索。相比于 Spark on YARN,Spark on K8s 在很多方面有显著的优势;同时作为较新的技术,在某些特性方面还不如 Spark on YARN 完备。下面我们就其中较为关键的一些部分做些简单的对比:
在隔离性上,得益于容器技术的加持,Spark on K8s 相较于 YARN 进程级的作业隔离机制具备显著的优势。一方面,容器化大幅简化了 Spark 作业依赖管理,尤其是 Python 依赖、动态链接库做到了很好的隔离;同时,容器化配合 cgroup 机制,可以更加严格和精细化地对作业资源做出限制。
在集群层面的资源管理策略中,往往应用程序并不会 100% 使用自身所申请到资源,超售是常见的提升集群资源利用率的策略。以 CPU 为例,YARN 可以在集群层面设置 vCore 与物理 Core 的比例,即 CPU 的超售比,但 K8s 却可以支持作业级别的 CPU 超售比;集群中的任务对 CPU 的利用率不尽相同,这对很多以数据传输为代表的、重 IO 的作业来说,设置较高的 CPU 超售比可以大幅节约 CPU 资源。
动态资源分配是 Spark 作业提升资源利用率的一个非常重要的特性,在 Spark on YARN 中,External Shuffle Service 作为插件常驻在每个 NodeManager 进程中,用于提供当前节点 shuffle 数据的读取服务,因此 Executor 可以随时退出而无需考虑下游 Reduce Task 如何读取 shuffle 数据的问题;但在 K8s 上,没有与之对应的组件,取而代之的是众多可选的方案,会在后文详谈。
Spark on YARN 提供了很多辅助功能,比如 YARN 天然有 Application 的概念、提供日志聚合服务、支持 Spark Live UI 代理等,这些在 Spark on K8s 中并不是开箱即用的。
在部署方案上,Spark on YARN 提供标准化的方案;但 Spark on K8s 却有各种各样的玩儿法,如前文提及的 shuffle 方案,又比如以任务提交为例,有以 Spark Operator 为代表的 yaml 提交方案,和 Spark 原生的 spark-submit 方案,层出不穷。
同时,我们面临着一个非常普遍的挑战:用户的 Kubernetes 基础设施不尽相同,我们如何在做到支持各种基础设施的前提下,尽可能利用各自的特点,发挥最大收益呢?
例如,以公有云和私有化部署为代表的基础设施存在显著差异:
本着降本增效的原则,就成本优势上来讲:
- 公有云上除了支持按时间买断制以外,还提供按量计费的模式,根据资源类型的不同,一般在整体使用率低于总时间 30%~60% 的情况下,按量计费可以大幅度降低成本;公有云竞价实例在价格上有显著的竞争力,但却充满着不确定性和随时会被抢占的风险;
- 私有部署的硬件天然不如公有云那么灵活,基本上都是要提前采购的。为了最大化提升资源利用率,往往会从在离线混布入手。通常情况下,在线业务波峰在白天,离线任务波峰在夜间,通过混合部署、资源出让来提升集群资源利用率,降低综合成本。
存储是影响 Spark on K8s 中一个需要重点关注的对象。公有云上,一般可以提供各规格的网络磁盘,满足各种远程挂载需求;而私有部署场景往往会受很大限制,多以绑定物理节点的本地磁盘为主,相应的,同等 IO 性能和容量上,本地硬件的成本往往更低。
其他硬件,如网卡、CPU、内存也类似,公有云一般可以灵活地提供各种配比;私有部署多局限于特定规格型号,但往往单价更低。
二、如何基于 Apache Kyuubi 构建统一 Spark 任务网关
在网易内部,所有 Spark 服务都是托管的。我们使用 Apache Kyuubi 作为统一的 Spark 任务提交网关,Kyuubi 提供多种用户接口,也支持多种类型的 Spark 任务。典型的使用场景包括:用户可以使用 JDBC/BeeLine 以及各种 BI 工具,连接进行交互的数据分析;使用 RESTful API 向 Kyuubi 提交 SQL/Python/Scala/Jar 批作业。
Kyuubi 作为一个企业级的大数据网关,在多租户和安全性也做了充分的支持。举例来说,Kyuubi 在 Kerberos 支持上做了深度适配,比如简化了 JDBC 客户端使用 Kerberos 认证的方式;支持 Kerberos/LDAP 同时启用,客户端可以选择任何一种方式认证;支持 Hadoop 用户代理机制,在保证安全的同时,省去海量用户 keytab 的管理;支持 Hadoop Delegation Token 续期,满足 Spark 常驻任务的认证需求等。
Kyuubi 不乏一些来自金融券商和欧美企业的社区用户和贡献者,他们对安全有着更为极致的要求,比如服务组件间的内部通信也需要加密,支持权限控制和 SQL 审计等, Kyuubi 对此类场景亦可胜任。
除了作为网关的主体功能外,Kyuubi还提供一系列可以独立使用Spark插件,可以提供如小文件治理、Z-Order、SQL 血缘提取、限制查询数据扫描量等企业级功能。
在架构上,Kyuubi 两个重要组件分别为 Server 和 Engine。其中 Server 是一个轻量级常驻服务,而 Engine 是按需启停的 Spark Application。客户端接入后,Kyuubi Server 会根据路由规则寻找合适的 Engine,若没有命中,则会调用 spark-submit 拉起一个新的 Spark Application,当 Spark Application 闲置一段时间后,会主动退出释放资源。Kyuubi 选择了使用 Spark 原生的方式对接 Kubernetes,而非 Spark Operator 模式,这种选择使得 Kyuubi 能够更加一致地使用 spark-submit 命令对接不同的资源管理系统,如 YARN、Mesos、Standalone。这种设计,对于已有大数据基础设施的企业而言,更适合用于平滑过渡到云原生大数据架构。
对于交互会话,Kyuubi 创造性地提出引擎共享级别的概念,内置的四个选项:CONNECTION、USER、GROUP、SERVER 隔离性依次降低,共享程度依次增强,搭配使用可以满足多种负载场景。例如 CONNECTION 共享级别为每个会话拉起一个单独的 Spark Application,有效地保证了会话之间的隔离性,通常用于大型 ETL 调度任务;USER 共享级别使得同一个用户复用同一个 Spark Application,既加快了新会话的启动速度,又可以保证当 Spark Application 意外退出(如因大结果集查询导致 OOM)时不影响他人。对于批任务,仅支持类似 CONNECTION 共享级别的语义,此时 Kyuubi 表现得更像一个任务调度系统。
Kyuubi Server 被设计为一个轻量级网关,相比之下,Kyuubi Engine 的稳定性略低,很有可能因查询返回大结果集而 OOM,Server 和 Engine 进程分离的设计很好的保证了 Server 的稳定性,同时 Engine 共享级别的设计很好的控制了 Engine 崩溃的影响范围。
在具体内部实现上,Kyuubi 的交互式会话中有两个要的概念:Session 和 Operation,这两个概念分别与 JDBC 中的 Connection 和 Statement,以及 Spark 中的 SparkSession 和 QueryExecution 一一对应。
如上是一段典型的通过 JDBC 驱动连接 Kyuubi 执行 Spark SQL 的代码,可以清晰地看到客户端 JDBC 调用与 Spark 引擎侧之间的对应关系。特别地,在拉取结果集时,结果集会以微批的形式从 Spark Driver 经过 Kyuubi Server 返回给客户端,这有效地降低 Kyuubi Server 的内存压力,保障了 Kyuubi Server 的稳定性;在最新的 1.7 版本中,Kyuubi 支持了基于 Apache Arrow 的结果集序列化方式,大幅提升了大结果集场景的传输效率。
三、如何基于 Apache Celeborn(Incubating) 构建 Shuffle Service
正如前文所述,shuffle 方案对 Spark on K8s 动态资源分配起着至关重要的作用,Executor 只有在确保下游读取 shuffle 数据不受影响的前提下,才可以被释放。近年来,社区以及各大公司在 shuffle 方案上玩法层出不穷,这里简要介绍其中较为主流的几种方式。
首先是 Shuffle Tracking 配合 decommission,这是 Spark 内置的一个轻量级方案,无需维护额外服务。Shuffle Tracking 即通过追踪 RDD 的血缘,分析哪些 shuffle 数据还有可能被下游消费,进而阻止这些 Executor 退出以保证提供 shuffle 数据读取服务。显然,延迟退出会造成一定的资源浪费,并且不能处理 Executor OOM 的情况,decommission 作为一个补充手段,当 Executor 闲置一段时间后,退出前将 shuffle 数据搬运到还未超时的 Executor 上。该方案经过我们的实践,在数据量较大、集群负载较高时表现并不理想。
另一个自然的想法是,在 K8s 上复刻 YARN 上的方案,即通过 DaemonSet 在每个 K8s Node 启动一个 External Shuffle Service 进程提供 Shuffle 读取服务。该方案在性能和可靠性上与 Spark on YARN 完全一致,在网易早期有一定规模的应用。但同时也有一定的弊端,比如不适用于竞价实例(只能使用 Pod,不允许在 Node 上启动 DaemonSet),需要 Host Network 等。
最近一两年,Remote Shuffle Service 方案被越来越多用户青睐。随着网卡技术的发展,网络读写与磁盘读写的效率差异逐渐缩小,理论上,将 Spark 原生 shuffle 对本地磁盘的读写转换成网络读写在性能上不一定会造成劣势。最重要的是,将 Shuffle Service 作为一个单独的服务部署,按需伸缩更符合云原生的理念;同时我们也可以有更多的操作空间,比如通过节点之间平衡存储空间提升利用率,通过层级存储在保证性能的同时,降低对高性能磁盘容量的需求等。不过我们应该要明确的是,Spark 项目自诞生以来,shuffle 就作为一项核心特性被不断改进;Remote Shuffle Service 作为相对新潮的技术,在稳定性、正确性、性能上都会有很长的路要走。
在具体的Remote Shuffle Service技术选型中,网易选择了基于 Apache Celeborn (Incubating) 构建内部的 Shuffle Service 平台。
其中我们关注的核特性包括:
- Celeborn 服务端包含 Master 和 Worker 两种角色。其中 Master 起协调作用,是一个 Raft 集群,具备很好的容灾能力,且支持滚动升级;Worker 作为数据节点提供 shuffle 数据的读写服务,可以根据负载随时扩缩容;并且组件之间的心跳、健康检查机制可以快速发现和剔除故障 Worker 节点;
- Celeborn 提供异步高效的副本机制,开启后对性能影响很小,Client 只需向主 Worker 节点数据写入成功即可返回,主 Worker 节点会异步向备份 Worker 节点复制 shuffle 数据;
- 可以根据 Worker 的负载,智能地分配 shuffle 分区,使得集群负载更加平衡。这对于异构节点部署 Worker 至关重要,比如某些节点采用 SSD,而某些节点采用 HDD;又比如由于新旧服务器混用,硬件老化而导致的不同 Worker 磁盘性能差异等;
- 支持层级存储,且对于分布式存储,Client 可以直接从存储系统读取数据,降低对 Worker 的压力。
总结一下 Spark on Kubernetes 在网易的演进过程:
早期方案:
仅支持通过 JDBC、BeeLine 提交 SQL 任务
Kyuubi 集群部署在 K8s 集群外的物理机节点上
Spark 作业以 Client 模式运行
在每台节点上以 DaemonSet 形式启动 External Shuffle Service
Spark 作业、ESS 等均以 Host Network 模式运行
每台节点上安装 SSD,并以 hostPath 模式挂载到 Pod 里
改进后的方案:
支持通过 JDBC、BeeLine、RESTful 提交 SQL/Jar 任务
Kyuubi 以 StatefulSet 的形式部署在 K8s 集群中
Kyuubi 使用 MySQL 存储状态数据
Spark 作业以 Cluster 模式运行
将 Celeborn 以 StatefulSet 的形式部署在 K8s 集群中,作为 Remote Shuffle Service
在公有云上,使用竞价实例 Pod 为 Spark 作业提供计算资源
特别地,竞价实例具有极低的成本优势,对降本增效起到了至关重要的作用。
四、网易在其他方面对 Spark on Kubernetes 的改进
如前所述,Spark on Kubernetes 原生并没有像 YARN 一样提供日志聚合服务,这对 Spark 作业分析和故障排查来说是很不友好的。
我们通过以下方式,使得 Spark on Kubernetes 能够获得与 Spark on YARN 类似的日志跳转体验:
使用 Grafana Loki 搭建一个建议的日志存储和查询服务,并配置 Grafana 作为日志展示服务
使用 log4j-loki-appender,将 Spark Application 日志写入到远程日志服务中
在 SPARK-40887 中,我们通过改进 Spark,支持以配置的方式在 Spark UI 中添加外部日志服务的跳转链接;其中链接可以是模版,比如可以在跳转链接中使用 POD_NAME 等变量作为查询条件
Pod 分配策略是另一个有趣的话题,比如在以下两个场景中,我们需要使用不同的分配策略。
在私有部署场景里,对于一些网络、IO 较重的任务,如果大量的 Executor 调度到同一个节点,很有可能会形成热点,造成硬件上的性能瓶颈。对于这种情况,我们可以使用反亲和性,使得 Executor Pod 在分配时,能够尽量地被打散在所有节点上。
在离线混布场景中,我们更希望使用 bin-packing 的 Pod 分配策略,让 Executor Pod 尽可能地集中在少量的节点上,这样在出让节点时,可以快速腾空机器,降低对 Spark 任务的影响。
来自网易以及 Kyuubi 社区的开发者还对 Spark on K8s 做出了很多重要的改进,限于时间和篇幅无法一一详述,各位可以根据 JIRA 工单到社区中搜索相应的 Pull Request。在此我们也非常感谢 Spark 社区的开发者在代码审查等方面所提供的帮助!
现场问答
Q:我们已经在 K8s 上部署了 Kyuubi 用于往 K8s 上提交 Spark 任务,下一步我们打算使用 Kyuubi 也往 YARN 提交 Spark 和 Flink 任务。请问在这种场景中,是推荐为每种负载单独部署一套 Kyuubi 服务,还是使用同一套 Kyuubi 服务呢?
A:首先要明确的一点是,单个 Kyuubi 实例或集群是支持管理多个 Spark 版本、使用多种计算引擎、往不同资源管理系统提交任务的。如前文所述,Kyuubi Server 是一个轻量稳定的服务,在实际场景中,我们也建议尽可能地使用单一的 Kyuubi Server 集群管理多个引擎,以实现 Unified Gateway。我们建议仅在用户有极高 SLA 要求,或者出于安全、合规性的考虑,必须物理隔离的场景中,独立部署 Kyuubi 集群。
Q:分享中提到,Celeborn 支持滚动升级,我实测下来,Celeborn Worker 节点重启后,会造成任务失败,可能是哪里的问题?
A:Celeborn 在设计上是支持滚动重启的。Master 节点是一个 Raft 集群,天然支持滚动升级。在 Celeborn 0.3.0 中,Celeborn 加入了对 Worker 节点的优雅停机特性,用于支持滚动升级。具体来说,当向 Worker 节点发送优雅停机信号时:正在写入的 client 会收在返回信息中感知到 Worker 正在停机的状态,暂停当前分区的写入,并通过 revive 机制请新的 slot 用以写入后续的数据;所有写入请求断开后,Worker 自身会将内存中的数据和状态 flush 到磁盘上,然后退出;正在读取的 client,会自动切换到 replica 节点读取数据;Worker 重启后,从磁盘恢复状态并可以继续提供数据读取服务。综上所述,要支持 Worker 的滚动升级,必须满足:版本 0.3.0 或以上;启用数据副本;启用优雅停机。