Apache Kyuubi & Celeborn (Incubating) 助力 Spark 拥抱云原生

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 网易数帆软件工程师潘成,在 ASF CommunityOverCode Asia 2023(北京)的分享。

本文整理自网易数帆软件工程师潘成,在 ASF CommunityOverCode Asia 2023(北京)的分享。本篇内容主要为:

  1. Spark 云原生的收益和挑战
  2. 如何基于 Apache Kyuubi 构建统一 Spark 任务网关
  3. 如何基于 Apache Celeborn (Incubating) 构建 Shuffle Service
  4. 网易在其他方面对 Spark on Kubernetes 方案的改进

前言

在过去数年中,网易在大数据云原生领域进行了长足的探索。本文围绕如何基于 Apache Kyuubi & Celeborn 等开源技术,构建企业级 Spark on Kubernetes 云原生离线计算平台展开,包含技术选型、架构设计、经验教训、缺陷改进、降本增效等内容,深入剖析网易在该领域的探索成果。

一、Spark on Kubernetes 的收益与挑战

Apache Spark 作为如今大数据离线计算领域事实标准,被广泛应用于网易内部以及数据中台等商业化产品中。目前,Spark on YARN 是业界最主流、也是最成熟的使用方式,但随着以 Kubernetes 为代表的云原生技术的流行,Spark on K8s 正在受到越来越多用户的青睐。

网易自 2018 年起,就有对 Spark on K8s 技术开始探索。相比于 Spark on YARN,Spark on K8s 在很多方面有显著的优势;同时作为较新的技术,在某些特性方面还不如 Spark on YARN 完备。下面我们就其中较为关键的一些部分做些简单的对比:

1

在隔离性上,得益于容器技术的加持,Spark on K8s 相较于 YARN 进程级的作业隔离机制具备显著的优势。一方面,容器化大幅简化了 Spark 作业依赖管理,尤其是 Python 依赖、动态链接库做到了很好的隔离;同时,容器化配合 cgroup 机制,可以更加严格和精细化地对作业资源做出限制。

在集群层面的资源管理策略中,往往应用程序并不会 100% 使用自身所申请到资源,超售是常见的提升集群资源利用率的策略。以 CPU 为例,YARN 可以在集群层面设置 vCore 与物理 Core 的比例,即 CPU 的超售比,但 K8s 却可以支持作业级别的 CPU 超售比;集群中的任务对 CPU 的利用率不尽相同,这对很多以数据传输为代表的、重 IO 的作业来说,设置较高的 CPU 超售比可以大幅节约 CPU 资源。

动态资源分配是 Spark 作业提升资源利用率的一个非常重要的特性,在 Spark on YARN 中,External Shuffle Service 作为插件常驻在每个 NodeManager 进程中,用于提供当前节点 shuffle 数据的读取服务,因此 Executor 可以随时退出而无需考虑下游 Reduce Task 如何读取 shuffle 数据的问题;但在 K8s 上,没有与之对应的组件,取而代之的是众多可选的方案,会在后文详谈。

Spark on YARN 提供了很多辅助功能,比如 YARN 天然有 Application 的概念、提供日志聚合服务、支持 Spark Live UI 代理等,这些在 Spark on K8s 中并不是开箱即用的。

在部署方案上,Spark on YARN 提供标准化的方案;但 Spark on K8s 却有各种各样的玩儿法,如前文提及的 shuffle 方案,又比如以任务提交为例,有以 Spark Operator 为代表的 yaml 提交方案,和 Spark 原生的 spark-submit 方案,层出不穷。

同时,我们面临着一个非常普遍的挑战:用户的 Kubernetes 基础设施不尽相同,我们如何在做到支持各种基础设施的前提下,尽可能利用各自的特点,发挥最大收益呢?

2

例如,以公有云和私有化部署为代表的基础设施存在显著差异:

本着降本增效的原则,就成本优势上来讲:

  • 公有云上除了支持按时间买断制以外,还提供按量计费的模式,根据资源类型的不同,一般在整体使用率低于总时间 30%~60% 的情况下,按量计费可以大幅度降低成本;公有云竞价实例在价格上有显著的竞争力,但却充满着不确定性和随时会被抢占的风险;
  • 私有部署的硬件天然不如公有云那么灵活,基本上都是要提前采购的。为了最大化提升资源利用率,往往会从在离线混布入手。通常情况下,在线业务波峰在白天,离线任务波峰在夜间,通过混合部署、资源出让来提升集群资源利用率,降低综合成本。

