Apache Spark是一个大规模数据处理的统一分析引擎,是当今最流行的大数据框架之一。Spark常用于进行分布式、大数据处理,相比MapReduce,Spark提供了更高级的编程接口和更好的性能。同时Spark提供了对流式计算、机器学习的支持。
Kubernetes(k8s)是一个开源的容器集群管理系统。在Docker技术的基础上,为容器化的应用提供部署运行、资源调度、服务发现和动态伸缩等一系列完整功能,提高了大规模容器集群管理的便捷性。
Apache Spark 2.3.0版本开始支持使用kubernetes作为资源管理的原生调度,在kubernetes上运行spark工作负载有以下优势:
- 通过把Spark应用和依赖项打包成容器,可享受容器的各种优点,还可以给容器镜像打上标签进行版本控制。
- 重用Kubernetes生态的各种组件,比如监控、日志等。把Spark工作负载部署在已有的的Kubernetes基础设施中,能够快速开始工作,大大减少运维成本。
- 支持多租户,可利用Kubernetes的Namespace和ResourceQuota做用户粒度的资源调度,利用NodeSelector机制保证Spark工作负载得到专用的资源,实现资源隔离,避免受其他工作负载影响。
- 统一资源层调度与管理,可以实现与其他应用的混部。比如把Spark和数据管理应用运行在同一个集群中,可以使用单个编排机制构建端到端生命周期的解决方案,并能复制到其他区域部署,甚至是在私有化环境部署。
- 存储与计算分离。
Spark on ACK的优化方案
阿里云容器服务Kubernetes版(ACK)是全球首批通过Kubernetes一致性认证的服务平台,致力于提供高性能可伸缩的容器应用管理服务,支持企业级Kubernetes容器化应用的生命周期管理,简化集群的搭建和扩容等运维工作,并整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳的Kubernetes容器化应用运行环境。
为了帮助用户在ACK上更好更快的运行Spark工作负载,阿里云ACK团队在Spark内核、分布式缓存加速、Spark Operator、调度等方面都做了很多性能和易用性的优化工作。
下面介绍一下Spark on ACK的主要优化工作。
Scheduler
传统的Kubernetes调度器最小调度单元为Pod,相比Yarn调度缺少二级调度能力。ACK基于Scheduler Framework V2支持以下调度能力:
- FIFO Scheduling:支持通过设置任务优先级,提供任务级别的排队能力。
- GangScheduling:支持PodGroup的概念,以完整的PodGroup为单位进行调度,当集群资源无法满足完整的Job运行时,则不进行调度,避免资源的无效等待。
- Binpack Scheduling:当集群使用弹性伸缩时,可以使用binpack调度进行资源压缩,减少调度碎片。
- Capacity Scheduling: 通过Namespace将集群划分为不同的租户,租户有自己的资源配额,在集群资源紧张时,不同租户可以根据其申请的配额分配资源,在集群资源空闲时,不同租户之间进行借用和归还,提升了集群的资源使用率,具体场景与Yarn的Capacity Scheduling一致。
关于任务调度的更多细节可以查看进击的 Kubernetes 调度系统和capacity-scheduling。
Spark Operator & Kernel
为了让Spark能更好的运行在Kubernetes中,我们对Spark Operator和Kernel也做了一些优化工作。
- 相比社区版Spark Operator实现中的阻塞串行调度,ACK版本支持非阻塞并行调度,调度性能可达350 Pods/s,能够快速把Spark作业调度到节点上。
- 增强Spark Kernel对Kubernetes原生能力的支持,如Tolerations、Labels、Node Name。
- Spark Kernel支持dynamic allocation,资源利用率可提升30%。
- 支持设置Spark Job使用自定义调度器。
Serverless架构
通过将Spark的Driver和Executor动态调度到ECI上,实现无服务器Spark作业执行,存储和计算分离,可在几十秒内交付大量算力。并能够支持Spot实例、多种实例组合,节省成本。
我们只需要在创建Driver或Executor时通过指定nodeSelector、tolerations就可以把Spark作业运行在ECI上。
executor:
cores: 8
instances: 20
memory: "20480m"
annotations:
k8s.aliyun.com/eci-use-specs: "8-32Gi"
nodeSelector:
type: virtual-kubelet
tolerations:
- key: virtual-kubelet.io/provider
operator: Exists
资源混部、分时复用与突发抢占
在ACK上基于虚拟节点,通过合理的离在线任务混部调度,可以合理的利用计算资源,帮助用户降低成本。
数据本地化
Alluxio是一个面向基于云的数据分析和人工智能的开源的数据编排技术。 它为数据驱动型应用和存储系统构建了桥梁, 将数据从存储层移动到距离数据驱动型应用更近的位置从而能够更容易被访问。 这还使得应用程序能够通过一个公共接口连接到许多存储系统。 Alluxio内存至上的层次化架构使得数据的访问速度能比现有方案快几个数量级。
计算与存储分离的架构,会导致计算端读写数据的延迟增大。我们在ACK上基于Alluxio的分布式缓存来实现Spark的数据本地性,Alluxio会把OSS上的数据缓存在Alluxio worker中,当Spark任务需要数据时可以直接以内存速度从本地Alluxio worker中读取,避免通过网络进行数据传输,这样就能大大提高性能。
LVM数据卷
Spark Shuffle或通过Alluxio做数据本地化时,需要配置数据盘。ECS部分大数据机型,每个节点可能会有多个数据盘设备,这些设备需要先格式化再挂载才能被使用,当集群节点多时,人肉操作是十分繁琐且耗时。ACK支持在本地盘上通过VolumeGroup进行磁盘虚拟化,并通过LVM数据卷切分磁盘给应用使用,这样可以简化操作,并能根据需要对数据盘大小灵活划分。
Spark配套组件
ACK还提供了监控、日志、历史任务记录等能力,方便用户定位排查问题。
- 监控:
-
- 集成prometheus,支持通过grafana查看spark任务状态与资源。
- 支持通过spark-ui查看spark执行进度与状态。
- 日志:支持通过annotation动态采集spark日志到SLS。
- 历史:
-
- 支持一键部署spark history server。
- 支持历史数据离线存储OSS。
在ACK上提交Spark作业
相比传统的spark submit,ACK上提供了spark operator这样更简便的方式来提交作业。用户只需要像在kubernetes上部署普通应用一样,在yaml文件中指定spark镜像地址、任务main函数代码文件以及spark driver、executor的规格数量即可。
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
type: Scala
mode: cluster
image: "registry.cn-hangzhou.aliyuncs.com/acs/spark:ack-2.4.5-f757ab6"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
sparkVersion: "2.4.5"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 2.4.5
serviceAccount: spark
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 2.4.5
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
性能评测
为了测试Spark在ACK上的性能,我们采用TPC-DS做了评测。TPC-DS测试基准是TPC组织推出的一个决策支持系统测试基准,TPC-DS采用星型、雪花型等多维数据模式。它包含7张事实表,17张纬度表平均每张表含有18列。其工作负载包含99个SQL查询,覆盖SQL99和2003的核心部分以及OLAP。这个测试集包含对大数据集的统计、报表生成、联机查询、数据挖掘等复杂应用,测试用的数据和值是有倾斜的,与真实数据一致。可以说TPC-DS是与真实场景非常接近的一个测试集,也是难度较大的一个测试集。
我们分别测试在1TB数据时,Spark + OSS和Spark + Alluxio在执行99个Spark SQL时的性能,从下面的数据里可以直观的看到其性能表现。
测试环境
硬件配置
- ACK集群说明
集群配置 | 参数 |
---|---|
集群类型 | ACK标准专有集群 |
集群版本 | 1.16.9-aliyun.1 |
ECS实例 | ECS规格:ecs.d1ne.6xlarge操作系统:CentOS 7.7 64位CPU: 24核内存:96G数据盘:5500G HDD x 12 |
Worker Node个数 | 20 |
软件配置
软件版本:
spark version: 2.4.5
alluxio version: 2.4.0
Spark配置说明:
spark配置 | 参数 |
---|---|
spark.driver.cores | 5 |
spark.driver.memory (MB) | 20480 |
spark.executor.cores | 7 |
spark.executor.memory (MB) | 20480 |
spark.executor.instances | 20 |
测试数据
基于TPC-DS生成的1TB测试数据。
测试结果
以下所有图中纵轴表示query执行时间,单位秒。
首次测试时Alluxio需要从OSS加载数据,性能较低,可以等首次测试结束再跑一次测试任务,查看测试结果,本文中的数据为Alluxio Warm下的测试结果。
其中99个Spark SQL总耗时
从图上的对比看到,经过数据本地化后,在ACK上运行Spark工作负载时,整体性能有了较大的提升。
测试分析
为了进一步分析结果,我们把TPC-DS的query分为三大类,可以看到这三类常见的query类型,性能都有提升。
- Reporting Queries
- Interactive queries
- Deep analytics queries
对于一些有代表性的Shuffle、CPU、IO密集型的query,如q64、q70、q82,性能也都有较大提升。
总结
通过上面的介绍可以看到,针对Spark这种工作场景,ACK做了调度、弹性、性能、易用性等诸多优化,以及配套的监控、日志等工具,方便用户排查定位问题。要在ACK上运行Spark工作负载是很简单的,用户只需要提交一个YAML文件即可。如果要重现上面的测试结果或者跑其他的Spark工作负载,可以参考ack-spark-benchmark ,里面包含详细的测试环境搭建、测试步骤和源码。
本文介绍的主要是开源的Apache Spark和分布式缓存框架Alluxio在ACK上的优化和实践,阿里云EMR(E-MapReduce)团队和ACK团队一起,针对Spark内核、Shuffle机制、分布式缓存等在ACK环境上做了更多优化,可以为用户提供更好的性能,详细介绍可查看EMR Spark on ACK。
欢迎加入钉钉群沟通交流。