Spark 运行模式
Apache Spark 是用于大规模数据处理的统一分析引擎,它提供了 Java、Scala、Python 和 R 语言的高级 API,以及一个支持通用的执行图计算的优化引擎。
Spark Core 是 Spark 的核心模块,负责任务调度、内存管理等功能。Spark Core 的实现依赖于 RDD(Resilient Distributed Datasets,弹性分布式数据集)的程序抽象概念。
在 Spark Core 的基础上,Spark 提供了一系列面向不同应用需求的组件,包括使用 SQL 进行结构化数据处理的 Spark SQL、用于实时流处理的 Spark Streaming、用于机器学习的 MLlib 以及用于图处理的 GraphX。
Spark 本身并没有提供分布式文件系统,因而 Spark 的数据存储主要依赖于 HDFS,也可以使用 HBase 和 S3 等作为存储层。
Spark 有多种运行模式:
- 1.可以运行在一台机器上,称为 Local(本地)运行模式。
- 2.可以使用 Spark 自带的资源调度系统,称为 Standalone 模式。
- 3.可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,称为 Spark On Yarn、Spark On Mesos、Spark On K8S。Driver 是 Spark 中的主控进程,负责执行应用程序的 main() 方法,创建 SparkContext 对象,负责与 Spark 集群进行交互,提交 Spark 作业,并将作业转化为 Task(一个作业由多个 Task 任务组成),然后在各个 Executor 进程间对 Task 进行调度和监控。
根据应用程序提交方式的不同,Driver 在集群中的位置也有所不同,应用程序提交方式主要有两种:Client 和 Cluster,默认是 Client,可以在向 Spark 集群提交应用程序时使用 --deploy-mode
参数指定提交方式。
Local 模式
Local 模式的部署方式比较简单,只需下载安装包并解压就可以使用了。具体可以参考上一章的 Spark 系列教程(1)Word Count 的介绍,本文就不再赘述了。
在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。
❯ bin/spark-shell 21/10/07 11:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://chengzw:4040 Spark context available as 'sc' (master = local[*], app id = local-1633578611004). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.1.2 /_/ Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_302) Type in expressions to have them evaluated. Type :help for more information. scala> val range = spark.range(100) range: org.apache.spark.sql.Dataset[Long] = [id: bigint] scala> range.collect() res0: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
Standalone 模式
在 Spark Standalone 模式中,资源调度是由 Spark 自己实现的。 Spark Standalone 模式是 Master-Slaves 架构的集群模式,和大部分的 Master-Slaves 结构的集群一样,存在着 Master 单点故障的问题。对于单点故障的问题,Spark 提供了两种方案:
- 基于文件系统的单点恢复(Single-Node Recovery with Local File System),将 Application 和 Worker 的注册信息写入文件中,当 Master 宕机时,可以重新启动 Master 进程恢复工作。该方式只适用于开发或测试环境。
- 基于 Zookeeper 的 Standby Masters(Standby Masters with ZooKeeper)。ZooKeeper 提供了一个 Leader Election 机制,利用这个机制可以保证虽然集群存在多个 Master,但是只有一个是 Active 的,其他的都是 Standby。当 Active 的 Master 出现故障时,另外的一个 Standby Master 会被选举出来,对于恢复期间正在运行的应用程序,由于 Application 在运行前已经向 Master 申请了资源,运行时 Driver 负责与 Executor 进行通信,管理整个 Application,因此 Master 的故障对 Application 的运行不会造成影响,但是会影响新的 Application 的提交。接下来将介绍 Spark Standalone 模式基于 Zookeeper 的 HA 高可用部署。
前提条件
Host 设置
编辑 /etc/hosts 文件:
192.168.1.117 hadoop1 192.168.1.118 hadoop2 192.168.1.119 hadoop3
拷贝到其他两台机器上:
scp /etc/hosts root@hadoop2:/etc/hosts scp /etc/hosts root@hadoop3:/etc/hosts
配置免密登录
为了方便后续拷贝文件以及执行脚本,配置 SSH 免密登录。在 hadoop1 上生成 RSA 非对称密钥对:
[root@hadoop1 hadoop]# ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: SHA256:wkMiPVpbBtjoZwBIpyvvluYtfQM9hQeHtgBFVfrwL1I root@hadoop1 The key's randomart image is: +---[RSA 2048]----+ |+o.O+..o. | |. *.o.+.. | | o..=o*= | | o+oOo+o | |...o..+oE | |.. . o+ . | | .o .... . | | .=.. o. . | | +o... . | +----[SHA256]-----+
将公钥拷贝到集群中的其他机器:
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop1 [root@hadoop1 hadoop]# ssh-copy-id root@hadoop2 [root@hadoop1 hadoop]# ssh-copy-id root@hadoop3
安装 Java
进入 [Oracle 官网] (https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html) 下载并解压 JDK 安装包。设置环境变量,编辑 vim /etc/profile:
export JAVA_HOME=/software/jdk export PATH=$PATH:$JAVA_HOME/bin
Zookeeper 集群部署
下载并解压安装包
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz tar -xzvf apache-zookeeper-3.5.8-bin.tar.gz mv apache-zookeeper-3.5.8 /software/zk
编辑配置文件
编辑 zk/conf/zoo.cfg 文件:
#用于配置 Zookeeper 中最小时间单位的长度,单位是毫秒 tickTime=2000 #该参数用于配置 Leader 服务器等待 Follower 启动,并完成数据同步的时间 #乘上 tickTime 得到具体时间:10 * 2000 = 20000 毫秒 initLimit=10 #Leader 与 Follower 心跳检测的超时时间。 #乘上 tickTime 得到具体时间:5 * 2000 = 10000 毫秒 syncLimit=5 #数据存放目录 dataDir=/software/zk/data #客户端连接端口 clientPort=2181 #Zookeeper集群成员地址 #2888端口用于集群间通信,leader会监听此端口 #3888端口用于leader选举 server.1=hadoop1:2888:3888 server.2=hadoop2:2888:3888 server.3=hadoop3:2888:3888
同步修改后的配置文件到集群的其他节点:
scp -r zk root@hadoop2:/software/ scp -r zk root@hadoop3:/software/
标识 Server ID
#在 hadoop1 节点上执行 echo 1 > /root/zookeeper-cluster/zk1/myid #在 hadoop2 节点上执行 echo 2 > /root/zookeeper-cluster/zk2/myid #在 hadoop3 节点上执行 echo 3 > /root/zookeeper-cluster/zk3/myid
启动 Zookeeper 集群
分别在 3 台节点上执行以下命令启动 Zookeeper:
zk/bin/zkServer.sh start
查看 Zookeeper 集群状态
分别在 3 台节点上查看 Zookeeper 状态,可以看到此时 hadoop2 节点为 Zookeeper 的 Master 节点。
hadoop1 节点:
[root@hadoop1 software]# zk/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /software/zk/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower
hadoop2 节点:
[root@hadoop2 software]# zk/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /software/zk/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leader
hadoop3 节点:
[root@hadoop3 software]# zk/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /software/zk/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower
Spark Standalone 模式 HA 集群部署
下载并解压安装包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz tar -xzvf spark-3.1.2-bin-hadoop2.7.tgz mv spark-3.1.2-bin-hadoop2.7 /software/spark
修改配置文件
编辑 spark/conf/spark-env.sh 文件,由于 Spark HA 使用 Zookeeper 来协调主从,因此需要指定 Zookeeper 的地址和 Spark 在 Zookeeper 中使用的目录。
export JAVA_HOME=/software/jdk export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"
编辑 spark/conf/slaves 文件:
hadoop1 hadoop2 hadoop3
同步修改后的配置文件到集群的其他节点:
scp -r spark root@hadoop2:/software/ scp -r spark root@hadoop3:/software/
启动 Spark 集群
在 hadoop1 节点上启动 Spark 集群,执行 start-all.sh 脚本会在 hadoop1 节点上启动 Master 进程,并且在 spark/conf/slaves 文件中配置的所有节点上启动 Worker 进程。
[root@hadoop1 software]# spark/sbin/start-all.sh starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out hadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.out hadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.out hadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out
登录 hadoop2 节点,启动第二个 Master(Standby Master)。
[root@hadoop2 software]# spark/sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop2.out
查看各节点的进程
在各节点执行 jps 命令查看启动的 Java 进程。可以看到 Spark 的 Master 进程分别在 hadoop1 和 hadoop2 节点上运行,Worker 进程在所有节点上运行。QuorumPeerMain 是 Zookeeper 的进程。
hadoop1 节点:
[root@hadoop1 software]# jps 18528 Worker 18427 Master 23468 QuorumPeerMain 18940 Jps
hadoop2 节点:
[root@hadoop2 software]# jps 27824 Worker 29954 Jps 23751 QuorumPeerMain 28135 Master
hadoop3 节点:
[root@hadoop3 software]# jps 11696 Worker 12939 QuorumPeerMain 13021 Jps
Zookeeper 查看节点注册状态
可以看到此时 3 个 Spark 节点都注册到 Zookeeper 上了,并且此时 192.168.1.117 hadoop1 这个节点是 Master。
[zk: localhost:2181(CONNECTED) 33] ls /spark/master_status [worker_worker-20210821150002-192.168.1.117-42360, worker_worker-20210821150002-192.168.1.118-39584, worker_worker-20210821150002-192.168.1.119-42991] [zk: localhost:2181(CONNECTED) 34] get /spark/master_status 192.168.1.117
Spark HA 测试
浏览器访问 http://hadoop1:8081 进入 Spark WebUI 界面,此时 hadoop1 节点 Master 的状态为 ALIVE。
[root@hadoop1 software]# spark/bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 21/08/21 18:00:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://hadoop1:4040 Spark context available as 'sc' (master = spark://hadoop1:7077,hadoop2:7077, app id = app-20210821180100-0000). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.1.2 /_/ Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181) Type in expressions to have them evaluated. Type :help for more information.
使用第三方资源调度系统
Spark 可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,目前 Mesos 使用的已经比较少了,本文将介绍 Spark 使用 Yarn 和 Kubernetes 作为调度系统的应用。
Spark On Yarn
Spark On Yarn 模式的搭建比较简单,仅需要在 Yarn 集群的一个节点上安装 Spark 客户端即可,该节点可以作为提交 Spark 应用程序到 Yarn 集群的客户端。Spark 本身的 Master 节点和 Worker 节点不需要启动。前提是我们需要准备好 Yarn 集群,关于 Yarn 集群的安装可以参考 Hadoop 分布式集群安装。
使用此模式需要修改 Spark 的配置文件 conf/spark-env.sh,添加 Hadoop 相关属性,指定 Hadoop 配置文件所在的目录:
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
修改完毕后,即可运行 Spark 应用程序,例如运行 Spark 自带的求圆周率的例子,并以 Spark On Yarn 的 Cluster 模式运行。
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ /software/spark/examples/jars/spark-examples_2.12-3.1.2.jar
在 Yarn 的 ResourceManager 对应的 WebUI 界面中可以查看应用程序执行的详细信息。
Spark On K8S
目前基于 Kubernetes 的 Spark 的应用主要采用两种方式运行:
- 1.基于 Kubernetes 的 Operator 的 [Spark on K8S Operator] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator),是 Google Cloud Platform 为了支持 Spark 而开发的一种 Operator。个人比较推荐使用 Spark on K8S Operator 的方式提交作业。
- 2.Spark 原生支持的 [Spark On K8S] (http://spark.apache.org/docs/3.0.0/running-on-kubernetes.html),是 Spark 社区为支持 Kubernetes 这种资源管理框架而引入的 Kubernetes Client 的实现。
Spark Operator 定义了两个 CRD(Custom Resource Definitions,自定义资源定义)对象,SparkApplication 和 ScheduledSparkApplication。 这些 CRD 是 Spark 作业的抽象,使得在 Kubernetes 集群中可以使用 YAML 来定义这些作业。另外还提供了 [sparkctl] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/sparkctl/README.md) 命令行工具方便我们操控 SparkApplication 和 ScheduledSparkApplication CRD 资源对象。
使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator 容器,用于将 SparkApplication 和 ScheduledSparkApplication 这些 CRD 资源对象转换为 Kubernetes 原生的资源对象,例如 Pod,Service 等等。
Spark On K8S Operator(推荐)
使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator。
部署 Spark Operator
添加 Spark On K8S Operator Helm 仓库并下载 Helm 资源文件。
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator helm pull spark-operator/spark-operator --untar
修改 values.yaml 文件中有以下两个地方需要修改:
- 1.repository 镜像仓库地址,由于国内拉取 Spark 相关镜像速度较慢,我已经提前下载好镜像并且上传至阿里云镜像仓库中了,大家可以直接使用我的镜像。
- 2.sparkJobNamespace:设置 Spark 提交作业的命名空间,会为该命名空间创建一个 ServiceAccount 并赋予相应的权限,ServiceAccount 的名字为
helm 项目名-spark
。 - 使用
helm install
命令安装 Spark Operator,spark-job 命名空间是之后提交 Spark 作业时使用的。
kubectl create namespace spark-job helm install my-spark spark-operator \ --namespace spark-operator --create-namespace
确认 Spark Operator Pod 已经正常运行。
❯ kubectl get pod -n spark-operator NAME READY STATUS RESTARTS AGE my-spark-spark-operator-674cbc9d9c-8x22x 1/1 Running 0 5m24s
查看在 spark-job 命名空间创建的 ServiceAccount。
❯ kubectl get serviceaccounts -n spark-job NAME SECRETS AGE ..... my-spark-spark 1 2m33s
运行 SparkApplications
SparkApplications 资源对象中通常使用的 Cluster 模式来提交作业。在 YAML 文件中指定运行应由程序的 jar 包以及 main() 方法所在的类。
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-pi namespace: spark-job spec: type: Scala mode: cluster image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1" imagePullPolicy: Always mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar" sparkVersion: "3.1.1" restartPolicy: type: Never driver: cores: 1 coreLimit: "1200m" memory: "512m" labels: version: 3.1.1 serviceAccount: my-spark-spark executor: cores: 1 instances: 1 memory: "512m" labels: version: 3.1.1
等待一会,查看 SparkApplications 状态,COMPLETED 表示已经执行完成该作业。
❯ kubectl get sparkapplications -n spark-job spark-piNAME STATUS ATTEMPTS START FINISH AGEspark-pi COMPLETED 1 2021-10-04T13:13:27Z 2021-10-04T13:13:48Z 8h
查看在 spark-job 命名空间创建的 Pod 的日志,可以看到本次作业执行的详情。
kubectl logs -n spark-job spark-pi-driver spark-kubernetes-driver
Spark On K8S
使用 Spark On K8S 模式提交作业时我们通常可以使用 spark-submit 或者 spark-shell 两种命令行工具,其中 spark-submit 支持 Cluster 和 Client 两种提交方式,而 spark-shell 只支持 Client 一种提交方式。
Spark-Submit
Cluster 模式
使用 spark-submit 的 Cluster 模式提交作业时,由于我们的 Kubernetes 集群的 API Server 是使用自签名的证书进行 HTTPS 加密的,因此需要使用 spark.kubernetes.authenticate.submission.caCertFile
参数指定 Kubernetes 集群的 CA 证书,让 Spark 客户端信任自签名证书。注意这里的 ServiceAccount 需要自行创建并且赋予以下权限,如果你是按照顺序完成实验的,那么在前面 Spark On K8S Operator 中已经创建了该 ServiceAccount,可以跳过这一步。
❯ kubectl get rolebindings -n spark-job spark -o yaml apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: annotations: meta.helm.sh/release-name: my-spark meta.helm.sh/release-namespace: spark-operator creationTimestamp: "2021-09-29T16:10:51Z" labels: app.kubernetes.io/instance: my-spark app.kubernetes.io/managed-by: Helm app.kubernetes.io/name: spark-operator app.kubernetes.io/version: v1beta2-1.2.3-3.1.1 helm.sh/chart: spark-operator-1.1.6 name: spark namespace: spark-job resourceVersion: "204712527" selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/rolebindings/spark uid: 225970e8-472d-4ea5-acb5-08630852f76c roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: spark-role subjects: - kind: ServiceAccount name: my-spark-spark namespace: spark-job ❯ kubectl get role -n spark-job spark-role -o yaml apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: annotations: meta.helm.sh/release-name: my-spark meta.helm.sh/release-namespace: spark-operator creationTimestamp: "2021-09-29T16:10:51Z" labels: app.kubernetes.io/instance: my-spark app.kubernetes.io/managed-by: Helm app.kubernetes.io/name: spark-operator app.kubernetes.io/version: v1beta2-1.2.3-3.1.1 helm.sh/chart: spark-operator-1.1.6 name: spark-role namespace: spark-job resourceVersion: "204712525" selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/roles/spark-role uid: 436afb3f-a304-4756-b64a-978d5836c3a2 rules: - apiGroups: - "" resources: - pods verbs: - '*' - apiGroups: - "" resources: - services verbs: - '*' - apiGroups: - "" resources: - configmaps verbs: - '*'
执行 spark-submit 命令向 Kubernetes 集群提交作业。
bin/spark-submit \ --master k8s://https://11.16.0.153:6443 \ --conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \ --deploy-mode cluster \ --name spark-pi-submit \ --class org.apache.spark.examples.SparkPi \ --conf spark.kubernetes.namespace=spark-job \ --conf spark.executor.instances=2 \ --conf spark.kubernetes.container.image.pullPolicy=Always \ --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \ local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
关于证书不受信任这里也有个讨巧的方式,就是使用 kubectl proxy
命令将 API Server 的 HTTPS 转化为 HTTP。
❯ kubectl proxy Starting to serve on 127.0.0.1:8001
然后通过 http://localhost:8001 和 API Server 进行交互,此时就无需指定 CA 证书了。
bin/spark-submit \ --master k8s://http://localhost:8001 \ --deploy-mode cluster \ --name spark-pi-submit \ --class org.apache.spark.examples.SparkPi \ --conf spark.kubernetes.namespace=spark-job \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \ --conf spark.executor.instances=2 \ --conf spark.kubernetes.container.image.pullPolicy=Always \ --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \ local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
通过查看 Kubernetes 为本次 Spark 作业创建的 Pod 的日志,可以看到运行结果。
❯ kubectl logs -n spark-job spark-pi-submit-fc7b507c4be84351-driver ...... Pi is roughly 3.140075700378502 ......
Client 模式
Client 模式无需指定 CA 证书,但是需要使用 spark.driver.host
和 spark.driver.port
指定提交作业的 Spark 客户端所在机器的地址,端口号默认就是 7078。
bin/spark-submit \ --master k8s://https://11.16.0.153:6443 \ --deploy-mode client \ --name spark-pi-submit-client \ --class org.apache.spark.examples.SparkPi \ --conf spark.kubernetes.namespace=spark-job \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \ --conf spark.executor.instances=2 \ --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \ --conf spark.driver.host=11.8.38.43 \ --conf spark.driver.port=7078 \ /home/chengzw/spark-3.1.2-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.1.2.jar
使用 Client 模式提交作业在终端就可以直接看到输出结果了。
Spark-Shell
spark-shell 只支持 Client 方式,使用以下命令连接 Kubernetes API Server 并打开 spark-shell 交互式界面。
bin/spark-shell \ --master k8s://https://11.16.0.153:6443 \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \ --deploy-mode client \ --name spark-shell \ --conf spark.kubernetes.namespace=spark-job \ --conf spark.executor.instances=2 \ --conf spark.kubernetes.container.image.pullPolicy=Always \ --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \ --conf spark.driver.host=11.8.38.43 \ --conf spark.driver.port=7078
在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。
21/10/05 10:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://11.8.38.43:4040 Spark context available as 'sc' (master = k8s://https://11.16.0.153:6443, app id = spark-application-1633401878962). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.1.2 /_/ Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101) Type in expressions to have them evaluated. Type :help for more information. scala> val range = spark.range(100) range: org.apache.spark.sql.Dataset[Long] = [id: bigint] scala> range.collect() res1: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
Spark History Server(可选)
部署 Spark History Server
在运行 Spark Application 的时候,Spark 会提供一个 WebUI 列出应用程序的运行时信息,但是一旦该应用程序执行完毕后,将无法查看应用程序执行的历史记录。Spark History Server 就是为了处理这种情况而诞生的,我们可以将 Spark 作业的日志提交到一个统一的地方,例如 HDFS,然后 Spark History Server 就可以通过读取 HDFS 目录中的文件来重新渲染生成 WebUI 界面来展示应用程序执行的历史信息。
使用以下资源文件部署一个 Spark History Server,并且通过 NodePort Service 的方式将服务暴露到集群外部,集群外部可以通过节点地址:NodePort 来访问 Spark History Server。前提是我们需要准备好 HDFS 集群,关于 HDFS 集群的安装可以参考 Hadoop 分布式集群安装。
apiVersion: apps/v1 kind: Deployment metadata: name: spark-history-server namespace: spark-job spec: selector: matchLabels: run: spark-history-server replicas: 1 template: metadata: labels: run: spark-history-server spec: containers: - image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1" name: spark-history-server args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.history.HistoryServer"] ports: - containerPort: 18080 name: http env: - name: SPARK_HISTORY_OPTS value: "-Dspark.history.fs.logDirectory=hdfs://11.8.36.125:8020/spark-k8s" --- apiVersion: v1 kind: Service metadata: name: spark-history-server namespace: spark-job spec: ports: - name: http nodePort: 30080 port: 18080 protocol: TCP targetPort: 18080 selector: run: spark-history-server type: NodePort
Spark On K8S Operator 使用 History Server
设置 spark.eventLog.enabled
参数值为 true 启用记录 Spark 日志,spark.eventLog.dir
指定输出日志的目录为 HDFS 目录。
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-pi namespace: spark-job spec: type: Scala mode: cluster image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1" imagePullPolicy: Always mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar" sparkVersion: "3.1.1" sparkConf: "spark.eventLog.enabled": "true" "spark.eventLog.dir": "hdfs://11.8.36.125:8020/spark-k8s" restartPolicy: type: Never driver: cores: 1 coreLimit: "1200m" memory: "512m" labels: version: 3.1.1 serviceAccount: my-spark-spark executor: cores: 1 instances: 1 memory: "512m" labels: version: 3.1.1
在集群外通过节点地址:30080 访问 Spark History Server,可以在应用程序执行完毕后看到详细的信息。
bin/spark-submit \ --master k8s://https://11.16.0.153:6443 \ --conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \ --deploy-mode cluster \ --name spark-pi-submit \ --class org.apache.spark.examples.SparkPi \ --conf spark.kubernetes.namespace=spark-job \ --conf spark.executor.instances=2 \ --conf spark.kubernetes.container.image.pullPolicy=Always \ --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \ --conf spark.eventLog.enabled=true \ --conf spark.eventLog.dir=hdfs://11.8.36.125:8020/spark-k8s \ local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
构建镜像
上面的例子都是使用 Spark 官方自带的程序来提交作业,如果我们想要自定义一个程序可以使用 Spark 官网提供的脚本来构建镜像。
程序代码
该项目使用 Maven 来管理依赖。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.12</version> <scope>compile</scope> </dependency> </dependencies>
程序代码如下,使用 Java 编写了一个 Word Count 程序。
package com.chengzw.wordcount; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * @description WordCount 示例 * @author chengzw * @since 2021/7/25 8:39 下午 */ public class MyJavaWordCount { public static void main(String[] args) { Logger.getLogger("org").setLevel(Level.OFF); System.setProperty("spark.ui.showConsoleProgress","false"); //创建配置对象 //本地运行 //SparkConf conf = new SparkConf().setAppName("MyJavaWordCount").setMaster("local"); //在Spark上运行 SparkConf conf = new SparkConf().setAppName("MyJavaWordCount"); //创建SparkContext对象 JavaSparkContext sc = new JavaSparkContext(conf); //读取hdfs数据 //在本地运行 //JavaRDD<String> rdd1= sc.textFile("/tmp/data.txt"); //在Spark上运行 JavaRDD<String> rdd1= sc.textFile(args[0]); //分词 JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String input) throws Exception { return Arrays.asList(input.split(" ")).iterator(); } }); //单词计数 word,1 JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); //相同Key的值累加 JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } }); //触发计算 List<Tuple2<String, Integer>> result = rdd4.collect(); //打印 for(Tuple2<String,Integer> r : result){ System.out.println(r._1 + "\t" + r._2); } //释放资源 sc.stop(); } }
打包
点击 mvn package 将程序打成 jar 包。
构建并上传镜像
将 jar 包放到 Spark 安装包的 examples/jars 目录中,进入 Spark 目录然后执行以下命令构建镜像。
bin/docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t my-spark:1.0.0 build
查看构建好的镜像。
❯ docker images | grep spark registry.cn-hangzhou.aliyuncs.com/public-namespace/spark v1.0.0 372341ae930d 12 minutes ago 529MB
上传镜像。
./docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t v1.0.0 push
使用自己构建的镜像执行 Word Count 程序。
bin/spark-submit \ --master k8s://https://11.16.0.153:6443 \ --conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \ --deploy-mode cluster \ --name spark-pi-submit \ --class com.chengzw.wordcount.MyJavaWordCount \ --conf spark.kubernetes.namespace=spark-job \ --conf spark.executor.instances=2 \ --conf spark.kubernetes.container.image.pullPolicy=Always \ --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v1.0.0 \ local:///opt/spark/examples/jars/spark-lab-1.0-SNAPSHOT.jar /etc/security/limits.conf
查看执行结果:
kubectl logs -n spark-job spark-pi-submit-37945f7c4f24e729-driver #返回结果 ...... rss 2 space 2 priority 4 4 1 this 1 "soft" 1 max 14 cpu 1 memlock 1 apply 1 ......
参考资料
- Spark 大数据分析实战
- [Spark Standalone Mode] (https://spark.apache.org/docs/0.9.0/spark-standalone.html#standby-masters-with-zookeeper)
- [Spark:Master High Availability(HA)高可用配置的2种实现] (https://www.cnblogs.com/byrhuangqiang/p/3937654.html)
- [【k8s系列1】spark on k8s 与 spark on k8s operator的对比] (https://segmentfault.com/a/1190000037503030)
- [Running Spark on Kubernetes] (http://spark.apache.org/docs/3.0.0/running-on-kubernetes.html)
- [spark-on-k8s-operator User Guide] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md)
- [spark-on-k8s-operator Quick Start Guide] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md)
- [Kubernetes Operator for Apache Spark Design] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/design.md#architecture)
- [Setting up, Managing & Monitoring Spark on Kubernetes] (https://www.datamechanics.co/blog-post/setting-up-managing-monitoring-spark-on-kubernetes)
- [Spark UI History server on Kubernetes?] (https://stackoverflow.com/questions/51798927/spark-ui-history-server-on-kubernetes)
- [How To Manage And Monitor Apache Spark On Kubernetes - Part 1: Spark-Submit VS Kubernetes Operator] (https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-introduction-spark-submit-kubernetes-operator)
- [How To Manage And Monitor Apache Spark On Kubernetes - Part 2: Deep Dive On Kubernetes Operator For Spark] (https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-deep-dive-kubernetes-operator-for-spark)
- [Spark on K8S (Kubernetes Native)] (https://www.cnblogs.com/moonlight-lin/p/13296909.html)
- [Getting Started with Spark on Kubernetes] (http://blog.brainlounge.de/memoryleaks/getting-started-with-spark-on-kubernetes/)
- [运行支持kubernetes原生调度的Spark程序] (https://jimmysong.io/kubernetes-handbook/usecases/running-spark-with-kubernetes-native-scheduler.html)
- [深入浅出理解 Spark 部署与工作原理] (https://zhuanlan.zhihu.com/p/99398378)
- [Spark client mode 和 cluster mode 的区别] (https://baixin.ink/2018/04/28/spark-mode/)