利用SparkLauncher实现Spark Cluster模式下的远端交互

简介: 利用SparkLauncher实现Spark Cluster模式下的远端交互

前言

编者按——————————

前一阵子在技术小群有位同学在群里做了和SparkLauncher相关的问题,当然聊天信息还有前后左右,大致就是希望可以定制化Spark的提交。也是,Spark的官网其实一出来就是教我们用命令行怎么去提交,诸如命令行什么的,有下面这样的

./bin/run-example SparkPi 10

还有这样的

./bin/spark-submit examples/src/main/python/pi.py 10

这种搞搞学习还好,到了实际需求来说这这架势整不会了。当然除了本身有调度平台怼提交作业,许多场景期望在系统内部进行提交,我遇到的场景就是平台有页面生成DQC变成JAR包提交、还有就是算法平台也是流程化,这种时候比较迫切期望按照自己的需求比较灵活提交Spark的作业。

之前我也做过这块的功能,所以整个小文说道说道。

需求说明

我的需求背景是这样子的,调度平台原有的模式是通过Agent机器走Client方式提交Spark作业的:

在Agent处,其实直接就提交了,所以Master和Agent其实是在一台机器上

踩坑清单如下:

  1. 在Hive时代,Agent只是充当提交作业的角色,所以稳定分配个2G作业有的内存其实也满足,到了Spark时代整个就崩了,Spark的作业Master充当整个调度的作用,信息交互比较多,尤其是一些作业需要进行数据广播,这个时候经常把内存开到6G、8G、12G甚至不止,Agent机器负荷比较大所以作业比较容易失败。
  2. 我们其实发现了Spark本身支持Cluster模式,经常是Yarn上面有资源,但是由于Agent负荷高所以也没法提交。Cluster模式可以弥补这点,把Agent的压力进行转移,不过有个蛋疼的事情是Yarn上面不会有任何作业状态反馈,所以作业跑到多少了,成功还是失败也不知道。
  3. 也考虑做直接的走Docker,k8s什么的,不过当时Docker暂时内部还没搞得那么成熟,尤其是在Yarn这种离线平台调度上。

这就引出了我们要这玩意是"卧槽"还是变成"哇塞"的话题了,我们其实是需要改造Spark以Cluster模式提交,同时满足实时交互作业进度和状态。

SparkLauncher引入

其实Spark本身已经对比较灵活的提交方式做了支持,按照惯例,我们一起读一读源码中对SparkLauncher的描述

/**
 * Launcher for Spark applications.
 * <p>
 * Use this class to start Spark applications programmatically. The class uses a builder pattern
 * to allow clients to configure the Spark application and launch it as a child process.
 * </p>
 */
public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
......

两点信息:

  1. 使用SparkLauncher类可以编程的方式启动Spark的应用,该类使用建造者模式进行构建
  2. 允许客户端作为一个子进程来配置使用

看到这个就开心了,可以编程嘛,丢到什么tomcat里面或者找个什么服务里面调用就是了,实际上就是提供了API的方式去提交Spark,然后想咋折腾咋折腾。

SparkLauncher使用

实际上就是编程的方式去组装参数,在SparkLauncher的package-info中给出了例子:

当然官方也有api文档,链接

一般情况下官网给的都是太简单了,没法运行,我这里给一个跑过的例子:


public class SparkLauncherTest {
    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap env = new HashMap();
        //基础环境变量
       // env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc");
        //env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle sparkAppHandle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("oprg.apache.spark.Dummy")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.driver.memory", "8g")
                .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    //这里监听任务状态,当任务结束时,isFinal()方法会返回true,否则返回false
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });
        System.out.println("The task is executing, please wait ....");
        //线程等待任务结束
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}

其实就是对应我们在命令行提交时候的参数,用JAVA代码组装而已。提交之后会生成一个进程,对应提交.

顺带说明一下,Spark参数有些是默认的,有些是自己给的,但是很多人并不知道可以真正传递哪些参数,这个事情其实也是体现在源代码里面的,在SparkSubmitArguments上面有直接的定义:

private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
  var totalExecutorCores: String = null
  var propertiesFile: String = null
  var driverMemory: String = null
  ......

参数给过去其实就是这个类的成员变量,这样就不迷茫了。具体我们还能看到转化过程,当字典翻翻挺好的

override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case NAME =>
        name = value
      case MASTER =>
        master = value
      case CLASS =>
        mainClass = value
      case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
          error("--deploy-mode must be either \"client\" or \"cluster\"")
        }
        deployMode = value
      case NUM_EXECUTORS =>
        numExecutors = value

我做的改造

我的目标其实是实现在Spark以Yarn Cluster提交,但是又可以在Agent端进行交互,对整体交互流程进行了重新设计:

