Spark on Kubernetes原生支持浅析

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 概述 Kubernetes自推出以来,以其完善的集群配额、均衡、故障恢复能力,成为开源容器管理平台中的佼佼者。从设计思路上,Spark以开放Cluster Manager为理念,Kubernetes则以多语言、容器调度为卖点,二者的结合是顺理成章的。

作者简介:无咎,阿里云EMR技术专家。目前专注于大数据开发平台建设。

概述

Kubernetes自推出以来,以其完善的集群配额、均衡、故障恢复能力,成为开源容器管理平台中的佼佼者。从设计思路上,Spark以开放Cluster Manager为理念,Kubernetes则以多语言、容器调度为卖点,二者的结合是顺理成章的。

使用Kubernetes调度Spark的好处:

  • 集中式资源调度:接入k8s的Spark应用与其他k8s应用共享资源池。
  • 多租户:可利用Kubernetes的namespace和ResourceQuota做用户粒度的资源调度。
  • 容器生态:以监控为例,开发者可利用Prometheus检测Spark应用的性能。

Kubernetes社区早期尝试将Spark以standalone模式运行在Kubernetes上。SPARK-18278则提出了一个子项目apache-spark-on-k8s,旨在支持Spark driver/executor的Pod化。该项目于2018年正式合并到主版本,在Spark 2.3发布。

k8scluster

(图片来自databricks.com)

Spark 2.4的Kubernetes支持包含以下特性:

  • 支持pyspark应用
  • 支持R语言应用
  • 支持client mode:允许用户运行spark-shell或Notebook,可以是集群之外的单独机器,或者是k8s集群的pod。client mode要求用户保证driver与k8s集群内executor之间的连通性。如果driver运行在ks8集群的pod内,推荐使用headless service以允许executor通过FQDN连接到driver;如果driver运行在k8s集群之外,用户需要确保集群内的executor Pod可访问到driver。

kubernetes operator则是k8s上运行Spark另一种的途径,用户可以通过Helm chart安装。运行时转换为CRD对象执行。

以下我们以spark-2.4.3-bin-hadoop2.7为例,使用minikube实验Spark on Kubernetes。(minikube是Kubernetes官方工具,可运行单节点k8s服务,方便在本机上测试。)

在minikube中测试Spark on k8s

最低要求:

  • Spark 2.3+版本,client mode需要2.4+。
  • kubernetes 1.6以上版本,可调用kubectl。
  • Kubernetes启动DNS。

cluster mode

1. 启动minikube

运行Spark应用需要足够的系统资源,执行以下命令并重新启动minikube

minikube config set memory 4096
minikube config set cpus 4

或者显式指定资源启动minikube

minikube start --cpus 4 --memory 4096

2. 构建Spark镜像

为简便起见,直接在本地k8s环境中构建镜像,以便k8s运行时直接取用。注意:构建时必须位于Spark安装根目录下。

./bin/docker-image-tool.sh -r <repo> -t my-tag build

或者

eval $(minikube docker-env)
docker build -t spark:2.4.3 -f ./kubernetes/dockerfiles/spark/Dockerfile  .

也可以将构建镜像推送到k8s集群可达的镜像仓库中。

3. 执行examples

先为Spark应用配置必要的serviceAccount。

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=admin --serviceaccount=default:spark --namespace=default

master地址可以通过kubectl获取

kubectl cluster-info

本机执行spark-submit

./bin/spark-submit \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:2.4.3 \
local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar

启动控制台 minikube dashboard,在可以在界面上看到对应的pods

_1

我们可以通过kubectl查看其运行日志。程序运行结束之后,driver Pod仍然保留。

kubectl -n=default logs -f spark-pi-4e673e6fc64432bb8bda3f5632ce9596-driver

client mode

以下演示在k8s集群的独立Pod中启动Spark应用和spark-shell的方法。

1. 准备独立Pod

用任意linux镜像运行。

kubectl run transfer1 -it -n default --image=centos:centos7 --serviceaccount='spark' -- /bin/sh

如果可以进入命令行,说明Pod已经启动,查看一下Pod列表。

kubectl get pods
NAME                                               READY   STATUS             RESTARTS   AGE
transfer1-584c678c8c-fh8s6                         1/1     Running            0          12m

进入transfer1 Pod,安装OpenJDK和Spark。

2. 暴露service

我们任意指定一个端口暴露,后续client mode将通过去DNS去查找Driver Pod的位置,这也是Spark on k8s要求DNS的原因。

kubectl expose deployment transfer1 --port=19987 --type=ClusterIP --cluster-ip=None
kubectl get services
NAME                                                   TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
kubernetes                                             ClusterIP   10.96.0.1    <none>        443/TCP             41h
transfer1                                              ClusterIP   None         <none>        19987/TCP           13m

3. 在Pod中启动Spark应用

./bin/spark-submit \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode client \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:2.4.3 \
--conf spark.driver.host=transfer1.default.svc.cluster.local  \
--conf spark.driver.port=19987  \
/root/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar

注意运行入口指向是本地的Pod的jar包。

启动spark-shell:

./bin/spark-shell  \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode client  \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark  \
--conf spark.kubernetes.namespace=default  \
--conf spark.driver.pod.name=transfer1driverpod  \
--conf spark.executor.instances=2  \
--conf spark.kubernetes.container.image=spark:2.4.3 \
--conf spark.driver.host=transfer1.default.svc.cluster.local \
--conf spark.driver.port=19987

稍等片刻spark-shell即启动完成。

Pod创建原理

镜像

先了解Spark镜像做了什么。打开Spark发行目录下的kubernetes/dockerfiles/spark/Dockerfile,可以发现,Dockerfile只做了jars、bin、sbin等目录的文件拷贝,指向/opt/entrypoint.sh作为镜像入口。

