作者简介:无咎,阿里云EMR技术专家。目前专注于大数据开发平台建设。
概述
Kubernetes自推出以来,以其完善的集群配额、均衡、故障恢复能力,成为开源容器管理平台中的佼佼者。从设计思路上,Spark以开放Cluster Manager为理念,Kubernetes则以多语言、容器调度为卖点,二者的结合是顺理成章的。
使用Kubernetes调度Spark的好处:
- 集中式资源调度:接入k8s的Spark应用与其他k8s应用共享资源池。
- 多租户:可利用Kubernetes的namespace和ResourceQuota做用户粒度的资源调度。
- 容器生态:以监控为例,开发者可利用Prometheus检测Spark应用的性能。
Kubernetes社区早期尝试将Spark以standalone模式运行在Kubernetes上。SPARK-18278则提出了一个子项目apache-spark-on-k8s,旨在支持Spark driver/executor的Pod化。该项目于2018年正式合并到主版本,在Spark 2.3发布。
(图片来自databricks.com)
Spark 2.4的Kubernetes支持包含以下特性:
- 支持pyspark应用
- 支持R语言应用
- 支持client mode:允许用户运行spark-shell或Notebook,可以是集群之外的单独机器,或者是k8s集群的pod。client mode要求用户保证driver与k8s集群内executor之间的连通性。如果driver运行在ks8集群的pod内,推荐使用headless service以允许executor通过FQDN连接到driver;如果driver运行在k8s集群之外,用户需要确保集群内的executor Pod可访问到driver。
kubernetes operator则是k8s上运行Spark另一种的途径,用户可以通过Helm chart安装。运行时转换为CRD对象执行。
以下我们以spark-2.4.3-bin-hadoop2.7为例,使用minikube实验Spark on Kubernetes。(minikube是Kubernetes官方工具,可运行单节点k8s服务,方便在本机上测试。)
在minikube中测试Spark on k8s
最低要求:
- Spark 2.3+版本,client mode需要2.4+。
- kubernetes 1.6以上版本,可调用kubectl。
- Kubernetes启动DNS。
cluster mode
1. 启动minikube
运行Spark应用需要足够的系统资源,执行以下命令并重新启动minikube
minikube config set memory 4096
minikube config set cpus 4
或者显式指定资源启动minikube
minikube start --cpus 4 --memory 4096
2. 构建Spark镜像
为简便起见,直接在本地k8s环境中构建镜像,以便k8s运行时直接取用。注意:构建时必须位于Spark安装根目录下。
./bin/docker-image-tool.sh -r <repo> -t my-tag build
或者
eval $(minikube docker-env)
docker build -t spark:2.4.3 -f ./kubernetes/dockerfiles/spark/Dockerfile .
也可以将构建镜像推送到k8s集群可达的镜像仓库中。
3. 执行examples
先为Spark应用配置必要的serviceAccount。
kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=admin --serviceaccount=default:spark --namespace=default
master地址可以通过kubectl获取
kubectl cluster-info
本机执行spark-submit
./bin/spark-submit \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:2.4.3 \
local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar
启动控制台 minikube dashboard
,在可以在界面上看到对应的pods
我们可以通过kubectl查看其运行日志。程序运行结束之后,driver Pod仍然保留。
kubectl -n=default logs -f spark-pi-4e673e6fc64432bb8bda3f5632ce9596-driver
client mode
以下演示在k8s集群的独立Pod中启动Spark应用和spark-shell的方法。
1. 准备独立Pod
用任意linux镜像运行。
kubectl run transfer1 -it -n default --image=centos:centos7 --serviceaccount='spark' -- /bin/sh
如果可以进入命令行,说明Pod已经启动,查看一下Pod列表。
kubectl get pods
NAME READY STATUS RESTARTS AGE
transfer1-584c678c8c-fh8s6 1/1 Running 0 12m
进入transfer1 Pod,安装OpenJDK和Spark。
2. 暴露service
我们任意指定一个端口暴露,后续client mode将通过去DNS去查找Driver Pod的位置,这也是Spark on k8s要求DNS的原因。
kubectl expose deployment transfer1 --port=19987 --type=ClusterIP --cluster-ip=None
kubectl get services
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 41h
transfer1 ClusterIP None <none> 19987/TCP 13m
3. 在Pod中启动Spark应用
./bin/spark-submit \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode client \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:2.4.3 \
--conf spark.driver.host=transfer1.default.svc.cluster.local \
--conf spark.driver.port=19987 \
/root/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar
注意运行入口指向是本地的Pod的jar包。
启动spark-shell:
./bin/spark-shell \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode client \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=default \
--conf spark.driver.pod.name=transfer1driverpod \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:2.4.3 \
--conf spark.driver.host=transfer1.default.svc.cluster.local \
--conf spark.driver.port=19987
稍等片刻spark-shell即启动完成。
Pod创建原理
镜像
先了解Spark镜像做了什么。打开Spark发行目录下的kubernetes/dockerfiles/spark/Dockerfile,可以发现,Dockerfile只做了jars、bin、sbin等目录的文件拷贝,指向/opt/entrypoint.sh
作为镜像入口。
entrypoint.sh支持传入"driver"或者"executor"参数(对于python和R支持,则是dirver-py和dirver-r),这样,默认容器即支持创建driver或者executor容器。无需用户显式提供spark.kubernetes.driver.container.image
参数。
组件
- Driver: 以headless service存在。
- Executor: 数量可以由spark-submit参数指定,也支持动态资源配置。
- k8s API Server: Spark通过API Server创建与删除Pod。
spark-submit之后
再看spark-submit
,SparkSubmit类会匹配master参数,如果以"k8s"开头,则会装载对应的submit类,对于Spark 2.4,这个submit类是org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
。
KubernetesClientApplication
中创建driver Pod:
// resolvedDriverPod是基于KubernetesDriverBuilder,读取conf创建的Pod定义
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val otherKubernetesResources =
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
// 如果创建失败则移除driver Pod
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
另一方面,与Spark on YARN的初始化过程类似,SparkContext装载ExternalClusterManager的子类KubernetesClusterManager
,并初始化ExecutorPodsAllocator
和k8s的DefaultKubernetesClient,初始化KubernetesClusterSchedulerBackend
。
KubernetesClusterSchedulerBackend
继承了CoarseGrainedSchedulerBackend, 请求创建executor时,其重载的doRequestTotalExecutors方法,使用PodAllocator中的内部线程创建executor Pod,后者会自动增减executor数量;killExecutors时调用doKillExecutors来销毁Pods。
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
podAllocator.setTotalExpectedExecutors(requestedTotal)
true
}
...
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
.delete()
// Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}
总结
Kubernetes为Spark开发者提供了新的调度手段,Spark 2.4支持cluster/client mode运行,client mode可以运行在单独Pod或者k8s集群之外。
Spark on Kubernetes项目正在快速发展之中,目前支持的功能仍然是实验性质的,未来其内部实现可能会发生变化,对于Spark 3.0版本中的k8s新特性,我们不妨拭目以待。
[扩展阅读]
- Spark on k8s早期工作:https://issues.apache.org/jira/browse/SPARK-18278
- Lyft在SparkAI Summit上分享k8s实践: https://www.youtube.com/watch?v=PPtrY_XxYBE
- 关于Spark Operator的设计,参考:https://yq.aliyun.com/articles/695315,或者阅读官方文档:https://github.com/GoogleCloudPlatform/spark-on-k8s-operator