前言
编者按——————————
前一阵子在技术小群有位同学在群里做了和SparkLauncher相关的问题,当然聊天信息还有前后左右,大致就是希望可以定制化Spark的提交。也是,Spark的官网其实一出来就是教我们用命令行怎么去提交,诸如命令行什么的,有下面这样的
./bin/run-example SparkPi 10
还有这样的
./bin/spark-submit examples/src/main/python/pi.py 10
这种搞搞学习还好,到了实际需求来说这这架势整不会了。当然除了本身有调度平台怼提交作业,许多场景期望在系统内部进行提交,我遇到的场景就是平台有页面生成DQC变成JAR包提交、还有就是算法平台也是流程化,这种时候比较迫切期望按照自己的需求比较灵活提交Spark的作业。
之前我也做过这块的功能,所以整个小文说道说道。
需求说明
我的需求背景是这样子的,调度平台原有的模式是通过Agent机器走Client方式提交Spark作业的:
在Agent处,其实直接就提交了,所以Master和Agent其实是在一台机器上
踩坑清单如下:
- 在Hive时代,Agent只是充当提交作业的角色,所以稳定分配个2G作业有的内存其实也满足,到了Spark时代整个就崩了,Spark的作业Master充当整个调度的作用,信息交互比较多,尤其是一些作业需要进行数据广播,这个时候经常把内存开到6G、8G、12G甚至不止,Agent机器负荷比较大所以作业比较容易失败。
- 我们其实发现了Spark本身支持Cluster模式,经常是Yarn上面有资源,但是由于Agent负荷高所以也没法提交。Cluster模式可以弥补这点,把Agent的压力进行转移,不过有个蛋疼的事情是Yarn上面不会有任何作业状态反馈,所以作业跑到多少了,成功还是失败也不知道。
- 也考虑做直接的走Docker,k8s什么的,不过当时Docker暂时内部还没搞得那么成熟,尤其是在Yarn这种离线平台调度上。
这就引出了我们要这玩意是"卧槽"还是变成"哇塞"的话题了,我们其实是需要改造Spark以Cluster模式提交,同时满足实时交互作业进度和状态。
SparkLauncher引入
其实Spark本身已经对比较灵活的提交方式做了支持,按照惯例,我们一起读一读源码中对SparkLauncher的描述
/** * Launcher for Spark applications. * <p> * Use this class to start Spark applications programmatically. The class uses a builder pattern * to allow clients to configure the Spark application and launch it as a child process. * </p> */ public class SparkLauncher extends AbstractLauncher<SparkLauncher> { ......
两点信息:
- 使用SparkLauncher类可以编程的方式启动Spark的应用,该类使用建造者模式进行构建
- 允许客户端作为一个子进程来配置使用
看到这个就开心了,可以编程嘛,丢到什么tomcat里面或者找个什么服务里面调用就是了,实际上就是提供了API的方式去提交Spark,然后想咋折腾咋折腾。
SparkLauncher使用
实际上就是编程的方式去组装参数,在SparkLauncher的package-info中给出了例子:
当然官方也有api文档,链接
一般情况下官网给的都是太简单了,没法运行,我这里给一个跑过的例子:
public class SparkLauncherTest { public static void main(String[] args) throws IOException, InterruptedException { HashMap env = new HashMap(); //基础环境变量 // env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc"); //env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151"); CountDownLatch countDownLatch = new CountDownLatch(1); SparkAppHandle sparkAppHandle = new SparkLauncher(env) .setSparkHome("/usr/local/spark") .setAppResource("/usr/local/spark/spark-demo.jar") .setMainClass("oprg.apache.spark.Dummy") .setMaster("yarn") .setDeployMode("cluster") .setConf("spark.driver.memory", "8g") .setVerbose(true).startApplication(new SparkAppHandle.Listener() { //这里监听任务状态,当任务结束时,isFinal()方法会返回true,否则返回false @Override public void stateChanged(SparkAppHandle sparkAppHandle) { if (sparkAppHandle.getState().isFinal()) { countDownLatch.countDown(); } System.out.println("state:" + sparkAppHandle.getState().toString()); } @Override public void infoChanged(SparkAppHandle sparkAppHandle) { System.out.println("Info:" + sparkAppHandle.getState().toString()); } }); System.out.println("The task is executing, please wait ...."); //线程等待任务结束 countDownLatch.await(); System.out.println("The task is finished!"); } }
其实就是对应我们在命令行提交时候的参数,用JAVA代码组装而已。提交之后会生成一个进程,对应提交.
顺带说明一下,Spark参数有些是默认的,有些是自己给的,但是很多人并不知道可以真正传递哪些参数,这个事情其实也是体现在源代码里面的,在SparkSubmitArguments上面有直接的定义:
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) extends SparkSubmitArgumentsParser with Logging { var master: String = null var deployMode: String = null var executorMemory: String = null var executorCores: String = null var totalExecutorCores: String = null var propertiesFile: String = null var driverMemory: String = null ......
参数给过去其实就是这个类的成员变量,这样就不迷茫了。具体我们还能看到转化过程,当字典翻翻挺好的
override protected def handle(opt: String, value: String): Boolean = { opt match { case NAME => name = value case MASTER => master = value case CLASS => mainClass = value case DEPLOY_MODE => if (value != "client" && value != "cluster") { error("--deploy-mode must be either \"client\" or \"cluster\"") } deployMode = value case NUM_EXECUTORS => numExecutors = value
我做的改造
我的目标其实是实现在Spark以Yarn Cluster提交,但是又可以在Agent端进行交互,对整体交互流程进行了重新设计:
核心改变主要是通过Yarn Cluster 运行,Agent和Mater通过远程RPC的方式进行消息交互,同时实现了 Agent部分获取作业的状态。
整体代码量很少,分别介绍:
- SQLApplicationDriver是运行在Container中作为Application Master角色存在,在SQLApplicationDriver中实现真正对SQL作业的提交和运行,所以作为一个Application,核心的逻辑其实就是创建SparkSession然后直接执行sql的操作。
- SparkAgentLauncher 会在agent端运行,他只是作为作业提交的代理,通过RPC的方式传输给SQLApplicationDriver,并且把作业的状态同步给调度系统,当然,提交方式就是用到了SparkAgentLauncher
def runApplicationOnCluster(appArgs:SparkAgentArguments)={ val sparkAgentLauncher=new SparkAgentLauncher() val actorSystemName = "SparSQLkAgent" val hostName = Utils.localHostName() val port = 0 val sparkConf=new SparkConf() rpcEnv = RpcEnv.create(actorSystemName, hostName, port, sparkConf, new SecurityManager(sparkConf)) sparkAgentEndpoint = rpcEnv.setupEndpoint(actorSystemName, new SparkAgentEndpoint(rpcEnv,sparkAgentLauncher)) println(s"start SparSQLkAgent, hostPort= ${sparkAgentEndpoint.address}") val hostPort = sparkAgentEndpoint.address.hostPort val file =new JFile(appArgs.sqlFile) val launcher = new SparkLauncher val appJar=System.getProperty("app.resource.jar") launcher.setAppResource(appJar) val fileList=System.getProperty("app.resource.files").split(",") //attach file if(fileList.size>0){ fileList.foreach(f=>launcher.addFile(f)) } launcher.addFile(appArgs.sqlFile) val agentSubmitTime=System.currentTimeMillis() launcher.addAppArgs(file.getName,hostPort,agentSubmitTime.toString) println(s"app.resource.jar: ${appJar}") launcher.setMaster("yarn") // 在yarn-cluster上启动,也可以再local[*]上 launcher.setMainClass("org.apache.spark.sql.yarn.SQLApplicationDriver") launcher.setDeployMode("cluster")
- SQLClientMessage 就是作为RPC的交互
object SQLClientMessage { case class ReportSQLCommand(command:String) extends SQLClientMessage case class ReportAppId(appId:String,url:String,agentSubmitTime:Long,launchTime:Long) extends SQLClientMessage case class ReportStageProcess(process:ListBuffer[Progress]) extends SQLClientMessage case class ReportJobProcess(process:ListBuffer[JobProgress]) extends SQLClientMessage case class ErrorMessage(message:String) extends SQLClientMessage case class InterruptMessage(message:String) extends SQLClientMessage case class JobEndMessage(jobId:Int,message:String) extends SQLClientMessage case class RegisteredDriver(driver: RpcEndpointRef) extends SQLClientMessage }
- Spark在启动的时候可以增加监听器,我们自己实现了监听器JobsListener,并且把Agent的Rpc的远程调用进行引入,这样就可以直接在agent拿到进度消息了
sparkSession.sparkContext.addSparkListener(new JobsListener(agentClientEndpoint))
- 一直在提RPC交互,整套是直接利用Spark自带的RPC(RpcAddress, RpcEndpointRef, RpcEnv)所以无需引入额外的依赖,我们把SQLApplicationDriver和SparkAgentLauncher分别作为Endpoint这样就在两端实现了远程交互了
整体实现了把作业推到远端执行啦!!!
后记
也算是实现一次定制化的源码开发吧,因为平时顶多打打补丁啥的,利用Spark自己的东西做点小改造还是有点意思的。另外补充点就是我还实现了进度条的功能,但是总觉得控制台打印的有点挫,因为有个RPC交互,所以会看到在一个进度上卡着的情况,小功能优化点还是比较多的~