entrypoint.sh支持传入"driver"或者"executor"参数(对于python和R支持,则是dirver-py和dirver-r),这样,默认容器即支持创建driver或者executor容器。无需用户显式提供spark.kubernetes.driver.container.image参数。

组件

  • Driver: 以headless service存在。
  • Executor: 数量可以由spark-submit参数指定,也支持动态资源配置。
  • k8s API Server: Spark通过API Server创建与删除Pod。

spark-submit之后

再看spark-submit,SparkSubmit类会匹配master参数,如果以"k8s"开头,则会装载对应的submit类,对于Spark 2.4,这个submit类是org.apache.spark.deploy.k8s.submit.KubernetesClientApplication

KubernetesClientApplication中创建driver Pod:

// resolvedDriverPod是基于KubernetesDriverBuilder,读取conf创建的Pod定义
kubernetesClient
        .pods()
        .withName(resolvedDriverPod.getMetadata.getName)
        .watch(watcher)) { _ =>
      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
      try {
        val otherKubernetesResources =
          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
      } catch {
           // 如果创建失败则移除driver Pod
        case NonFatal(e) =>
          kubernetesClient.pods().delete(createdDriverPod)
          throw e
      }

另一方面,与Spark on YARN的初始化过程类似,SparkContext装载ExternalClusterManager的子类KubernetesClusterManager,并初始化ExecutorPodsAllocator和k8s的DefaultKubernetesClient,初始化KubernetesClusterSchedulerBackend

KubernetesClusterSchedulerBackend继承了CoarseGrainedSchedulerBackend, 请求创建executor时,其重载的doRequestTotalExecutors方法,使用PodAllocator中的内部线程创建executor Pod,后者会自动增减executor数量;killExecutors时调用doKillExecutors来销毁Pods。

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
    podAllocator.setTotalExpectedExecutors(requestedTotal)
    true
  }

...  

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
    kubernetesClient
      .pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId())
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
      .delete()
    // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
  }

总结

Kubernetes为Spark开发者提供了新的调度手段,Spark 2.4支持cluster/client mode运行,client mode可以运行在单独Pod或者k8s集群之外。

Spark on Kubernetes项目正在快速发展之中,目前支持的功能仍然是实验性质的,未来其内部实现可能会发生变化,对于Spark 3.0版本中的k8s新特性,我们不妨拭目以待。

[扩展阅读]

  1. Spark on k8s早期工作:https://issues.apache.org/jira/browse/SPARK-18278
  2. Lyft在SparkAI Summit上分享k8s实践: https://www.youtube.com/watch?v=PPtrY_XxYBE
  3. 关于Spark Operator的设计,参考:https://yq.aliyun.com/articles/695315,或者阅读官方文档:https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
5月前
|
存储 边缘计算 Kubernetes
边缘计算问题之YurtControllerManager 接管原生 Kubernetes 的调度如何解决
边缘计算问题之YurtControllerManager 接管原生 Kubernetes 的调度如何解决
44 1
|
4月前
|
Kubernetes Cloud Native Java
探索未来编程新纪元:Quarkus带你秒建高性能Kubernetes原生Java应用,云原生时代的技术狂欢!
Quarkus 是专为 Kubernetes 设计的全栈云原生 Java 框架,凭借其轻量级、快速启动及高效执行特性,在 Java 社区脱颖而出。通过编译时优化与原生镜像支持,Quarkus 提升了应用性能,同时保持了 Java 的熟悉度与灵活性。本文将指导你从创建项目、编写 REST 控制器到构建与部署 Kubernetes 原生镜像的全过程,让你快速上手 Quarkus,体验高效开发与部署的乐趣。
61 0
|
5月前
|
Kubernetes 网络协议 Docker
在K8S中,ip-cer-pod与docker原生端口映射有何区别?
在K8S中,ip-cer-pod与docker原生端口映射有何区别?
|
7月前
|
分布式计算 Kubernetes Spark
大数据之spark on k8s
大数据之spark on k8s
225 2
|
8月前
|
分布式计算 Kubernetes 监控
容器服务Kubernetes版产品使用合集之怎么实现把 spark 跑在k8s
容器服务Kubernetes版,作为阿里云提供的核心服务之一,旨在帮助企业及开发者高效管理和运行Kubernetes集群,实现应用的容器化与微服务化。以下是关于使用这些服务的一些建议和合集,涵盖基本操作、最佳实践、以及一些高级功能的使用方法。
101 1
|
7月前
|
Kubernetes Cloud Native Java
Java一分钟之-Quarkus:Kubernetes原生的Java框架
【6月更文挑战第12天】Quarkus是面向Kubernetes的Java框架,以其超快启动速度和低内存占用著称。核心特性包括AOT编译实现毫秒级启动、优化的运行时模型、与Kubernetes的无缝集成及丰富的扩展库。常见问题涉及Maven依赖管理、热重载机制理解和配置文件的忽视。解决这些问题的关键在于深入学习官方文档、使用Dev UI调试和参与社区交流。通过代码示例展示了如何快速创建REST服务。掌握Quarkus能提升开发效率,适应微服务架构。
92 0
|
8月前
|
分布式计算 Kubernetes Java
spark on k8s native
spark on k8s native
|
分布式计算 Kubernetes Serverless
Hago 的 Spark on ACK 实践
Hago 的 Spark on ACK 实践
|
Kubernetes Cloud Native Java
猫头虎博主赠书一期:《Kubernetes原生微服务开发》
猫头虎博主赠书一期:《Kubernetes原生微服务开发》
95 0
|
分布式计算 资源调度 Hadoop
Spark on Yarn集群模式搭建及测试
Spark on Yarn集群模式搭建及测试
356 0