Spark 系列教程(2)运行模式介绍

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spark 系列教程(2)运行模式介绍

Spark 运行模式

Apache Spark 是用于大规模数据处理的统一分析引擎,它提供了 Java、Scala、Python 和 R 语言的高级 API,以及一个支持通用的执行图计算的优化引擎。

Spark Core 是 Spark 的核心模块,负责任务调度、内存管理等功能。Spark Core 的实现依赖于 RDD(Resilient Distributed Datasets,弹性分布式数据集)的程序抽象概念。

在 Spark Core 的基础上,Spark 提供了一系列面向不同应用需求的组件,包括使用 SQL 进行结构化数据处理的 Spark SQL、用于实时流处理的 Spark Streaming、用于机器学习的 MLlib 以及用于图处理的 GraphX。

Spark 本身并没有提供分布式文件系统,因而 Spark 的数据存储主要依赖于 HDFS,也可以使用 HBase 和 S3 等作为存储层。

Spark 有多种运行模式:

  • 1.可以运行在一台机器上,称为 Local(本地)运行模式。
  • 2.可以使用 Spark 自带的资源调度系统,称为 Standalone 模式。
  • 3.可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,称为 Spark On Yarn、Spark On Mesos、Spark On K8S。image.pngDriver 是 Spark 中的主控进程,负责执行应用程序的 main() 方法,创建 SparkContext 对象,负责与 Spark 集群进行交互,提交 Spark 作业,并将作业转化为 Task(一个作业由多个 Task 任务组成),然后在各个 Executor 进程间对 Task 进行调度和监控。

根据应用程序提交方式的不同,Driver 在集群中的位置也有所不同,应用程序提交方式主要有两种:Client 和 Cluster,默认是 Client,可以在向 Spark 集群提交应用程序时使用 --deploy-mode 参数指定提交方式。

image.png

Local 模式

Local 模式的部署方式比较简单,只需下载安装包并解压就可以使用了。具体可以参考上一章的 Spark 系列教程(1)Word Count 的介绍,本文就不再赘述了。

在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。

❯ bin/spark-shell
21/10/07 11:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://chengzw:4040
Spark context available as 'sc' (master = local[*], app id = local-1633578611004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala>  range.collect()
res0: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)

Standalone 模式

在 Spark Standalone 模式中,资源调度是由 Spark 自己实现的。 Spark Standalone 模式是 Master-Slaves 架构的集群模式,和大部分的 Master-Slaves 结构的集群一样,存在着 Master 单点故障的问题。对于单点故障的问题,Spark 提供了两种方案:

  • 基于文件系统的单点恢复(Single-Node Recovery with Local File System),将 Application 和 Worker 的注册信息写入文件中,当 Master 宕机时,可以重新启动 Master 进程恢复工作。该方式只适用于开发或测试环境。
  • 基于 Zookeeper 的 Standby Masters(Standby Masters with ZooKeeper)。ZooKeeper 提供了一个 Leader Election 机制,利用这个机制可以保证虽然集群存在多个 Master,但是只有一个是 Active 的,其他的都是 Standby。当 Active 的 Master 出现故障时,另外的一个 Standby Master 会被选举出来,对于恢复期间正在运行的应用程序,由于 Application 在运行前已经向 Master 申请了资源,运行时 Driver 负责与 Executor 进行通信,管理整个 Application,因此 Master 的故障对 Application 的运行不会造成影响,但是会影响新的 Application 的提交。接下来将介绍 Spark Standalone 模式基于 Zookeeper 的 HA 高可用部署。image.png

前提条件

Host 设置

编辑 /etc/hosts 文件:

192.168.1.117 hadoop1
192.168.1.118 hadoop2
192.168.1.119 hadoop3

拷贝到其他两台机器上:

scp  /etc/hosts root@hadoop2:/etc/hosts
scp  /etc/hosts root@hadoop3:/etc/hosts
配置免密登录

为了方便后续拷贝文件以及执行脚本,配置 SSH 免密登录。在 hadoop1 上生成 RSA 非对称密钥对:

[root@hadoop1 hadoop]# ssh-keygen 
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa): 
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:wkMiPVpbBtjoZwBIpyvvluYtfQM9hQeHtgBFVfrwL1I root@hadoop1
The key's randomart image is:
+---[RSA 2048]----+
|+o.O+..o.        |
|. *.o.+..        |
| o..=o*=         |
|  o+oOo+o        |
|...o..+oE        |
|..  . o+ .       |
|  .o .... .      |
| .=.. o. .       |
| +o... .         |
+----[SHA256]-----+

将公钥拷贝到集群中的其他机器:

[root@hadoop1 hadoop]# ssh-copy-id root@hadoop1
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop2
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop3
安装 Java

进入 [Oracle 官网] (https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html) 下载并解压 JDK 安装包。设置环境变量,编辑 vim /etc/profile:

export JAVA_HOME=/software/jdk
export PATH=$PATH:$JAVA_HOME/bin

Zookeeper 集群部署

下载并解压安装包
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -xzvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8 /software/zk
编辑配置文件

编辑 zk/conf/zoo.cfg 文件:

#用于配置 Zookeeper 中最小时间单位的长度,单位是毫秒
tickTime=2000
#该参数用于配置 Leader 服务器等待 Follower 启动,并完成数据同步的时间
#乘上 tickTime 得到具体时间:10 * 2000 = 20000 毫秒
initLimit=10
#Leader 与 Follower 心跳检测的超时时间。
#乘上 tickTime 得到具体时间:5 * 2000 = 10000 毫秒
syncLimit=5
#数据存放目录
dataDir=/software/zk/data
#客户端连接端口
clientPort=2181
#Zookeeper集群成员地址
#2888端口用于集群间通信,leader会监听此端口
#3888端口用于leader选举
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888

同步修改后的配置文件到集群的其他节点:

scp -r zk root@hadoop2:/software/
scp -r zk root@hadoop3:/software/
标识 Server ID
#在 hadoop1 节点上执行
echo 1 > /root/zookeeper-cluster/zk1/myid
#在 hadoop2 节点上执行
echo 2 > /root/zookeeper-cluster/zk2/myid
#在 hadoop3 节点上执行
echo 3 > /root/zookeeper-cluster/zk3/myid
启动 Zookeeper 集群

分别在 3 台节点上执行以下命令启动 Zookeeper:

zk/bin/zkServer.sh start
查看 Zookeeper 集群状态

分别在 3 台节点上查看 Zookeeper 状态,可以看到此时 hadoop2 节点为 Zookeeper 的 Master 节点。

hadoop1 节点:

[root@hadoop1 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

hadoop2 节点:

[root@hadoop2 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

hadoop3 节点:

[root@hadoop3 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

Spark Standalone 模式 HA 集群部署

下载并解压安装包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
tar -xzvf spark-3.1.2-bin-hadoop2.7.tgz 
mv spark-3.1.2-bin-hadoop2.7 /software/spark
修改配置文件

编辑 spark/conf/spark-env.sh 文件,由于 Spark HA 使用 Zookeeper 来协调主从,因此需要指定 Zookeeper 的地址和 Spark 在 Zookeeper 中使用的目录。

export JAVA_HOME=/software/jdk
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"

编辑 spark/conf/slaves 文件:

hadoop1
hadoop2
hadoop3

同步修改后的配置文件到集群的其他节点:

scp -r spark root@hadoop2:/software/
scp -r spark root@hadoop3:/software/
启动 Spark 集群

在 hadoop1 节点上启动 Spark 集群,执行 start-all.sh 脚本会在 hadoop1 节点上启动 Master 进程,并且在 spark/conf/slaves 文件中配置的所有节点上启动 Worker 进程。

[root@hadoop1 software]# spark/sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
hadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.out
hadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.out
hadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out

登录 hadoop2 节点,启动第二个 Master(Standby Master)。

[root@hadoop2 software]# spark/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop2.out
查看各节点的进程

在各节点执行 jps 命令查看启动的 Java 进程。可以看到 Spark 的 Master 进程分别在 hadoop1 和 hadoop2 节点上运行,Worker 进程在所有节点上运行。QuorumPeerMain 是 Zookeeper 的进程。

hadoop1 节点:

[root@hadoop1 software]# jps
18528 Worker
18427 Master
23468 QuorumPeerMain
18940 Jps

hadoop2 节点:

[root@hadoop2 software]# jps
27824 Worker
29954 Jps
23751 QuorumPeerMain
28135 Master

hadoop3 节点:

[root@hadoop3 software]# jps
11696 Worker
12939 QuorumPeerMain
13021 Jps
Zookeeper 查看节点注册状态

可以看到此时 3 个 Spark 节点都注册到 Zookeeper 上了,并且此时 192.168.1.117 hadoop1 这个节点是 Master。

[zk: localhost:2181(CONNECTED) 33] ls /spark/master_status
[worker_worker-20210821150002-192.168.1.117-42360, worker_worker-20210821150002-192.168.1.118-39584, worker_worker-20210821150002-192.168.1.119-42991]
[zk: localhost:2181(CONNECTED) 34] get /spark/master_status
192.168.1.117
Spark HA 测试

浏览器访问 http://hadoop1:8081 进入 Spark WebUI 界面,此时 hadoop1 节点 Master 的状态为 ALIVE。image.pngimage.png

[root@hadoop1 software]# spark/bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077
21/08/21 18:00:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop1:4040
Spark context available as 'sc' (master = spark://hadoop1:7077,hadoop2:7077, app id = app-20210821180100-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

使用第三方资源调度系统

Spark 可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,目前 Mesos 使用的已经比较少了,本文将介绍 Spark 使用 Yarn 和 Kubernetes 作为调度系统的应用。

Spark On Yarn

Spark On Yarn 模式的搭建比较简单,仅需要在 Yarn 集群的一个节点上安装 Spark 客户端即可,该节点可以作为提交 Spark 应用程序到 Yarn 集群的客户端。Spark 本身的 Master 节点和 Worker 节点不需要启动。前提是我们需要准备好 Yarn 集群,关于 Yarn 集群的安装可以参考 Hadoop 分布式集群安装

使用此模式需要修改 Spark 的配置文件 conf/spark-env.sh,添加 Hadoop 相关属性,指定 Hadoop 配置文件所在的目录:

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

修改完毕后,即可运行 Spark 应用程序,例如运行 Spark 自带的求圆周率的例子,并以 Spark On Yarn 的 Cluster 模式运行。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar

在 Yarn 的 ResourceManager 对应的 WebUI 界面中可以查看应用程序执行的详细信息。image.pngimage.png

Spark On K8S

目前基于 Kubernetes 的 Spark 的应用主要采用两种方式运行:

  • 1.基于 Kubernetes 的 Operator 的 [Spark on K8S Operator] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator),是 Google Cloud Platform 为了支持 Spark 而开发的一种 Operator。个人比较推荐使用 Spark on K8S Operator 的方式提交作业。
  • 2.Spark 原生支持的 [Spark On K8S] (http://spark.apache.org/docs/3.0.0/running-on-kubernetes.html),是 Spark 社区为支持 Kubernetes 这种资源管理框架而引入的 Kubernetes Client 的实现。

Spark Operator 定义了两个 CRD(Custom Resource Definitions,自定义资源定义)对象,SparkApplication 和 ScheduledSparkApplication。 这些 CRD 是 Spark 作业的抽象,使得在 Kubernetes 集群中可以使用 YAML 来定义这些作业。另外还提供了 [sparkctl] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/sparkctl/README.md) 命令行工具方便我们操控 SparkApplication 和 ScheduledSparkApplication CRD 资源对象。

使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator 容器,用于将 SparkApplication 和 ScheduledSparkApplication 这些 CRD 资源对象转换为 Kubernetes 原生的资源对象,例如 Pod,Service 等等。

image.pngimage.pngimage.png

Spark On K8S Operator(推荐)

使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator。

部署 Spark Operator

添加 Spark On K8S Operator Helm 仓库并下载 Helm 资源文件。

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm pull spark-operator/spark-operator --untar

修改 values.yaml 文件中有以下两个地方需要修改:

  • 1.repository 镜像仓库地址,由于国内拉取 Spark 相关镜像速度较慢,我已经提前下载好镜像并且上传至阿里云镜像仓库中了,大家可以直接使用我的镜像。
  • 2.sparkJobNamespace:设置 Spark 提交作业的命名空间,会为该命名空间创建一个 ServiceAccount 并赋予相应的权限,ServiceAccount 的名字为 helm 项目名-spark
  • image.png
  • 使用 helm install 命令安装 Spark Operator,spark-job 命名空间是之后提交 Spark 作业时使用的。
kubectl create namespace spark-job
helm install my-spark spark-operator \
--namespace spark-operator --create-namespace

确认 Spark Operator Pod 已经正常运行。

❯ kubectl get pod -n spark-operator
NAME                                       READY   STATUS    RESTARTS   AGE
my-spark-spark-operator-674cbc9d9c-8x22x   1/1     Running   0          5m24s

查看在 spark-job 命名空间创建的 ServiceAccount。

❯ kubectl get serviceaccounts -n spark-job
NAME             SECRETS   AGE
.....
my-spark-spark   1         2m33s
运行 SparkApplications

SparkApplications 资源对象中通常使用的 Cluster 模式来提交作业。在 YAML 文件中指定运行应由程序的 jar 包以及 main() 方法所在的类。

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-job
spec:
  type: Scala
  mode: cluster
  image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: my-spark-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1

等待一会,查看 SparkApplications 状态,COMPLETED 表示已经执行完成该作业。

❯ kubectl get sparkapplications -n spark-job  spark-piNAME       STATUS      ATTEMPTS   START                  FINISH                 AGEspark-pi   COMPLETED   1          2021-10-04T13:13:27Z   2021-10-04T13:13:48Z   8h

查看在 spark-job 命名空间创建的 Pod 的日志,可以看到本次作业执行的详情。

kubectl logs -n spark-job spark-pi-driver spark-kubernetes-driver

image.png


Spark On K8S

使用 Spark On K8S 模式提交作业时我们通常可以使用 spark-submit 或者 spark-shell 两种命令行工具,其中 spark-submit 支持 Cluster 和 Client 两种提交方式,而 spark-shell 只支持 Client 一种提交方式。

Spark-Submit

Cluster 模式

使用 spark-submit 的 Cluster 模式提交作业时,由于我们的 Kubernetes 集群的 API Server 是使用自签名的证书进行 HTTPS 加密的,因此需要使用 spark.kubernetes.authenticate.submission.caCertFile 参数指定 Kubernetes 集群的 CA 证书,让 Spark 客户端信任自签名证书。注意这里的 ServiceAccount 需要自行创建并且赋予以下权限,如果你是按照顺序完成实验的,那么在前面 Spark On K8S Operator 中已经创建了该 ServiceAccount,可以跳过这一步。

❯ kubectl get rolebindings -n spark-job spark -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  annotations:
    meta.helm.sh/release-name: my-spark
    meta.helm.sh/release-namespace: spark-operator
  creationTimestamp: "2021-09-29T16:10:51Z"
  labels:
    app.kubernetes.io/instance: my-spark
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: spark-operator
    app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
    helm.sh/chart: spark-operator-1.1.6
  name: spark
  namespace: spark-job
  resourceVersion: "204712527"
  selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/rolebindings/spark
  uid: 225970e8-472d-4ea5-acb5-08630852f76c
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: spark-role
subjects:
- kind: ServiceAccount
  name: my-spark-spark
  namespace: spark-job
❯ kubectl get role -n spark-job  spark-role -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  annotations:
    meta.helm.sh/release-name: my-spark
    meta.helm.sh/release-namespace: spark-operator
  creationTimestamp: "2021-09-29T16:10:51Z"
  labels:
    app.kubernetes.io/instance: my-spark
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: spark-operator
    app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
    helm.sh/chart: spark-operator-1.1.6
  name: spark-role
  namespace: spark-job
  resourceVersion: "204712525"
  selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/roles/spark-role
  uid: 436afb3f-a304-4756-b64a-978d5836c3a2
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - '*'
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - '*'
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - '*'

执行 spark-submit 命令向 Kubernetes 集群提交作业。

bin/spark-submit \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
 local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar

关于证书不受信任这里也有个讨巧的方式,就是使用 kubectl proxy 命令将 API Server 的 HTTPS 转化为 HTTP。

❯ kubectl proxy
Starting to serve on 127.0.0.1:8001

然后通过 http://localhost:8001 和 API Server 进行交互,此时就无需指定 CA 证书了。

bin/spark-submit \
--master  k8s://http://localhost:8001 \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
 local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar

通过查看 Kubernetes 为本次 Spark 作业创建的 Pod 的日志,可以看到运行结果。

❯ kubectl logs -n spark-job  spark-pi-submit-fc7b507c4be84351-driver
......
Pi is roughly 3.140075700378502
......

Client 模式

Client 模式无需指定 CA 证书,但是需要使用 spark.driver.hostspark.driver.port 指定提交作业的 Spark 客户端所在机器的地址,端口号默认就是 7078。

bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--deploy-mode client \
--name spark-pi-submit-client \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078 \
/home/chengzw/spark-3.1.2-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.1.2.jar

使用 Client 模式提交作业在终端就可以直接看到输出结果了。image.png

Spark-Shell

spark-shell 只支持 Client 方式,使用以下命令连接 Kubernetes API Server 并打开 spark-shell 交互式界面。

bin/spark-shell \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode client \
--name spark-shell \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078

在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。

21/10/05 10:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://11.8.38.43:4040
Spark context available as 'sc' (master = k8s://https://11.16.0.153:6443, app id = spark-application-1633401878962).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()
res1: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
Spark History Server(可选)
部署 Spark History Server

在运行 Spark Application 的时候,Spark 会提供一个 WebUI 列出应用程序的运行时信息,但是一旦该应用程序执行完毕后,将无法查看应用程序执行的历史记录。Spark History Server 就是为了处理这种情况而诞生的,我们可以将 Spark 作业的日志提交到一个统一的地方,例如 HDFS,然后 Spark History Server 就可以通过读取 HDFS 目录中的文件来重新渲染生成 WebUI 界面来展示应用程序执行的历史信息。

使用以下资源文件部署一个 Spark History Server,并且通过 NodePort Service 的方式将服务暴露到集群外部,集群外部可以通过节点地址:NodePort 来访问 Spark History Server。前提是我们需要准备好 HDFS 集群,关于 HDFS 集群的安装可以参考 Hadoop 分布式集群安装

apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-history-server
  namespace: spark-job
spec:
  selector:
    matchLabels:
      run: spark-history-server
  replicas: 1
  template:
    metadata:
      labels:
        run: spark-history-server
    spec:
      containers:
        - image:  "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
          name: spark-history-server
          args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.history.HistoryServer"]
          ports:
            - containerPort: 18080
              name: http
          env:
          - name: SPARK_HISTORY_OPTS
            value: "-Dspark.history.fs.logDirectory=hdfs://11.8.36.125:8020/spark-k8s"
---
apiVersion: v1
kind: Service
metadata:
  name: spark-history-server
  namespace: spark-job
spec:
  ports:
  - name: http
    nodePort: 30080
    port: 18080
    protocol: TCP
    targetPort: 18080
  selector:
     run: spark-history-server
  type: NodePort
Spark On K8S Operator 使用 History Server

设置 spark.eventLog.enabled 参数值为 true 启用记录 Spark 日志,spark.eventLog.dir 指定输出日志的目录为 HDFS 目录。

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-job
spec:
  type: Scala
  mode: cluster
  image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "hdfs://11.8.36.125:8020/spark-k8s"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: my-spark-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1

在集群外通过节点地址:30080 访问 Spark History Server,可以在应用程序执行完毕后看到详细的信息。image.pngimage.png


bin/spark-submit \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs://11.8.36.125:8020/spark-k8s \
 local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar

构建镜像

上面的例子都是使用 Spark 官方自带的程序来提交作业,如果我们想要自定义一个程序可以使用 Spark 官网提供的脚本来构建镜像。

程序代码

该项目使用 Maven 来管理依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

程序代码如下,使用 Java 编写了一个 Word Count 程序。

package com.chengzw.wordcount;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @description WordCount 示例
 * @author chengzw
 * @since 2021/7/25 8:39 下午
 */
public class MyJavaWordCount {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        System.setProperty("spark.ui.showConsoleProgress","false");
        //创建配置对象
        //本地运行
        //SparkConf conf = new SparkConf().setAppName("MyJavaWordCount").setMaster("local");
        //在Spark上运行
        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");
        //创建SparkContext对象
        JavaSparkContext sc = new JavaSparkContext(conf);
        //读取hdfs数据
        //在本地运行
        //JavaRDD<String> rdd1= sc.textFile("/tmp/data.txt");
        //在Spark上运行
        JavaRDD<String> rdd1= sc.textFile(args[0]);
        //分词
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String input) throws Exception {
                return Arrays.asList(input.split(" ")).iterator();
            }
        });
        //单词计数 word,1
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        //相同Key的值累加
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        //触发计算
        List<Tuple2<String, Integer>> result = rdd4.collect();
        //打印
        for(Tuple2<String,Integer> r : result){
            System.out.println(r._1 + "\t" + r._2);
        }
        //释放资源
        sc.stop();
    }
}