核心改变主要是通过Yarn Cluster 运行,Agent和Mater通过远程RPC的方式进行消息交互,同时实现了 Agent部分获取作业的状态。

整体代码量很少,分别介绍:

  1. SQLApplicationDriver是运行在Container中作为Application Master角色存在,在SQLApplicationDriver中实现真正对SQL作业的提交和运行,所以作为一个Application,核心的逻辑其实就是创建SparkSession然后直接执行sql的操作。
  2. SparkAgentLauncher 会在agent端运行,他只是作为作业提交的代理,通过RPC的方式传输给SQLApplicationDriver,并且把作业的状态同步给调度系统,当然,提交方式就是用到了SparkAgentLauncher

  def runApplicationOnCluster(appArgs:SparkAgentArguments)={
    val sparkAgentLauncher=new SparkAgentLauncher()
    val actorSystemName = "SparSQLkAgent"
    val hostName = Utils.localHostName()
    val port = 0
    val sparkConf=new SparkConf()
    rpcEnv = RpcEnv.create(actorSystemName, hostName, port, sparkConf, new SecurityManager(sparkConf))
    sparkAgentEndpoint = rpcEnv.setupEndpoint(actorSystemName, new SparkAgentEndpoint(rpcEnv,sparkAgentLauncher))
    println(s"start SparSQLkAgent, hostPort= ${sparkAgentEndpoint.address}")
    val hostPort = sparkAgentEndpoint.address.hostPort

    val file =new JFile(appArgs.sqlFile)
    val launcher = new SparkLauncher
    val appJar=System.getProperty("app.resource.jar")
    launcher.setAppResource(appJar)
    val fileList=System.getProperty("app.resource.files").split(",")
    //attach file
    if(fileList.size>0){
      fileList.foreach(f=>launcher.addFile(f))
    }
    launcher.addFile(appArgs.sqlFile)
    val agentSubmitTime=System.currentTimeMillis()
    launcher.addAppArgs(file.getName,hostPort,agentSubmitTime.toString)
    println(s"app.resource.jar: ${appJar}")

    launcher.setMaster("yarn") // 在yarn-cluster上启动,也可以再local[*]上
    launcher.setMainClass("org.apache.spark.sql.yarn.SQLApplicationDriver")
    launcher.setDeployMode("cluster")
  1. SQLClientMessage 就是作为RPC的交互
object SQLClientMessage {

  case class  ReportSQLCommand(command:String) extends SQLClientMessage
  case class  ReportAppId(appId:String,url:String,agentSubmitTime:Long,launchTime:Long) extends SQLClientMessage
  case class  ReportStageProcess(process:ListBuffer[Progress]) extends SQLClientMessage
  case class  ReportJobProcess(process:ListBuffer[JobProgress]) extends SQLClientMessage
  case class  ErrorMessage(message:String) extends SQLClientMessage
  case class  InterruptMessage(message:String) extends SQLClientMessage
  case class  JobEndMessage(jobId:Int,message:String) extends SQLClientMessage
  case class RegisteredDriver(driver: RpcEndpointRef) extends SQLClientMessage

}
  1. Spark在启动的时候可以增加监听器,我们自己实现了监听器JobsListener,并且把Agent的Rpc的远程调用进行引入,这样就可以直接在agent拿到进度消息了
sparkSession.sparkContext.addSparkListener(new JobsListener(agentClientEndpoint))
  1. 一直在提RPC交互,整套是直接利用Spark自带的RPC(RpcAddress, RpcEndpointRef, RpcEnv)所以无需引入额外的依赖,我们把SQLApplicationDriver和SparkAgentLauncher分别作为Endpoint这样就在两端实现了远程交互了

整体实现了把作业推到远端执行啦!!!

后记

也算是实现一次定制化的源码开发吧,因为平时顶多打打补丁啥的,利用Spark自己的东西做点小改造还是有点意思的。另外补充点就是我还实现了进度条的功能,但是总觉得控制台打印的有点挫,因为有个RPC交互,所以会看到在一个进度上卡着的情况,小功能优化点还是比较多的~

目录
相关文章
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
149 6
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
64 2
|
3月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
251 3
|
4月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
存储 分布式计算 监控
Spark Standalone模式是一种集群部署方式
【6月更文挑战第17天】Spark Standalone模式是一种集群部署方式
70 7
|
6月前
|
机器学习/深度学习 分布式计算 并行计算
Spark 3.0 中的屏障执行模式_Spark的MPI时代来了
Spark 3.0 中的屏障执行模式_Spark的MPI时代来了
39 0
|
6月前
|
分布式计算 资源调度 监控
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
199 1
|
18天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
52 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0