开发者社区> 问答> 正文

在单个EMR群集中调用多个spark作业

宋淑婷 2019-04-23 11:27:24 162

我想在单个EMR集群中使用spark-submit调用多个spark作业。EMR支持这个吗?怎么做到这一点?此时我使用AWS Lambda为我的spark作业调用EMR作业,但我们希望扩展到单个EMR集群中的多个spark作业。

分布式计算 Spark
分享到
取消 提交回答
全部回答(1)
  • 宋淑婷
    2019-07-17 23:34:03

    您可以在一个EMR上顺序运行多个spark作业 - 也就是说,下一个作业将在上一个作业完成后启动。这是使用EMR步骤完成的。

    我使用Java SDK来运行它,但您可以在本文档中看到如何仅使用CLI添加步骤。

    我下面的代码使用spark-submit,但它不会像在CLI中运行那样直接运行。相反,我将其作为shell脚本运行,并包含HADOOP_USER_NAME的环境变量,因此spark作业在我指定的用户名下运行。如果要在登录到EMR的用户名(默认为hadoop)下运行作业,可以跳过它。

    在下面的代码摘录中,对象emr是类型AmazonElasticMapReduce,在sdk中提供。如果您使用CLI方法,则不需要它。

    一些辅助方法是uploadConfFile不言自明的。我用了一个广泛的配置为spark应用程序,不同的是files和jars其可以是本地或S3 / HDFS,配置文件必须在EMR本身的本地文件。

    完成后,您将在EMR集群上创建一个步骤,该步骤将启动新的spark应用程序。您可以在EMR上指定许多步骤,这些步骤将一个接一个地运行。

    //Upload the spark configuration you wish to use to a local file
    uploadConfFile(clusterId, sparkConf, confFileName);

    //create a list of arguments - which is the complete command for spark-submit
    List stepargs = new ArrayList();
    //start with an envelope to specify the hadoop user name
    stepargs.add("/bin/sh");
    stepargs.add("-c");
    //call to spark-submit with the incantation stating its arguments are provided next.
    stepargs.add("HADOOP_USER_NAME="+task.getUserName()+" spark-submit "$@"");
    stepargs.add("sh");
    //add the spark-submit arguments
    stepargs.add("--class");
    stepargs.add(mainClass);
    stepargs.add("--deploy-mode");
    stepargs.add("cluster");
    stepargs.add("--master");
    stepargs.add("yarn");
    stepargs.add("--files");
    //a comma-separated list of file paths in s3
    stepargs.add(files);
    stepargs.add("--jars");
    //a comma-separated list of file paths in s3
    stepargs.add(jars);
    stepargs.add("--properties-file");
    //the file we uploaded to the EMR, with its full path
    stepargs.add(confFileName);
    stepargs.add(jar);
    //add the jar specific arguments in here

    AddJobFlowStepsResult result = emr.addJobFlowSteps(new AddJobFlowStepsRequest()

    .withJobFlowId(clusterId)
    .withSteps(new StepConfig()
            .withName(name)
            .withActionOnFailure(ActionOnFailure.CONTINUE)
            .withHadoopJarStep(new HadoopJarStepConfig()
                    .withJar("command-runner.jar")
                    .withArgs(stepargs))));
    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题
推荐课程