利用SparkLauncher实现Spark Cluster模式下的远端交互

简介: 利用SparkLauncher实现Spark Cluster模式下的远端交互

前言

编者按——————————

前一阵子在技术小群有位同学在群里做了和SparkLauncher相关的问题,当然聊天信息还有前后左右,大致就是希望可以定制化Spark的提交。也是,Spark的官网其实一出来就是教我们用命令行怎么去提交,诸如命令行什么的,有下面这样的

./bin/run-example SparkPi 10

还有这样的

./bin/spark-submit examples/src/main/python/pi.py 10

这种搞搞学习还好,到了实际需求来说这这架势整不会了。当然除了本身有调度平台怼提交作业,许多场景期望在系统内部进行提交,我遇到的场景就是平台有页面生成DQC变成JAR包提交、还有就是算法平台也是流程化,这种时候比较迫切期望按照自己的需求比较灵活提交Spark的作业。

之前我也做过这块的功能,所以整个小文说道说道。

需求说明

我的需求背景是这样子的,调度平台原有的模式是通过Agent机器走Client方式提交Spark作业的:

在Agent处,其实直接就提交了,所以Master和Agent其实是在一台机器上

踩坑清单如下:

  1. 在Hive时代,Agent只是充当提交作业的角色,所以稳定分配个2G作业有的内存其实也满足,到了Spark时代整个就崩了,Spark的作业Master充当整个调度的作用,信息交互比较多,尤其是一些作业需要进行数据广播,这个时候经常把内存开到6G、8G、12G甚至不止,Agent机器负荷比较大所以作业比较容易失败。
  2. 我们其实发现了Spark本身支持Cluster模式,经常是Yarn上面有资源,但是由于Agent负荷高所以也没法提交。Cluster模式可以弥补这点,把Agent的压力进行转移,不过有个蛋疼的事情是Yarn上面不会有任何作业状态反馈,所以作业跑到多少了,成功还是失败也不知道。
  3. 也考虑做直接的走Docker,k8s什么的,不过当时Docker暂时内部还没搞得那么成熟,尤其是在Yarn这种离线平台调度上。

这就引出了我们要这玩意是"卧槽"还是变成"哇塞"的话题了,我们其实是需要改造Spark以Cluster模式提交,同时满足实时交互作业进度和状态。

SparkLauncher引入

其实Spark本身已经对比较灵活的提交方式做了支持,按照惯例,我们一起读一读源码中对SparkLauncher的描述

/**
 * Launcher for Spark applications.
 * <p>
 * Use this class to start Spark applications programmatically. The class uses a builder pattern
 * to allow clients to configure the Spark application and launch it as a child process.
 * </p>
 */
public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
......

两点信息:

  1. 使用SparkLauncher类可以编程的方式启动Spark的应用,该类使用建造者模式进行构建
  2. 允许客户端作为一个子进程来配置使用

看到这个就开心了,可以编程嘛,丢到什么tomcat里面或者找个什么服务里面调用就是了,实际上就是提供了API的方式去提交Spark,然后想咋折腾咋折腾。

SparkLauncher使用

实际上就是编程的方式去组装参数,在SparkLauncher的package-info中给出了例子:

当然官方也有api文档,链接

一般情况下官网给的都是太简单了,没法运行,我这里给一个跑过的例子:


public class SparkLauncherTest {
    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap env = new HashMap();
        //基础环境变量
       // env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc");
        //env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle sparkAppHandle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("oprg.apache.spark.Dummy")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.driver.memory", "8g")
                .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    //这里监听任务状态,当任务结束时,isFinal()方法会返回true,否则返回false
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });
        System.out.println("The task is executing, please wait ....");
        //线程等待任务结束
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}

其实就是对应我们在命令行提交时候的参数,用JAVA代码组装而已。提交之后会生成一个进程,对应提交.

顺带说明一下,Spark参数有些是默认的,有些是自己给的,但是很多人并不知道可以真正传递哪些参数,这个事情其实也是体现在源代码里面的,在SparkSubmitArguments上面有直接的定义:

private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
  var totalExecutorCores: String = null
  var propertiesFile: String = null
  var driverMemory: String = null
  ......

参数给过去其实就是这个类的成员变量,这样就不迷茫了。具体我们还能看到转化过程,当字典翻翻挺好的

override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case NAME =>
        name = value
      case MASTER =>
        master = value
      case CLASS =>
        mainClass = value
      case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
          error("--deploy-mode must be either \"client\" or \"cluster\"")
        }
        deployMode = value
      case NUM_EXECUTORS =>
        numExecutors = value

我做的改造

我的目标其实是实现在Spark以Yarn Cluster提交,但是又可以在Agent端进行交互,对整体交互流程进行了重新设计:

核心改变主要是通过Yarn Cluster 运行,Agent和Mater通过远程RPC的方式进行消息交互,同时实现了 Agent部分获取作业的状态。

