在 kubernetes 中用 alluxio 加速 spark 数据访问

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: Alluxio是一个开源的基于内存的分布式存储系统,适合作为云上大数据和AI / ML的数据编排方案。本文主要为大家讲解如何用alluxio加速spark数据访问。

7.jpg
镜像下载、域名解析、时间同步请点击 阿里巴巴开源镜像站

一、背景信息

1. alluxio

Alluxio是一个开源的基于内存的分布式存储系统,适合作为云上大数据和AI / ML的数据编排方案。Alluxio可以同时管理多个底层文件系统,将不同的文件系统统一在同一个名称空间下,让上层客户端可以自由访问统一名称空间内的不同路径,不同存储系统的数据。
alluxio的short-circuit功能可以使alluxio客户端直接访问alluxio worker所在主机的工作存储,而不需要通过网络栈与alluxio worker完成通信,可以提高性能。

2. spark operator

Spark-operator用于管理k8s集群中spark job。通过spark-operator可以在k8s集群中创建、查看和删除spark job。

二、前提条件

本文档的操作依赖如下的一些条件:

  • kubernetes集群:版本大于1.8,本次实验的集群通过阿里云容器服务创建,集群名称为"ack-create-by-openapi-1"。

image1.png

  • 安装有linux或者mac操作系统的计算机作为我们的实验环境(本次实验中,假设该计算机名称为alluxio-test)。该计算机需要准备如下环境:
  • docker >= 17.06
  • kubectl >= 1.8,能够连接kubernets集群ack-create-by-openapi-1

三、实验步骤

实验步骤主要包括如下几步:

  • 部署alluxio
  • 部署spark-operator
  • 制作spark docker镜像
  • 上传文件到alluxio
  • 提交spark job

下面将对每个步骤进行说明:
### 1. 部署alluxio
进入容器服务应用目录,在右上角的搜索框中搜索"alluxio",然后进入alluxio主界面,如图:

image2.png 然后选择将alluxio安装到目标集群上(本次实验的集群为"ack-create-by-openapi-1"),最后点击创建,如图 image3.png 点击创建后,使用kubectl给待安装的alluxio组件的节点打上标签"alluxio=true",首先查看该集群有哪些节点:

# kubectl get nodes -o wide
NAME                      STATUS   ROLES    AGE   VERSION            INTERNAL-IP    EXTERNAL-IP   OS-IMAGE                               KERNEL-VERSION            CONTAINER-RUNTIME
cn-beijing.192.168.8.12   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.12   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.13   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.13   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.14   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.14   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.15   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.15   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.16   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.16   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.17   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.17   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5

可以看到有三个worker节点,分别为:

  • cn-beijing.192.168.8.15
  • cn-beijing.192.168.8.16
  • cn-beijing.192.168.8.17

我们给是三个节点都打上标签"alluxio=true":

# kubectl label nodes cn-beijing.192.168.8.15 \
  cn-beijing.192.168.8.16 \
  cn-beijing.192.168.8.17 \
  alluxio=true

使用kubectl查看各个pod是否都处于running状态:

# kubectl get po -n alluxio
NAME                   READY   STATUS    RESTARTS   AGE
alluxio-master-0       2/2     Running   0          4h1m
alluxio-worker-5zg26   2/2     Running   0          4h1m
alluxio-worker-ckmr9   2/2     Running   0          4h1m
alluxio-worker-dvgvd   2/2     Running   0          4h1m

验证alluxio是否处于ready:

# kubectl exec -ti alluxio-master-0 -n alluxio bash
//下面步骤alluxio-master-0 pod中执行
bash-4.4# alluxio fsadmin report capacity
Capacity information for all workers:
    Total Capacity: 3072.00MB
        Tier: MEM  Size: 3072.00MB
    Used Capacity: 0B
        Tier: MEM  Size: 0B
    Used Percentage: 0%
    Free Percentage: 100%
Worker Name      Last Heartbeat   Storage       MEM
192.168.8.15    0                capacity      1024.00MB
                                  used          0B (0%)
192.168.8.16    0                capacity      1024.00MB
                                  used          0B (0%)
192.168.8.17    0                capacity      1024.00MB
                                  used          0B (0%)

2. 部署spark-operator

进入容器服务应用目录,在右上角的搜索框中搜索"ack-spark-operator",然后进入ack-spark-operator主界面,如图: image4.png 选择将ack-spark-operator安装到目标集群上(本次实验的集群为"ack-create-by-openapi-1"),然后点击创建,如图: image5.png sparkctl是一个用于提交spark job到k8s的命令行工具,需要将sparkctl安装到我们在"前提条件"中所提到的实验环境"alluxio-test"中:

# wget http://spark-on-k8s.oss-cn-beijing.aliyuncs.com/sparkctl/sparkctl-linux-amd64 -O /usr/local/bin/sparkctl
# chmod +x /usr/local/bin/sparkctl