打包

点击 mvn package 将程序打成 jar 包。image.png

构建并上传镜像

将 jar 包放到 Spark 安装包的 examples/jars 目录中,进入 Spark 目录然后执行以下命令构建镜像。

bin/docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t my-spark:1.0.0 build

查看构建好的镜像。

❯ docker images | grep spark
registry.cn-hangzhou.aliyuncs.com/public-namespace/spark                v1.0.0       372341ae930d   12 minutes ago   529MB

上传镜像。

./docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t v1.0.0 push

使用自己构建的镜像执行 Word Count 程序。

bin/spark-submit \
--master  k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class com.chengzw.wordcount.MyJavaWordCount \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v1.0.0 \
 local:///opt/spark/examples/jars/spark-lab-1.0-SNAPSHOT.jar /etc/security/limits.conf

查看执行结果:

kubectl logs -n spark-job spark-pi-submit-37945f7c4f24e729-driver
#返回结果
......
rss 2
space 2
priority 4
4 1
this 1
"soft" 1
max 14
cpu 1
memlock 1
apply 1
......

参考资料

  • Spark 大数据分析实战
  • [Spark Standalone Mode] (https://spark.apache.org/docs/0.9.0/spark-standalone.html#standby-masters-with-zookeeper)
  • [Spark:Master High Availability(HA)高可用配置的2种实现] (https://www.cnblogs.com/byrhuangqiang/p/3937654.html)
  • [【k8s系列1】spark on k8s 与 spark on k8s operator的对比] (https://segmentfault.com/a/1190000037503030)
  • [Running Spark on Kubernetes] (http://spark.apache.org/docs/3.0.0/running-on-kubernetes.html)
  • [spark-on-k8s-operator User Guide] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md)
  • [spark-on-k8s-operator Quick Start Guide] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md)
  • [Kubernetes Operator for Apache Spark Design] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/design.md#architecture)
  • [Setting up, Managing & Monitoring Spark on Kubernetes] (https://www.datamechanics.co/blog-post/setting-up-managing-monitoring-spark-on-kubernetes)
  • [Spark UI History server on Kubernetes?] (https://stackoverflow.com/questions/51798927/spark-ui-history-server-on-kubernetes)
  • [How To Manage And Monitor Apache Spark On Kubernetes - Part 1: Spark-Submit VS Kubernetes Operator] (https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-introduction-spark-submit-kubernetes-operator)
  • [How To Manage And Monitor Apache Spark On Kubernetes - Part 2: Deep Dive On Kubernetes Operator For Spark] (https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-deep-dive-kubernetes-operator-for-spark)
  • [Spark on K8S (Kubernetes Native)] (https://www.cnblogs.com/moonlight-lin/p/13296909.html)
  • [Getting Started with Spark on Kubernetes] (http://blog.brainlounge.de/memoryleaks/getting-started-with-spark-on-kubernetes/)
  • [运行支持kubernetes原生调度的Spark程序] (https://jimmysong.io/kubernetes-handbook/usecases/running-spark-with-kubernetes-native-scheduler.html)
  • [深入浅出理解 Spark 部署与工作原理] (https://zhuanlan.zhihu.com/p/99398378)
  • [Spark client mode 和 cluster mode 的区别] (https://baixin.ink/2018/04/28/spark-mode/)


相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
8月前
|
SQL 分布式计算 Spark
Spark 教程系列
Spark 教程系列
65 0
|
8月前
|
分布式计算 监控 Spark
Spark 任务运行时日志分析
Spark 任务运行时日志分析
115 0
|
3月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
201 6
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
88 2
|
5月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
285 3
|
3月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
47 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
6月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
441 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
6月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
297 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
6月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
6月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。