开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:MaxCompute Spark 使用及常见问题】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/55/detail/1047
MaxCompute Spark 使用及常见问题
目录:
● MaxCompute Spark 介绍
● 开发环境搭建及常用配置
● 作业诊断
● 常见问题
一、MaxCompute Spark 介绍
MaxCompute Spark 是 MaxCompute 提供的兼容开源的 Spark 计算服务。
它在统一的计算资源和数据集权限体系之上,提供 Spark 计算框架,支持用户以熟悉的开发使用方式提交运行 Spark 作业,以满足更丰富的数据处理分析场景。
主要特性
■ 支持原生多版本 Spark 作业
■ 统一的计算资源
■ 统一的数据和权限管理
■ 与开源 Spark 相同的使用体验
适用场景∶
■离线计算场景∶GraphX、Mllib、RDD、Spark-SQL、PySpark
■ Streaming 场景
■读写 MaxCompute Table
■引用 MaxCompute Resource
■读写 VPC 环境下的服务,如 RDS、Redis、HBase、ECS 上部署的服务
■读写 OSS 非结构化存储使用限制
■不支持交互式类需求 Spark-Shell、Spark-SQL-Shell、PySpark-Shell 等
■不支持访问 MaxCompute 外部表,函数和 UDF
开发环境搭建
通过 Spark 客户端提交
(1)Yarn-Cluster 模式
下载 MC Spark 客户端
环境变量配置
√ JAVA HOME/SPARK_HOME/PATH
参数配置
√ Spark 社区版参数
√ MaxCompute 平台相关的参数
准备项目工程
√ Java/Scala∶ Jar 包
√ PySpark∶ Python 文件
任务提交
√./bin/spark-submit--master yarn-cluster
(2)Local 模式
./bin/spark-submit --master local[4]
IDEA调试
通过 Dataworks 提交
资源上传并提交
√ Java/Scala∶Jar包
√ PySpark∶ Python文件
√ File/Archive
参数配置
√主Jar+主类配置(Java/Scala)
√ 主Python文件( PySpark)
√ Spark参数配置(资源申请/平台相关/网络访问相关)
√参数配置(如果需要)
√选择文件/Archive/Jar/Python(如果需要)
任务提交并运行
√将资源和参数组合为 spark-submit 命令,本质上也是 Yarn-Cluster 模 式
二.开发环境搭建
2.1运行模式
● 通过Spark 客户端提交
Yarn-Cluster 模式,提交任务到 MaxCompute 集群中
Local模式
●通过 Dataworks 提交
本质上也是Yarn-Cluster模式,提交任务到 MaxCompute 集群中
2.2 通过客户端提交
2.2.1 Yarn-Cluster 模式
●下载 MC Spark 客户端
Spark 1.6.3
Spark 2.3.0
● 环境变量配置
参数配置
将 $SPARK_HOME/conf/spark-defaults.conf.template 重命名为 spark-defaults.conf
参数配置参考下文
●准备项目工程
1git clone https://github.com/aliyun/MaxCompute-Spark.git
2 cd spark-2.x
3 mvn clean package
●任务提交
2.2.2 Local 模式
●与Yarn Cluster 模式类似,用户首先需要做以上准备工作
●任务提交
IDEA 调试注意
IDEA 运行 Local 模式是不能直接引用 spark-defaults.conf 里的配置,需要手动在代码里指定相关配置。
一定要注意需要在 IDEA 里手动添加 MaxCompute Spark 客户端的相关依赖(jars目录),否则会出现以下报错∶
the value of spark.sql.catalogimplementation should be one of hive in-memory but was odps
2.3 通过DataWorks提交
2.3.1资源上传
●本质上 MC Spark 节点的配置对应 于 spark-submit 命令的参数和选项。
● 上传资源∶
0~50MB∶ 可以直接在 DataWorks 界面创建资源并上传
50MB~500MB∶ 可以先利用 MaxCompute 客户端(CMD)上传,然后在DataWorks 界面添加到数据开发,参考文档。
● 资源引用∶
资源提交后,可以在 DataWorks Spark 节点界面选择需要的资源(jar/python/file/archive)
任务运行时∶资源文件默认会上传到 Driver 和 Executor 的当前工作目录
2.3.2 参数和配置
● Spark 配置项∶对应于 spark-submit 命令的 --conf 选项
>accessid,accesskey,projectname,endpoint,runtime.end.point,task.major.version 无需配置
>除此之外,需要将 spark-default.conf 中的配置逐条加到 dataworks 的配置项中
●给主类传参数(如 bizdate)
>首先在调度->参数中添加参数,然后在 Spark 节点"参数"栏引用该参数。多个参数用空格分隔。
>该参数会传给用户主类,用户在代码中解析该参数即可。
>参考文档
3.1配置的位置
3.1.1 Spark 配置的位置
●用户使用 Maxcompute Spark 通常会有几个位置可以添加 Spark 配置,主要包括∶
位置1∶spark-defaults.conf,用户通过客户端提交时在 spark-defaults.conf 文件中添加的 Spark 配置
位置2∶dataworks 的配置项,用户通过 dataworks 提交时在配置项添加的 Spark 配置,这部分配置最终会在位置3中被添加
位置3∶配置在启动脚本 spark-submit --conf 选项中
位置4∶ 配置在用户代码中,用户在初始化 SparkContext 时设置的 Spark 配置
●Spark配置的优先级
用户代码> spark-submit --选项> spark-defaults.conf配置>spark-env.sh配置>默认值
3.1.2 需要区分的两种配置
● 一种是必须要配置在 bspark-defaults.conf 或者 dataworks 的配置项中才能生效、而不能配置在用户代码中、
这类配置主要的特征是∶
与Maxcompute/Cupid平台相关∶ 一般参数名中都会带odps或者cupid,通常这些参数与任务提交/资源
申请都关系∶
●显而易见,一些资源获取(如driver的内存,core,diskdriver,maxcompute资源) 在任务执行之前就会用到,如果这些参数设置在代码中,很明显平台没有办法读到,所以这些参数一定不要配置在代码中.
三、配置介绍
配置的位置
■ spark-defaults.conf(通过Spark客户端提交)■ spark-submit --confi选项(通过Spark客户端提交)
■ Dataworks配置项(本质上是 spark-submit -conf)
■ 代码中初始化 SparkContext 时设置
配置的优先级
■ 用户代码>spark-submit--选项>spark-defaults.conf配置> spark-env.sh配置>默认值区分两种类型的参数
■ 必须要配置在 spark-defaults.conf 或者 Dataworks 的配置项中才能生效(任务启动之前需要)
√访问 VPC 相关的网络白名单配置
√ 资源申请相关的配置(包括 cores/memory/disk_size)
√ MaxCompute 的 project/ 账号 /endpoint/resource
√spark.master
■ 配置在代码中可以生效的(任务运行中才需要)√ Spark 相关的优化参数
■ 推荐把任务运行与优化相关的参数配置在代码中,与资源/平台相关的参数配置在spark-defaults.conf 或 Dataworks 配置项中
四、作业诊断
Logview
■ 链接位置∶
√在任务提交时会打印日志∶日志中含有 logview 链接 (关键字 logview url)
■ 主要信息:
√ StdErr 打印的是 spark 引擎输出的日志
√ StdOut 中打印用户作业输出到控制台的内容
Spark UI 和 Spark History UI
■ Spark UI
√ 任务运行时可用
■ Spark History UI
√任务结束后可用,渲染有一定延
4.1 Logview
4.1.1 Logview 介绍
· 在任务提交时会打印日志∶日志中含有 logview链接(关键字 logview url)
· Master 以及 Worker 的 StdErr 打印的是 spark 引擎输出的日志,StdOut 中打印用户作业输出到控制台的内容。
4.1.2 利用 Logview 排查问题
● 拿到 Logview,一般首先看 Driver 的报错,briver 会包含一些关键性的错误
● 如果 Driver 中出现个中类或者方法找不到的问题,一般是 jar 包打包的问题
● 如果 Driver 中出现连接外部 VPC 或者 OSS 出现 Time out,这种情况一般要去排查一下参数配置
● 如果 Driver 中出现连接不到 Executor,或者找不到 Chunk 等错误,通常是Executor 已经提前退出,需要进一步查看 Executor 的报错,可能存在 OOM
1.根据 End Time 做排序,结束时间越早,越容易是发生问题的 Executor 节点
2.根据 Latency 做排序,Latency 代表了 Executor 的存活的时间,存活时间越短的,越有可能是根因所在
4.2 Spark Ul和HistoryServer
· Spark UI 与社区版一致;在 logivew 的 summary 模块下找到 Spark UIl 链接∶
· Spark UI 的使用与社区原生版是一致的,可以参考文档
· 注意
>Spark UI 需要鉴权,只有提交任务的 Owner 才能打开
>Spark UI 仅在作业运行时才能打开,如果任务已经结束,那么 Spark UI是无法打开的,这时候需要查看 Spark History Server Ul
五.常见问题
1.local 模式运行的问题
·问题—∶the value of spark.sql.catalogimplementation should be one of hive in-memory but was odps
>原因在于用户没有正确地按照文档将 Maxcompute Spark 的 jars 目录添加到类路径,导致加载了社区版的 spark 包,需要按照文档将 jars 目录添加到类路径
· 问题二∶ IDEA Local 模式是不能直接引用 spark-defaults.conf 里的配置,必须要把 Spark 配置项写在代码中
问题三∶访问 OSS 和 VPC∶
> Local 模式是处于用户本机环境,网络没有隔离。而Yarn-Cluster 模式是处于Maxcompute 的网络隔离环境中、必须要要配置vpc访问的相关参数
> Local 模式下访问 oss 的 endpoint 通常是外网 endpoint,而 Yarn-cluster 模式下访问 vpc 的 endpoint 是经典网络 endpoint
2.jar 包打包的问题
· java/scala 程序经常会遇到 Java 类找不到/类冲突问题∶
>类冲突∶用户 Jar 包与 Spark 或平台依赖的 Jar 包冲突
>类没有找到∶用户 Jar 包没有打成 Fat Jar 或者由于类冲突引起
● 打包是需要注意∶
> 依赖为 provided 和 cornpile 的区别∶
■ provided∶代码依赖该 jar 包,但是只在编译的时候需要用,而运行时不需要,运行时会去集群中去
寻找的相应的 jar 包
■ compile∶代码依赖该 jar 包,在编译、运行时候都需要,在集群中不存在这些 jar包,需要用户打到自己的jar包中。这种类型的 jar 包一般是一些三方库,且与 spark 运行无关,与用户代码逻辑有关
> 用户提交的 jar 包必须是 Fat jar∶
■ 必须要把 compile 类型的依赖都打到用户 jar 包中,保证代码运行时能加载到这些依赖的类
● 需要设置为 provided 的 jar 包
> groupld为org.apache.spark的Jar包
>平台相关的Jar包
■ cupid-sdk
■hadoop-yarn-client
■ odpS-sdk
需要设置为 compile 的 jar 包
>oss 相关的 jar 包
■ hadoop-fs-oss
>用户访问其他服务用到的 jar 包∶
■ 如 mysql, hbase
> 用户代码需要引用的第三方库
3.需要引入 Python 包
●很多时候用户需要用到外部 Python 依赖
,首先推荐用户使用我们打包的公共资源,包含了常用的一些数据处理,计算,以及连接外部服务(mysql,redis,hbase)的三方库
> 如果不能满足用户需要,用户可以在该公共资源的基础上上传 wheel 包
>如果 wheel 包依赖链较为复杂,可以通过 Docker 容器进行打包
●使用 Docker 容器打包∶
>为了保证与线上环境一致,避免运行时 so 包找不到的问题,需要使用 Docker 容器进行打包
>Docker 容器本质只是提供了兼容性较好的 os 环境,用户需要在容器中进行打包,并将整个 Python 目录压
缩后上传到 MaxCompute Resource 中,最后在 Spark 任务中直接引用即可
>参见文档
4.需要引入外部文件
● 需要引用到外部文件的场景
>用户作业需要读取一些配置文件
>用户作业需要额外的 jar 包/Python 库
●可以通过两种方式上传资源∶
>通过 Spark 参数上传文件
> 通过 MaxCompute Resource 上传文件
● 通过 Spark 参数上传文件
> MaxCompute Spark 支持 Spark 社区版原生的 --jars,--py-files 等参数,可以在作业提交时通过这些参数将文件上传,这些文件在任务运行时会被上传到用户的工作目录下
> 通过 DataWorks 添加任务需要的资源,参见上文
●MaxCompute Resource
> spark.hadoop.odps.cupid.resources 参数,可以直接引用 MaxCompute 中的资源,这些资源在任务运行时也会被上传到用户的工作目录下
>使用方式
(1)通过 MaxCompute 客户端将文件上传(单个文件最大支持500MB)
(2)在 Spark 作业配置中添加 spark.hadoop.odps.cupid.resources 参数∶格式为. ,如果需要引用多个文件,需要用逗号隔开
(3)如果需要重命名,格式为,∶
●如何读取上传的文件∶
>如果需要读取上传的文件资源
>或者直接通过类加载器获取文件路径,然后再读取
>参者文档
5.VPC 访问的问题
●Maxcompute Spark 是独立运行在 Maxcompute 集群的,网络与外界隔离,因此无法直接访问 vpc 和公网,需要添加以下配置。
●北京和上海 Region 使用 smartnat
>需要配置
■ spark.hadoop.odps.cupid.vpc.domain.list
■ spark.hadoop.odps.cupid.smartnat.enable=true
>访问公网∶假如要访问google.com∶443,需要做以下两步∶
■ 提工单设置 project 级别白名单,把 google.com∶443 加到odps.security.outbound.internetlist
■ 配置作业级别的公网访问白名单∶spark.hadoop.odps.cupid.internet.access.list=google.com∶443
● 其他Region∶
>只需要配置spark.hadoop.odps.cupid.vpc.domain.list
>无法访问公网
● 注意事项∶
>vpc.domain.list 需要压缩成一行,不能包含空格
>支持同时访问同一个 Region 下的多个 VPC,需要配置所有要访问的 ip∶port 的白名单
>需要在要访问的服务中添加 ip 白名单,允许 100.104.0.0/16 网段的访问
>用户要保证所有可能访问到的 IP 都已经加到 vpc.domain.list,例如如果用户要访问位于 hdfs,hbase 这种多个节点的服务,一定要把所有的节点都添加进来,不然可能会遇到 Time out
6.OOM 的问题
● 可能出现 OOM 的情况∶
> 错误1∶在某些 Executor 中出现 Cannot allocate memory,一般是系统内存不足,此时可以调整
spark.yarn.executor.memoryOverhead 参数,注意该参数是会计算到总内存数的,也不需要一次性增加太多,小心调整即可
>错误2∶Executor抛出java.lang.OutOfMemoryError∶ Java heap space
>错误3∶ GC overhead limit exceeded
> 错误4∶ No route to host∶workerd********/Could not find CoarseGrainedScheduler,这类错误一般是一些 Executor 提前退出。如果一个task 处理的数据非常大,容易发生 OOM
● Driver OOM∶Driver OOM 的可能性比较小,但是也是有可能出现的
>如果需要使用 collect 算子将 RDD 的数据全部拉取到 Driver 上进行处理,那么必须确保 Driver 的内存足够大,否则会出现 OOM 内存溢出的问题。
SparkContext,DAGScheduler 都是运行在 Driver 端的。Stage 切分也是在Driver端运行,如果用户程序有过多的步骤,切分出过多的Stage.这部分信息消耗的是Driver的内存.这个时候就需要调大Driver的内存。有时候如果stage过 多,Driver端可能会有栈溢出的问题
●一些解决方法∶
>限 制 executor 并行度,将 cores 调小∶多个同时运行的 Task 会共享一个Executor 的内存,使得单个 Task 可使用的内存减少,调小并行度能缓解内存压力
>增加单个Executor 内存
>增加分区数量,减少每个 exeCutor 负载
>考虑数据倾斜问题,因为数据倾斜导致某个 task 内存不足,其它 task 内存足够
7. No space left on device
●这个错误意味这本地磁盘不足,通常这个报错会在 executor 上出现,并导致executor 挂掉
● 解决方案
>直接增加更多的磁盘空间∶默认 driver 和 executor 都各提供 20g 的本地磁盘,当磁盘空间不足时可以调整 spark.hadoop.odps.cupid.disk.driver.device_size
>如果调整本地磁盘大小到 100g 后,仍然报该错误,说明单个 executor 写的shuffle 数据已经超过上限,可能是遇到了数据倾斜,这种情况下可以对数据重分区。或者增加 executor 的数量
8.申请资源的问题
●申请不到资源的几种现象∶
(1)在 driver 端一般会打以下日志
>WARN YarnClusterScheduler: Initial job has not accepted any resources;check your cluster UI to ensure that workers are registered and have sufficient resources
(2)在 logview 中只能看到 driver,而 worker 数量为0
(3)在 spark ui 中只能看到 driver,而 worker 数量为0
●解决方案∶
>调整任务资源∶调整用户申请的 executor 总数或者单个 executor 的资源数量(一般是内存),如果单个executor请求的内存过多可能不太容易申请到>合理安排任务执行时间
● 其他注意事项∶
>必须配置 spark.master=yarn-cluster 才会正确的申请资源
9.其他问题
●如何切换 Spark 版本
>版本号规则介绍∶示例 spark-2.3.0-odps0.32.5
■ spark-2.3.0 是社区版本的 spark 版本号,Maxcompute Spark 基于该社区版本进行适配
■ odps0.32.5 是 Maxcompute Spark 的小版本号,随着小版本号的升级,可能进行一些 bug 修复和 sdk 的升级
> 用户提交作业的的 Spark 版本可能有以下几种情况∶
■ 情况1∶直接通过本地客户端提交任务,spark 版本就是用户本地客户端的版本
■ 情况2∶用户通过 dataworks 提交任务,取决于 dataworks gateway 的默认 spark版本,当前公共云 dataworks 公共资源组 gateway 的默认版本是spark-2.3.0-odps0.32.1
■情况 3 ∶用户通过 dataworks 提交任务,配置参数spark.hadoop.odps.spark.version,则会按照配置的版本号来寻找对应的 spark 客户端,用户可以配置 spark.hadoop.odps.spark.version=spark-2.3.0-odps0.32.5 手动切换版本
■ 情况4∶该情况优先级最高,用户可以在本地客户端或者是 dataworks 提交任务时配置以下参数,则类加载的优先级最高,因此会在 spark 任务启动时优先使用该版本的spark
spark.hadoop.odps.cupid.resources = public.__spark_libs__2.3.0odps0.32.5.zip
spark.driver.extraClassPath=./public.spark_libs_2.3.0odps0.32.5.zip/*/34:52
● 需要在代码中访问配置项∶
>spark 开头的参数直接通过 SparkConf 类提供的接口直接读取即可
Server 渲染速度慢
可以添加压缩配置∶ spark.eventLog.compress=true
●如何正确地 Kill 一个运行中的 Spark 任务
通常通过两种方式kil正在运行的 Spark 任务
(1)通过 odps cmd 执行 kill + instanceld;
(2)通过 dataworks 界面执行 stop
注意,直接在 spark 客户端或者 dataworks 的任务提交界面执行 Ctrl+ C 是无法 kill一个 S park 任 务的
●日志中文乱码,添加以下配置
1. spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8-Dsun.jnu.encoding=UTF-8
2.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8-Dsun.jnu.encoding=UTF-8
3.如果是 pyspark 作业需要设置下如下两个参数∶
■ spark.yarn.appMasterEnv.PYTHONIOENCODING=utf8
■ spark.executorEnv.PYTHONIOENCODING=utf8
■ 另外在 python 脚本的最前面加上如下的代码∶
六.相关文档
● MC Spark github wiki: https:/github.com/aliyun/MaxCompute-Spark/wiki
● Spark UI: https://spark.apache.org/docs/latest/web-ui.html
● Spark 配置∶https∶//sparkapache.org/docs/2.4.5/configuration.html