在YARN上运行Spark
对 YARN (Hadoop NextGen) 的支持是从Spark-0.6.0开始的,后续的版本也一直持续在改进。
在YARN上启动
首先确保 HADOOP_CONF_DIR 或者 YARN_CONF_DIR 变量指向一个包含Hadoop集群客户端配置文件的目录。这些配置用于读写HDFS和连接YARN资源管理器(ResourceManager)。这些配置应该发布到YARN集群上所有的节点,这样所有的YARN容器才能使用同样的配置。如果这些配置引用了Java系统属性或者其他不属于YARN管理的环境变量,那么这些属性和变量也应该在Spark应用的配置中设置(包括驱动器、执行器,以及其AM【运行于client模式时的YARN Application Master】)
在YARN上启动Spark应用有两种模式。在cluster模式下,Spark驱动器(driver)在YARN Application Master中运行(运行于集群中),因此客户端可以在Spark应用启动之后关闭退出。而client模式下,Spark驱动器在客户端进程中,这时的YARN Application Master只用于向YARN申请资源。
与独立部署( Spark standalone )或 在Mesos 集群中不同,YARN的master地址不是在–master参数中指定的,而是在Hadoop配置文件中设置。因此,这种情况下,–master只需设置为yarn。
以下用cluster模式启动一个Spark应用:
$ ./bin/spark-submit --class path.to.your.Class \
--master yarn \
--deploy-mode cluster \
[options] \
<app jar> [app options]
例如:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar 10
以上例子中,启动了一个YARN客户端程序,使用默认的Application Master。而后SparkPi在Application Master中的子线程中运行。客户端会周期性的把Application Master的状态信息拉取下来,并更新到控制台。客户端会在你的应用程序结束后退出。参考“调试你的应用”,这一节说明了如何查看驱动器和执行器的日志。
要以client模式启动一个spark应用,只需在上面的例子中把cluster换成client。下面这个例子就是以client模式启动spark-shell:
$ ./bin/spark-shell --master yarn --deploy-mode client
增加其他JAR包
在cluster模式下,驱动器不在客户端机器上运行,所以SparkContext.addJar添加客户端本地文件就不好使了。要使客户端上本地文件能够用SparkContext.addJar来添加,可以用–jars选项:
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar
my-main-jar.jar
app_arg1 app_arg2
准备
在YARN上运行Spark需要其二进制发布包构建的时候增加YARN支持。二进制发布包可以在这里下载:downloads page 。
想要自己编译,参考这里: Building Spark
配置
大多数配置,对于YARN或其他集群模式下,都是一样的。详细请参考这里: configuration page。
以下是YARN上专有的配置项。
调试你的应用
在YARN术语集中,执行器和Application Master在容器(container)中运行。YARN在一个应用程序结束后,有两种处理容器日志的模式。如果开启了日志聚合(yarn.log-aggregation-enable),那么容器日志将被复制到HDFS,并删除本地日志。而后这些日志可以在集群任何节点上用yarn logs命令查看:
yarn logs -applicationId <app ID>
以上命令,将会打印出指定应用的所有日志文件的内容。你也可以直接在HDFS上查看这些日志(HDFS shell或者HDFS API)。这些目录可以在你的YARN配置中指定(yarn.nodemanager.remote-app-log-dir和yarn.nodemanager-remote-app-log-dir-suffix)。这些日志同样还可以在Spark Web UI上Executors tab页查看。当然,你需要启动Spark history server和 MapReduce history server,再在 yarn-site.xml 中配置好 yarn.log.server.url。Spark history server UI 将把你重定向到MapReduce history server 以查看这些聚合日志。
如果日志聚合没有开启,那么日志文件将在每台机器上的 YARN_APP_LOGS_DIR 目录保留,通常这个目录指向 /tmp/logs 或者 $HADOOP_HOME/log/userlogs(这取决于Hadoop版本和安全方式)。查看日志的话,需要到每台机器上查看这些目录。子目录是按 application ID 和 container ID来组织的。这些日志同样可以在 Spark Web UI 上 Executors tab 页查看,而且这时你不需要运行MapReduce history server。
如果需要检查各个容器的启动环境,可以先把 yarn.nodemanager.delete.debug-delay-sec 增大(如:36000),然后访问应用缓存目录yarn.nodemanager.local-dirs,这时容器的启动目录。这里包含了启动脚本、jar包以及容器启动所用的所有环境变量。这对调试 classpath 相关问题尤其有用。(注意,启用这个需要管理员权限,并重启所有的node managers,因此,对托管集群不适用)
要自定义Application Master或执行器的 log4j 配置,有如下方法:
- 通过spark-submit –files 上传一个自定义的 log4j.properties 文件。
- 在 spark.driver.extraJavaOptions(对Spark驱动器)或者 spark.executor.extraJavaOptions(对Spark执行器)增加 -Dlog4j.configuration=<location of configuration file>。注意,如果使用文件,那么 file: 协议头必须显式写上,且文件必须在所节点上都存在。
- 更新 ${SPARK_CONF_DIR}/log4j.properties 文件以及其他配置。注意,如果在多个地方都配置了log4j,那么上面其他两种方法的配置优先级比本方法要高。
注意,第一种方法中,执行器和Application Master共享同一个log4j配置,在有些环境下(AM和执行器在同一个节点上运行)可能会有问题(例如,AM和执行器日志都写入到同一个日志文件)
如果你需要引用YARN放置日志文件的路径,以便YARN可以正确地展示和聚合日志,请在log4j.properties文件中使用spark.yarn.app.container.log.dir。例如,log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log 。对于流式应用,可以配置RollingFileAppender,并将文件路径设置为YARN日志目录,以避免磁盘打满,而且这些日志还可以利用YARN的日志工具访问和查看。
Spark属性
Property Name | Default | Meaning |
---|---|---|
spark.yarn.am.memory |
512m |
YARN Application Master在client模式下, 使用内存总量,与JVM内存设置格式相同(如:512m,2g)。 如果是cluster模式下,请设置 spark.driver.memory。 注意使用小写的后缀, 如:k、m、g、t、p,分别代表 kibi-, mebi, gibi-, tebi- 以及pebibytes。 |
spark.driver.cores |
1 |
YARN cluster模式下,驱动器使用的CPU core个数。 在cluster模式下,驱动器(driver)和YARN AM(application master)使用相同的JVM,所以这个属性也可以用来控制YARN AM。 如果是client模式下,请使用spark.yarn.am.cores来控制YARN AM的CPU core个数。 |
spark.yarn.am.cores |
1 |
client模式下,用来控制YARN AM的CPU core个数。 cluster模式下,请使用 spark.driver.cores。 |
spark.yarn.am.waitTime |
100s |
在cluster模式下,该属性表示YARN AM等待SparkContext初始化的时间。 在client模式下,该属性表示YARN AM等待驱动器连接的时间。 |
spark.yarn.submit.file |
默认的HDFS副本数(通常是3) | HDFS文件副本数。包括Spark jar,app jar以及其他分布式缓存文件和存档。 |
spark.yarn.preserve |
false |
设为true以保存stage相关文件(stage相关的jar包和缓存)到作业结束,而不是立即删除。 |
spark.yarn.scheduler |
3000 |
Spark AM发送给YARN资源管理器心跳的间隔(ms)。 这个值最多不能超过YARN配置的超时间隔的一半。(yarn.am.liveness-monitor.expiry-interval-ms) |
spark.yarn.scheduler |
200ms |
Spark AM的初始带外心跳间隔(有待定的资源申请时)。 其值不应该大于 spark.yarn.scheduler.heartbeat.interval-ms。 该资源分配间隔会在每次带外心跳成功后但仍有待定资源申请时倍增, 直至达到 spark.yarn.scheduler.heartbeat.interval-ms 所设定的值。 |
spark.yarn.max.executor |
执行器个数*2且不小于3 | Spark应用最大容忍执行器失败次数。 |
spark.yarn.historyServer |
(none) | Spark history server地址,如:host.com:18080 。 这个地址不要包含协议头(http://)。 默认不设置,因为history server是可选的。 应用程序结束以后,YARN资源管理器web UI通过这个地址链接到Spark history server UI。 对于这属性,可以使用YARN属性变量,且这些变量是Spark在运行时组装的。 例如,如果Spark history server和YARN资源管理器(ResourceManager)部署在同一台机器上运行, 那么这个属性可以设置为 ${hadoopconf-yarn.resourcemanager.hostname}:18080 |
spark.yarn.dist.archives |
(none) | 逗号分隔的文档列表,其指向的文档将被提取到每个执行器的工作目录下。 |
spark.yarn.dist.files |
(none) | 逗号分隔的文件列表,其指向的文件将被复制到每个执行器的工作目录下。 |
spark.executor.instances |
2 |
执行器个数。注意,这个属性和 spark.dynamicAllocation.enabled是不兼容的。 如果同时设置了 spark.dynamicAllocation.enabled,那么动态分配将被关闭,并使用 spark.executor.instances 所设置的值。 |
spark.yarn.executor |
执行器内存 * 0.10或者 384MB中较大者 | 每个执行器所分配的堆外内存(MB)总量。这些内存将被用于存储VM开销、字符串常量,以及其他原生开销等。这会使执行器所需内存增加(典型情况,增加6%~10%) |
spark.yarn.driver |
驱动器内存 * 0.10或者 384MB中较大者 | 每个驱动器所分配的堆外内存(MB)总量。 这些内存将被用于存储VM开销、字符串常量,以及其他原生开销等。 这会使执行器所需内存增加(典型情况,增加6%~10%) |
spark.yarn.am |
Application Master 内存 * 0.10或者 384MB中较大者 |
与 spark.yarn.driver.memoryOverhead 相同,只是仅用于YARN AM client模式下。 |
spark.yarn.am.port |
(random) | YARN AM所监听的端口。 在YARN client模式下,用于Spark驱动器(driver)和YARN AM通信。 而在YARN cluster模式下,这个端口将被用于动态执行器特性,这个特性会处理调度器后台杀死执行器的请求。 |
spark.yarn.queue |
default |
Spark应用提交到哪个yarn队列。 |
spark.yarn.jar |
(none) | Spark jar文件位置,如果需要覆盖默认位置,请设定这个值。 默认的,Spark on YARN会使用本地的Spark jar包,但Spark jar包同样可以使用整个集群可读的HDFS文件位置。 这使YARN可以在各节点上缓存Spark jar包,而不需要每次运行一个应用的时候都要分发。 使用 hdfs:///some/path 来指定HDFS上jar包文件路径。 |
spark.yarn.access |
(none) | 逗号分隔的HDFS namenodes。 例如 spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032。 Spark应用必须有这些机器的访问权限,并且需要配置好 kerberos(可以在同一个域或者信任的域)。 Spark需要每个namenode的安全token,以便访问集群中HDFS。 |
spark.yarn.appMasterEnv |
(none) | 增加EnvironmentVariableName所指定的环境变量到YARN AM的进程中。 用户可以指定多个环境变量。在cluster模式下,这个可以控制Spark驱动器的环境变量; 而在client模式下,只控制执行器启动器的环境变量。 |
spark.yarn |
25 |
YARN AM 启动执行器的容器最多包含多少线程数。 |
spark.yarn.am |
(none) | 在client模式下,传给YARN AM 的JVM参数。 在cluster模式下,请使用spark.driver.extraJavaOptions |
spark.yarn.am |
(none) | client模式下传给YARN AM 额外依赖库。 |
spark.yarn.maxAppAttempts |
yarn .am.max-attempts in YARN |
提交应用最大尝试次数。不应大于YARN全局配置的最大尝试次数。 |
spark.yarn.am |
(none) | 定义AM失败跟踪校验间隔。 AM运行了至少要运行这么多时间后,其失败计数才被重置。 这个特性只有配置其值后才会生效,且只支持Hadoop-2.6+ |
spark.yarn.submit |
true |
在YARN cluster模式下,控制是否客户端等到Spark应用结束后再退出。 如果设为true,客户端进程将一直等待,并持续报告应用状态。 否则,客户端会在提交完成后退出。 |
spark.yarn.am |
(none) | 一个YARN节点标签表达式(node label expression),以此来限制AM可以被调度到哪些节点上执行。 只有Hadoop 2.6+才能支持节点标签表达式,所以如果用其他版本运行,这个属性将被忽略。 |
spark.yarn.executor |
(none) | 一个YARN节点标签表达式(node label expression),以此来限制执行器可以被调度到哪些节点上启动。 只有Hadoop 2.6+才能支持节点标签表达式,所以如果在其他版本上运行时,这个属性将被忽略。 |
spark.yarn.tags |
(none) | 逗号分隔的字符串,传递YARN应用tags。 其值将出现在YARN Application Reports中,可以用来过滤和查询YARN 应用。 |
spark.yarn.keytab |
(none) | 认证文件keytab的全路径。 这个文件将被复制到访问Secure Distributed Cache的YARN 应用节点上,并且周期性的刷新登陆的ticket和代理token(本地模式下也能work) |
spark.yarn.principal |
(none) | 登陆KDC的认证,secure HDFS需要(local模式下也能用) |
spark.yarn.config |
(none) | 某些路径,可能在网关主机上能正常访问(Spark应用启动的地方),而在其他节点上的访问方式(路径)可能不同。 对于这样的路径,需要本属性配合 spark.yarn.config.replacementPath组合使用,对于支持异构配置的集群,必须配置好这两个值,Spark才能正确地启动远程进程。 replacement path 通常包含一些YARN导出的环境变量(因此,对Spark containers可见)。 例如,如果网关节点上Hadoop库安装在 /disk1/hadoop,并且其导出环境变量为 HADOOP_HOME, 就需要将 spark.yarn.config.gatewayPath 设置为 /disk1/hadoop 并将 replacement path设为 $HADOOP_HOME, 这样才能在远程节点上以正确的环境变量启动进程。 |
spark.yarn.config |
(none) | 见 spark.yarn.config.getewayPath |
spark.yarn.security |
true |
在启用安全设置的情况下,控制是否对non-HDFS服务,获取代理token。 默认地,所有支持的服务,都启用;但你也可以在某些有冲突的情况下,对某些服务禁用。 目前支持的服务有:hive,hbase |
重要提示
- 对CPU资源的请求是否满足,取决于调度器如何配置和使用。
- cluster模式下,Spark执行器(executor)和驱动器(driver)的local目录都由YARN配置决定(yarn.nodemanager.local-dirs);如果用户指定了spark.local.dir,这时候将被忽略。在client模式下,Spark执行器(executor)的local目录由YARN决定,而驱动器(driver)的local目录由spark.local.dir决定,因为这时候,驱动器不在YARN上运行。
- 选项参数 –files和 –archives中井号(#)用法类似于Hadoop。例如,你可以指定 –files localtest.txt#appSees.txt,这将会把localtest.txt文件上传到HDFS上,并重命名为 appSees.txt,而你的程序应用用 appSees.txt来引用这个文件。
- 当你在cluster模式下使用本地文件时,使用选项–jar 才能让SparkContext.addJar正常工作,而不必使用 HDFS,HTTP,HTTPS或者FTP上的文件。