存储是影响 Spark on K8s 中一个需要重点关注的对象。公有云上,一般可以提供各规格的网络磁盘,满足各种远程挂载需求;而私有部署场景往往会受很大限制,多以绑定物理节点的本地磁盘为主,相应的,同等 IO 性能和容量上,本地硬件的成本往往更低。

其他硬件,如网卡、CPU、内存也类似,公有云一般可以灵活地提供各种配比;私有部署多局限于特定规格型号,但往往单价更低。

二、如何基于 Apache Kyuubi 构建统一 Spark 任务网关

在网易内部,所有 Spark 服务都是托管的。我们使用 Apache Kyuubi 作为统一的 Spark 任务提交网关,Kyuubi 提供多种用户接口,也支持多种类型的 Spark 任务。典型的使用场景包括:用户可以使用 JDBC/BeeLine 以及各种 BI 工具,连接进行交互的数据分析;使用 RESTful API 向 Kyuubi 提交 SQL/Python/Scala/Jar 批作业。

3

Kyuubi 作为一个企业级的大数据网关,在多租户和安全性也做了充分的支持。举例来说,Kyuubi 在 Kerberos 支持上做了深度适配,比如简化了 JDBC 客户端使用 Kerberos 认证的方式;支持 Kerberos/LDAP 同时启用,客户端可以选择任何一种方式认证;支持 Hadoop 用户代理机制,在保证安全的同时,省去海量用户 keytab 的管理;支持 Hadoop Delegation Token 续期,满足 Spark 常驻任务的认证需求等。

Kyuubi 不乏一些来自金融券商和欧美企业的社区用户和贡献者,他们对安全有着更为极致的要求,比如服务组件间的内部通信也需要加密,支持权限控制和 SQL 审计等, Kyuubi 对此类场景亦可胜任。

除了作为网关的主体功能外,Kyuubi还提供一系列可以独立使用Spark插件,可以提供如小文件治理、Z-Order、SQL 血缘提取、限制查询数据扫描量等企业级功能。

4

在架构上,Kyuubi 两个重要组件分别为 Server 和 Engine。其中 Server 是一个轻量级常驻服务,而 Engine 是按需启停的 Spark Application。客户端接入后,Kyuubi Server 会根据路由规则寻找合适的 Engine,若没有命中,则会调用 spark-submit 拉起一个新的 Spark Application,当 Spark Application 闲置一段时间后,会主动退出释放资源。Kyuubi 选择了使用 Spark 原生的方式对接 Kubernetes,而非 Spark Operator 模式,这种选择使得 Kyuubi 能够更加一致地使用 spark-submit 命令对接不同的资源管理系统,如 YARN、Mesos、Standalone。这种设计,对于已有大数据基础设施的企业而言,更适合用于平滑过渡到云原生大数据架构。

对于交互会话,Kyuubi 创造性地提出引擎共享级别的概念,内置的四个选项:CONNECTION、USER、GROUP、SERVER 隔离性依次降低,共享程度依次增强,搭配使用可以满足多种负载场景。例如 CONNECTION 共享级别为每个会话拉起一个单独的 Spark Application,有效地保证了会话之间的隔离性,通常用于大型 ETL 调度任务;USER 共享级别使得同一个用户复用同一个 Spark Application,既加快了新会话的启动速度,又可以保证当 Spark Application 意外退出(如因大结果集查询导致 OOM)时不影响他人。对于批任务,仅支持类似 CONNECTION 共享级别的语义,此时 Kyuubi 表现得更像一个任务调度系统。

Kyuubi Server 被设计为一个轻量级网关,相比之下,Kyuubi Engine 的稳定性略低,很有可能因查询返回大结果集而 OOM,Server 和 Engine 进程分离的设计很好的保证了 Server 的稳定性,同时 Engine 共享级别的设计很好的控制了 Engine 崩溃的影响范围。

5

在具体内部实现上,Kyuubi 的交互式会话中有两个要的概念:Session 和 Operation,这两个概念分别与 JDBC 中的 Connection 和 Statement,以及 Spark 中的 SparkSession 和 QueryExecution 一一对应。

如上是一段典型的通过 JDBC 驱动连接 Kyuubi 执行 Spark SQL 的代码,可以清晰地看到客户端 JDBC 调用与 Spark 引擎侧之间的对应关系。特别地,在拉取结果集时,结果集会以微批的形式从 Spark Driver 经过 Kyuubi Server 返回给客户端,这有效地降低 Kyuubi Server 的内存压力,保障了 Kyuubi Server 的稳定性;在最新的 1.7 版本中,Kyuubi 支持了基于 Apache Arrow 的结果集序列化方式,大幅提升了大结果集场景的传输效率。