整体代码量很少,分别介绍:

  1. SQLApplicationDriver是运行在Container中作为Application Master角色存在,在SQLApplicationDriver中实现真正对SQL作业的提交和运行,所以作为一个Application,核心的逻辑其实就是创建SparkSession然后直接执行sql的操作。
  2. SparkAgentLauncher 会在agent端运行,他只是作为作业提交的代理,通过RPC的方式传输给SQLApplicationDriver,并且把作业的状态同步给调度系统,当然,提交方式就是用到了SparkAgentLauncher

  def runApplicationOnCluster(appArgs:SparkAgentArguments)={
    val sparkAgentLauncher=new SparkAgentLauncher()
    val actorSystemName = "SparSQLkAgent"
    val hostName = Utils.localHostName()
    val port = 0
    val sparkConf=new SparkConf()
    rpcEnv = RpcEnv.create(actorSystemName, hostName, port, sparkConf, new SecurityManager(sparkConf))
    sparkAgentEndpoint = rpcEnv.setupEndpoint(actorSystemName, new SparkAgentEndpoint(rpcEnv,sparkAgentLauncher))
    println(s"start SparSQLkAgent, hostPort= ${sparkAgentEndpoint.address}")
    val hostPort = sparkAgentEndpoint.address.hostPort

    val file =new JFile(appArgs.sqlFile)
    val launcher = new SparkLauncher
    val appJar=System.getProperty("app.resource.jar")
    launcher.setAppResource(appJar)
    val fileList=System.getProperty("app.resource.files").split(",")
    //attach file
    if(fileList.size>0){
      fileList.foreach(f=>launcher.addFile(f))
    }
    launcher.addFile(appArgs.sqlFile)
    val agentSubmitTime=System.currentTimeMillis()
    launcher.addAppArgs(file.getName,hostPort,agentSubmitTime.toString)
    println(s"app.resource.jar: ${appJar}")

    launcher.setMaster("yarn") // 在yarn-cluster上启动,也可以再local[*]上
    launcher.setMainClass("org.apache.spark.sql.yarn.SQLApplicationDriver")
    launcher.setDeployMode("cluster")
  1. SQLClientMessage 就是作为RPC的交互
object SQLClientMessage {

  case class  ReportSQLCommand(command:String) extends SQLClientMessage
  case class  ReportAppId(appId:String,url:String,agentSubmitTime:Long,launchTime:Long) extends SQLClientMessage
  case class  ReportStageProcess(process:ListBuffer[Progress]) extends SQLClientMessage
  case class  ReportJobProcess(process:ListBuffer[JobProgress]) extends SQLClientMessage
  case class  ErrorMessage(message:String) extends SQLClientMessage
  case class  InterruptMessage(message:String) extends SQLClientMessage
  case class  JobEndMessage(jobId:Int,message:String) extends SQLClientMessage
  case class RegisteredDriver(driver: RpcEndpointRef) extends SQLClientMessage

}
  1. Spark在启动的时候可以增加监听器,我们自己实现了监听器JobsListener,并且把Agent的Rpc的远程调用进行引入,这样就可以直接在agent拿到进度消息了
sparkSession.sparkContext.addSparkListener(new JobsListener(agentClientEndpoint))
  1. 一直在提RPC交互,整套是直接利用Spark自带的RPC(RpcAddress, RpcEndpointRef, RpcEnv)所以无需引入额外的依赖,我们把SQLApplicationDriver和SparkAgentLauncher分别作为Endpoint这样就在两端实现了远程交互了

整体实现了把作业推到远端执行啦!!!

后记

也算是实现一次定制化的源码开发吧,因为平时顶多打打补丁啥的,利用Spark自己的东西做点小改造还是有点意思的。另外补充点就是我还实现了进度条的功能,但是总觉得控制台打印的有点挫,因为有个RPC交互,所以会看到在一个进度上卡着的情况,小功能优化点还是比较多的~

目录
相关文章
|
13天前
|
存储 分布式计算 监控
Spark Standalone模式是一种集群部署方式
【6月更文挑战第17天】Spark Standalone模式是一种集群部署方式
23 7
|
2月前
|
机器学习/深度学习 分布式计算 并行计算
Spark 3.0 中的屏障执行模式_Spark的MPI时代来了
Spark 3.0 中的屏障执行模式_Spark的MPI时代来了
17 0
|
2月前
|
分布式计算 监控 Java
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
60 0
|
2月前
|
分布式计算 资源调度 监控
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
150 1
|
10月前
|
分布式计算 资源调度 监控
Spark Yarn模式部署集群
Spark Yarn模式部署集群
58 1
|
9月前
|
SQL 分布式计算 资源调度
分享一个 hive on spark 模式下使用 HikariCP 数据库连接池造成的资源泄露问题
分享一个 hive on spark 模式下使用 HikariCP 数据库连接池造成的资源泄露问题
|
9月前
|
分布式计算 资源调度 Kubernetes
大数据问题排查系列 - SPARK STANDALONE HA 模式的一个缺陷点与应对方案
大数据问题排查系列 - SPARK STANDALONE HA 模式的一个缺陷点与应对方案
|
9月前
|
分布式计算 资源调度 Hadoop
Spark on Yarn集群模式搭建及测试
Spark on Yarn集群模式搭建及测试
206 0
|
2月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
13天前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
115 59