3. 制作spark docker镜像

spark下载页面下载所需的spark版本,本次实验选择的saprk版本为2.4.6。运行如下命令下载spark:

# cd /root
# wget https://mirror.bit.edu.cn/apache/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz

下载完成后,执行解压操作:

# tar -xf spark-2.4.6-bin-hadoop2.7.tgz
# export SPARK_HOME=/root/spark-2.4.6-bin-hadoop2.7

spark docker镜像是我们提交spark任务时使用到的镜像,这个镜像中需要包含alluxio client jar包。使用如下的命令获取alluxio client jar包:

# id=$(docker create alluxio/alluxio-enterprise:2.2.1-1.4)
# docker cp $id:/opt/alluxio/client/alluxio-enterprise-2.2.1-1.4-client.jar \
    $SPARK_HOME/jars/alluxio-enterprise-2.2.1-1.4-client.jar
# docker rm -v $id 1>/dev/null

alluxio client jar包准备好以后,开始构建镜像:

# docker build -t spark-alluxio:2.4.6 -f kubernetes/dockerfiles/spark/Dockerfile $SPARK_HOME

请记住镜像名称“spark-alluxio:2.4.6”,在向k8s提交spark job中会用到这个信息。
镜像构建完成以后,对镜像的处理有两种方式:

  • 如果有私有镜像仓库,将该镜像推送到私有镜像仓库中,同时保证k8s集群节点能够pull该镜像
  • 如果没有私有镜像仓库,那么需要使用docker save命令将该镜像导出,然后scp到k8s集群的各个节点,在每个节点上使用docker load命令将镜像导入,这样就能保证每个节点上都存在该镜像。

4. 上传文件到alluxio

文章开头提到过:本次实验是提交一个spark job到k8s中,该spark job的目标是对某一个文件统计每一个单词出现的次数。现在需要把这个文件传到alluxio存储上,这里为了方便,直接把alluxio master中/opt/alluxio-2.3.0-SNAPSHOT/LICENSE(文件路径可能因alluxio版本有点差异)这个文件传到alluxio上。
使用"kubectl exec"进入alluxio master pod,并拷贝当前目录下的LICENSE文件到alluxio的根目录中:

# kubectl exec -ti alluxio-master-0  -n alluxio bash
//下面步骤alluxio-master-0 pod中执行
bash-4.4# alluxio fs copyFromLocal LICENSE /

接着查看一下LICENSE这个文件分成的block被alluxio放到哪些worker上了。

# kubectl exec -ti alluxio-master-0 -n alluxio bash
//下面步骤alluxio-master-0 pod中执行
bash-4.4# alluxio fs stat /LICENSE
/LICENSE is a file path.
FileInfo{fileId=33554431, fileIdentifier=null, name=LICENSE, path=/LICENSE, ufsPath=/opt/alluxio-2.3.0-SNAPSHOT/underFSStorage/LICENSE, length=27040, blockSizeBytes=67108864, creationTimeMs=1592381889733, completed=true, folder=false, pinned=false, pinnedlocation=[], cacheable=true, persisted=false, blockIds=[16777216], inMemoryPercentage=100, lastModificationTimesMs=1592381890390, ttl=-1, lastAccessTimesMs=1592381890390, ttlAction=DELETE, owner=root, group=root, mode=420, persistenceState=TO_BE_PERSISTED, mountPoint=false, replicationMax=-1, replicationMin=0, fileBlockInfos=[FileBlockInfo{blockInfo=BlockInfo{id=16777216, length=27040, locations=[BlockLocation{workerId=8217561227881498090, address=WorkerNetAddress{host=192.168.8.17, containerHost=, rpcPort=29999, dataPort=29999, webPort=30000, domainSocketPath=, tieredIdentity=TieredIdentity(node=192.168.8.17, rack=null)}, tierAlias=MEM, mediumType=MEM}]}, offset=0, ufsLocations=[]}], mountId=1, inAlluxioPercentage=100, ufsFingerprint=, acl=user::rw-,group::r--,other::r--, defaultAcl=}
Containing the following blocks:
BlockInfo{id=16777216, length=27040, locations=[BlockLocation{workerId=8217561227881498090, address=WorkerNetAddress{host=192.168.8.17, containerHost=, rpcPort=29999, dataPort=29999, webPort=30000, domainSocketPath=, tieredIdentity=TieredIdentity(node=192.168.8.17, rack=null)}, tierAlias=MEM, mediumType=MEM}]}

可以看到LICENSE这个文件只有一个block(id为16777216),被放在了ip为192.168.8.17的k8s节点上。我们使用kubectl查看该节点名称为cn-beijing.192.168.8.17

# kubectl get nodes -o wide
NAME                      STATUS   ROLES    AGE   VERSION            INTERNAL-IP    EXTERNAL-IP   OS-IMAGE                               KERNEL-VERSION            CONTAINER-RUNTIME
cn-beijing.192.168.8.12   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.12   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.13   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.13   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.14   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.14   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.15   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.15   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.16   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.16   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.17   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.17   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5