三、如何基于 Apache Celeborn(Incubating) 构建 Shuffle Service

正如前文所述,shuffle 方案对 Spark on K8s 动态资源分配起着至关重要的作用,Executor 只有在确保下游读取 shuffle 数据不受影响的前提下,才可以被释放。近年来,社区以及各大公司在 shuffle 方案上玩法层出不穷,这里简要介绍其中较为主流的几种方式。

6

首先是 Shuffle Tracking 配合 decommission,这是 Spark 内置的一个轻量级方案,无需维护额外服务。Shuffle Tracking 即通过追踪 RDD 的血缘,分析哪些 shuffle 数据还有可能被下游消费,进而阻止这些 Executor 退出以保证提供 shuffle 数据读取服务。显然,延迟退出会造成一定的资源浪费,并且不能处理 Executor OOM 的情况,decommission 作为一个补充手段,当 Executor 闲置一段时间后,退出前将 shuffle 数据搬运到还未超时的 Executor 上。该方案经过我们的实践,在数据量较大、集群负载较高时表现并不理想。

另一个自然的想法是,在 K8s 上复刻 YARN 上的方案,即通过 DaemonSet 在每个 K8s Node 启动一个 External Shuffle Service 进程提供 Shuffle 读取服务。该方案在性能和可靠性上与 Spark on YARN 完全一致,在网易早期有一定规模的应用。但同时也有一定的弊端,比如不适用于竞价实例(只能使用 Pod,不允许在 Node 上启动 DaemonSet),需要 Host Network 等。

最近一两年,Remote Shuffle Service 方案被越来越多用户青睐。随着网卡技术的发展,网络读写与磁盘读写的效率差异逐渐缩小,理论上,将 Spark 原生 shuffle 对本地磁盘的读写转换成网络读写在性能上不一定会造成劣势。最重要的是,将 Shuffle Service 作为一个单独的服务部署,按需伸缩更符合云原生的理念;同时我们也可以有更多的操作空间,比如通过节点之间平衡存储空间提升利用率,通过层级存储在保证性能的同时,降低对高性能磁盘容量的需求等。不过我们应该要明确的是,Spark 项目自诞生以来,shuffle 就作为一项核心特性被不断改进;Remote Shuffle Service 作为相对新潮的技术,在稳定性、正确性、性能上都会有很长的路要走。

7

在具体的Remote Shuffle Service技术选型中,网易选择了基于 Apache Celeborn (Incubating) 构建内部的 Shuffle Service 平台。

其中我们关注的核特性包括:

  • Celeborn 服务端包含 Master 和 Worker 两种角色。其中 Master 起协调作用,是一个 Raft 集群,具备很好的容灾能力,且支持滚动升级;Worker 作为数据节点提供 shuffle 数据的读写服务,可以根据负载随时扩缩容;并且组件之间的心跳、健康检查机制可以快速发现和剔除故障 Worker 节点;
  • Celeborn 提供异步高效的副本机制,开启后对性能影响很小,Client 只需向主 Worker 节点数据写入成功即可返回,主 Worker 节点会异步向备份 Worker 节点复制 shuffle 数据;
  • 可以根据 Worker 的负载,智能地分配 shuffle 分区,使得集群负载更加平衡。这对于异构节点部署 Worker 至关重要,比如某些节点采用 SSD,而某些节点采用 HDD;又比如由于新旧服务器混用,硬件老化而导致的不同 Worker 磁盘性能差异等;
  • 支持层级存储,且对于分布式存储,Client 可以直接从存储系统读取数据,降低对 Worker 的压力。

8

总结一下 Spark on Kubernetes 在网易的演进过程:

早期方案:

  1. 仅支持通过 JDBC、BeeLine 提交 SQL 任务

  2. Kyuubi 集群部署在 K8s 集群外的物理机节点上

  3. Spark 作业以 Client 模式运行

  4. 在每台节点上以 DaemonSet 形式启动 External Shuffle Service

  5. Spark 作业、ESS 等均以 Host Network 模式运行

  6. 每台节点上安装 SSD,并以 hostPath 模式挂载到 Pod 里

改进后的方案:

  1. 支持通过 JDBC、BeeLine、RESTful 提交 SQL/Jar 任务

  2. Kyuubi 以 StatefulSet 的形式部署在 K8s 集群中

  3. Kyuubi 使用 MySQL 存储状态数据

  4. Spark 作业以 Cluster 模式运行

  5. 将 Celeborn 以 StatefulSet 的形式部署在 K8s 集群中,作为 Remote Shuffle Service

  6. 在公有云上,使用竞价实例 Pod 为 Spark 作业提供计算资源

