随着大数据时代的到来,以及kubernetes的愈发火热,好多公司已经把spark应用从yarn迁移到k8s,当然也踩了不少的坑, 现在我们来分析一下spark on k8s的DynamicResourceAllocation这个坑
spark on yarn 中的DynamicResourceAllocation
spark on yarn对于DynamicResourceAllocation分配来说,从spark 1.2版本就已经开始支持了.
对于spark熟悉的人都知道,如果我们要开启DynamicResourceAllocation,就得有ExternalShuffleService服务,
对于yarn来说ExternalShuffleService是作为辅助服务开启的,具体配置如下:
<property> <name>yarn.nodemanager.aux-services</name> <value>spark_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>spark.shuffle.service.port</name> <value>7337</value> </property>
重启nodeManager,这样在每个nodeManager节点就会启动一个YarnShuffleService,之后在spark应用中设置spark.dynamicAllocation.enabled 为true,这样就能达到运行时资源动态分配的效果
我们直接从CoarseGrainedExecutorBackend中SparkEnv创建开始说,每一个executor的启动,必然会经过CoarseGrainedExecutorBackend main方法,而main中就涉及到SparkEnv的创建
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
而sparkEnv的创建就涉及到BlockManager的创建。沿着代码往下走,最终
val blockTransferService = new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint) val blockManager = new BlockManager( executorId, rpcEnv, blockManagerMaster, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, externalShuffleClient)
在blockManager的initialize方法中,就会进行registerWithExternalShuffleServer
// Register Executors' configuration with the local shuffle service, if one should exist. if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { registerWithExternalShuffleServer() }
如果我们开启了ExternalShuffleService,对于yarn就是YarnShuffleService,就会把当前的ExecutorShuffleInfo注册到host为shuffleServerId.host, port为shuffleServerId.port的ExternalShuffleService中,ExecutorShuffleInfo的信息如下:
val shuffleConfig = new ExecutorShuffleInfo( diskBlockManager.localDirsString, diskBlockManager.subDirsPerLocalDir, shuffleManager.getClass.getName)
这里我重点分析一下registerWithExternalShuffleServer的方法中的以下片段
// Synchronous and will throw an exception if we cannot connect. blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer( shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
该代码中shuffleServerId来自于:
shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort") BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) } else { blockManagerId }
而blockTransferService.hostName 是我们在SparkEnv中创建的时候由advertiseAddress传过来的,
最终由CoarseGrainedExecutorBackend 主类参数hostname过来的,那到底怎么传过来的呢?
参照ExecutorRunnable的prepareCommand方法,
val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend", "--driver-url", masterAddress, "--executor-id", executorId, "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId, "--resourceProfileId", resourceProfileId.toString) ++
而这个hostname的值最终由YarnAllocator的方法runAllocatedContainers
val executorHostname = container.getNodeId.getHost
传递过来的,也就是说我们最终获取到了yarn节点,也就是nodeManager的host
这样每个启动的executor,就向executor所在的nodeManager的YarnShuffleService注册了ExecutorShuffleInfo信息,这样对于开启了动态资源分配的
ExternalBlockStoreClient 来说fetchBlocksg过程就和未开启动态资源分配的NettyBlockTransferService大同小异了
spark on k8s(kubernetes) 中的DynamicResourceAllocation
参考之前的文章,我们知道在entrypoint中我们在启动executor的时候,我们传递了hostname参数
executor) shift 1 CMD=( ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP )
而SPARK_EXECUTOR_POD_IP是运行中的POD IP,参考BasicExecutorFeatureStep类片段:
Seq(new EnvVarBuilder() .withName(ENV_EXECUTOR_POD_IP) .withValueFrom(new EnvVarSourceBuilder() .withNewFieldRef("v1", "status.podIP") .build()) .build())
这样按照以上流程的分析,即使我们在每个k8s节点开启ExternalShuffleService服务,且pod挂载了持久化盘,
executor也不能向k8s节点ExternalShuffleService服务注册,因为我们注册的节点是POD IP,而不是节点IP,
当然spark社区早就提出了未开启external shuffle service的动态资源分配,且已经合并到master分支.
具体配置,可以参照如下:
spark.dynamicAllocation.enabled true spark.dynamicAllocation.shuffleTracking.enabled true spark.dynamicAllocation.minExecutors 1 spark.dynamicAllocation.maxExecutors 4 spark.dynamicAllocation.executorIdleTimeout 60s