5. 提交spark job

下面的步骤将提交一个spark job到k8s集群中,该job主要是计算alluxio中/LICENSE文件的每个单词出现的次数。
在步骤3.4中我们获取到LICENSE这个文件所包含的block都在节点cn-beijing.192.168.8.17上,此次实验中,我们通过指定node selector让spark driver和spark executor都运行在节点cn-beijing.192.168.8.17,验证在关闭alluxio的short-circuit功能的情况下,spark executor和alluxio worker之间的通信是否通过网络栈完成。

  • 说明:如果在开启alluxio的short-circuit功能的情况下,并且spark executor与其所要访问的文件(本次实验为/LICENSE这个文件)的block在同一个k8s节点上,那么spark executor中的alluxio client与该k8s节点上的alluxio worker之间的通信通过domain socket方式完成。

首先生成提交spark job的yaml文件:

# export SPARK_ALLUXIO_IMAGE=<步骤3.3中制作的image,即spark-alluxio:2.4.6>
# export ALLUXIO_MASTER="alluxio-master-0"
# export TARGET_NODE=<步骤3.4获取到的LICENSE文件的block存储的节点,即cn-beijing.192.168.8.17>
# cat > /tmp/spark-example.yaml <<- EOF
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-count-words
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "$SPARK_ALLUXIO_IMAGE"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.JavaWordCount
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.6.jar"
  arguments:
    - alluxio://${ALLUXIO_MASTER}.alluxio:19998/LICENSE
  sparkVersion: "2.4.5"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.5
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
    nodeSelector:
      kubernetes.io/hostname: "$TARGET_NODE"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.5
    nodeSelector:
      kubernetes.io/hostname: "$TARGET_NODE"
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
EOF

然后,使用sparkctl提交spark job:

# sparkctl create /tmp/spark-example.yaml

四、实验结果

当提交任务后,使用kubectl查看spark driver的日志:

# kubectl get po -l spark-role=driver
NAME                                 READY   STATUS      RESTARTS   AGE
spark-alluxio-1592296972094-driver   0/1     Completed   0          4h33m
# kubectl logs spark-alluxio-1592296972094-driver --tail 20
USE,: 3
Patents: 2
d): 1
comment: 1
executed: 1
replaced: 1
mechanical: 1
20/06/16 13:14:28 INFO SparkUI: Stopped Spark web UI at http://spark-alluxio-1592313250782-driver-svc.default.svc:4040
20/06/16 13:14:28 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
20/06/16 13:14:28 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
20/06/16 13:14:28 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
20/06/16 13:14:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/16 13:14:28 INFO MemoryStore: MemoryStore cleared
20/06/16 13:14:28 INFO BlockManager: BlockManager stopped
20/06/16 13:14:28 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/16 13:14:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/16 13:14:28 INFO SparkContext: Successfully stopped SparkContext
20/06/16 13:14:28 INFO ShutdownHookManager: Shutdown hook called
20/06/16 13:14:28 INFO ShutdownHookManager: Deleting directory /var/data/spark-2f619243-59b2-4258-ba5e-69b8491123a6/spark-3d70294a-291a-423a-b034-8fc779244f40
20/06/16 13:14:28 INFO ShutdownHookManager: Deleting directory /tmp/spark-054883b4-15d3-43ee-94c3-5810a8a6cdc7

最后我们登陆到alluxio master上,查看相关指标统计到的值:

# kubectl exec -ti alluxio-master-0 -n alluxio bash
//下面步骤alluxio-master-0 pod中执行
bash-4.4# alluxio fsadmin report metrics
Cluster.BytesReadAlluxio  (Type: COUNTER, Value: 290.47KB)
Cluster.BytesReadAlluxioThroughput  (Type: GAUGE, Value: 22.34KB/MIN)
Cluster.BytesReadDomain  (Type: COUNTER, Value: 0B)
Cluster.BytesReadDomainThroughput  (Type: GAUGE, Value: 0B/MIN)

BytesReadAlluxio和BytesReadAlluxioThroughput代表数据从网络栈传输;BytesReadDomain和BytesReadDomainThroughput代表数据从domain socket传输。可以看到所有数据都是从网络栈传输的(即使spark executor和LICENSE文件的block在同一k8s节点上)。

五、参考文档

阿里巴巴开源镜像站 提供全面,高效和稳定的镜像下载服务。钉钉搜索 ' 21746399 ‘ 加入镜像站官方用户交流群。”

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
50 3
|
3月前
|
存储 分布式计算 Java
|
3月前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
248 4
|
3月前
|
存储 缓存 分布式计算
|
3月前
|
SQL 存储 分布式计算
|
3月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
51 1
|
4月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
42 1
|
5月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。