特别地,竞价实例具有极低的成本优势,对降本增效起到了至关重要的作用。

四、网易在其他方面对 Spark on Kubernetes 的改进

如前所述,Spark on Kubernetes 原生并没有像 YARN 一样提供日志聚合服务,这对 Spark 作业分析和故障排查来说是很不友好的。

9

我们通过以下方式,使得 Spark on Kubernetes 能够获得与 Spark on YARN 类似的日志跳转体验:

  1. 使用 Grafana Loki 搭建一个建议的日志存储和查询服务,并配置 Grafana 作为日志展示服务

  2. 使用 log4j-loki-appender,将 Spark Application 日志写入到远程日志服务中

  3. 在 SPARK-40887 中,我们通过改进 Spark,支持以配置的方式在 Spark UI 中添加外部日志服务的跳转链接;其中链接可以是模版,比如可以在跳转链接中使用 POD_NAME 等变量作为查询条件

Pod 分配策略是另一个有趣的话题,比如在以下两个场景中,我们需要使用不同的分配策略。

10

在私有部署场景里,对于一些网络、IO 较重的任务,如果大量的 Executor 调度到同一个节点,很有可能会形成热点,造成硬件上的性能瓶颈。对于这种情况,我们可以使用反亲和性,使得 Executor Pod 在分配时,能够尽量地被打散在所有节点上。

在离线混布场景中,我们更希望使用 bin-packing 的 Pod 分配策略,让 Executor Pod 尽可能地集中在少量的节点上,这样在出让节点时,可以快速腾空机器,降低对 Spark 任务的影响。

11

来自网易以及 Kyuubi 社区的开发者还对 Spark on K8s 做出了很多重要的改进,限于时间和篇幅无法一一详述,各位可以根据 JIRA 工单到社区中搜索相应的 Pull Request。在此我们也非常感谢 Spark 社区的开发者在代码审查等方面所提供的帮助!

现场问答

Q:我们已经在 K8s 上部署了 Kyuubi 用于往 K8s 上提交 Spark 任务,下一步我们打算使用 Kyuubi 也往 YARN 提交 Spark 和 Flink 任务。请问在这种场景中,是推荐为每种负载单独部署一套 Kyuubi 服务,还是使用同一套 Kyuubi 服务呢?

A:首先要明确的一点是,单个 Kyuubi 实例或集群是支持管理多个 Spark 版本、使用多种计算引擎、往不同资源管理系统提交任务的。如前文所述,Kyuubi Server 是一个轻量稳定的服务,在实际场景中,我们也建议尽可能地使用单一的 Kyuubi Server 集群管理多个引擎,以实现 Unified Gateway。我们建议仅在用户有极高 SLA 要求,或者出于安全、合规性的考虑,必须物理隔离的场景中,独立部署 Kyuubi 集群。

Q:分享中提到,Celeborn 支持滚动升级,我实测下来,Celeborn Worker 节点重启后,会造成任务失败,可能是哪里的问题?

A:Celeborn 在设计上是支持滚动重启的。Master 节点是一个 Raft 集群,天然支持滚动升级。在 Celeborn 0.3.0 中,Celeborn 加入了对 Worker 节点的优雅停机特性,用于支持滚动升级。具体来说,当向 Worker 节点发送优雅停机信号时:正在写入的 client 会收在返回信息中感知到 Worker 正在停机的状态,暂停当前分区的写入,并通过 revive 机制请新的 slot 用以写入后续的数据;所有写入请求断开后,Worker 自身会将内存中的数据和状态 flush 到磁盘上,然后退出;正在读取的 client,会自动切换到 replica 节点读取数据;Worker 重启后,从磁盘恢复状态并可以继续提供数据读取服务。综上所述,要支持 Worker 的滚动升级,必须满足:版本 0.3.0 或以上;启用数据副本;启用优雅停机。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
前端开发 Java API
Apache Seata(incubating) 首个版本重磅发布!
2.1.0 是 Seata 进入 Apache 基金会的第一个 Release Version。此次发布将 io.seata 包名更改为 org.apache.seata。除了按原有的 Roadmap 技术演进外,2.1.0 进行了大量兼容性工作,实现了 API、数据和协议的兼容。用户无需修改原有的 API 和配置,即可实现到 Apache 版本的平滑升级。
105 13
Apache Seata(incubating) 首个版本重磅发布!
|
1月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
37 1
|
4月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
150 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
3月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
66 0
|
3月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
205 0
|
4月前
|
分布式计算 Apache Spark
|
5月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
139 6
|
5月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
1月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
608 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
69 3

推荐镜像

更多
下一篇
无影云桌面