Spark从入门到入土(二):任务提交(上)

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: Spark从入门到入土(二):任务提交

spark任务提交有三种方式


1:通过local方式提交


2:通过spark-submit脚本提交到集群


3:通过spark提交的API SparkLauncher提交到集群,这种方式可以将提交过程集成到我们的spring工程中,更加灵活


先来看一下spark架构,可以帮助理解任务的提交

image.png

任务提交


驱动程序:执行应用程序main方法的进程


集群管理器:启动执行器节点,有Mesos、YARN(Hadoop)、独立集群管理器(spark自带的集群管理器),在standalone模式中即为Master主节点。


执行器节点:工作进程,负责在spark作业中运行任务


过程大概如下


①:执行器节点(工作节点)在启动时会向驱动器注册自己

②:用户提交任务,驱动器调用main方法,驱动器与集群管理器通信申请执行器资源

③:集群管理器为驱动器程序启动执行器节点

④:驱动器程序执行应用程序,将任务发送到工作节点

⑤:工作节点进行计算并保存结果

⑥:驱动器main方法退出,通过集群管理器释放资源


注意:在客户端模式下,spark-submit 会将驱动器程序运行 在 spark-submit 被调用的这台机器上。在集群模式下,驱动器程序会被传输并执行 于集群的一个工作节点上


一、本地方式提交


//该代码是对企业架构下的不同级别的海量告警信息进行离线统计,按部门、日期、级别进行分组统计
public static void main(String[] args) {
        logger.info("开始执行spark任务");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
        SparkConf conf = new SparkConf()
                .setAppName("离线统计")
                //.setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                .setMaster("local")
                .set("spark.mongodb.input.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant.ALARM_SOURCE_TABLE)
                .set("spark.mongodb.output.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant
                        .ALARM_TARGET_TABLE);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaMongoRDD<Document> lines = MongoSpark.load(jsc).withPipeline(
                singletonList(
                        getCondition()
                )
        );
        //按部门时间分组计算
        JavaPairRDD<String, AlarmStatisticBean> pairs =
                lines.filter((Function<Document, Boolean>) line -> {
                    //代码略掉
                    //过滤函数,true:不过滤,false:过滤
                    return true;
                }).mapToPair( 
                        //对RDD中的每个元素调用指定函数,并返回<String, AlarmStatisticBean>类型的对象
                        //键值对, key: orgId_day  value: level[]   例:1_20190101, [0,1,0]
                        (PairFunction<Document, String, AlarmStatisticBean>) line -> {
                            Long orgId = line.getLong("orgId");
                            String statisticDate = sdf.format(line.getLong("createTimestamp") * 1000);
                            AlarmStatisticBean bean = new AlarmStatisticBean();
                            Long level = line.getLong("levelDictId");
                            if (level == AlarmTypeEnum.LEVEL1.getType()) {
                                bean.setLevel1Count(1);
                            }
                            if (level == AlarmTypeEnum.LEVEL2.getType()) {
                                bean.setLevel2Count(1);
                            }
                            if (level == AlarmTypeEnum.LEVEL3.getType()) {
                                bean.setLevel3Count(1);
                            }
                            bean.setOrgId(orgId.intValue());
                            bean.setDay(statisticDate);
                            String code = orgId + "_" + statisticDate;
                            return new Tuple2<>(code, bean);
                        }
                        //分组多列求和 1_20190101, [x,y,z]
                ).reduceByKey((Function2<AlarmStatisticBean, AlarmStatisticBean, AlarmStatisticBean>) (v1, v2) -> {
                   //reduceByKey的作用是合并具有相同键的值
                    v1.setLevel1Count(v1.getLevel1Count() + v2.getLevel1Count());
                    v1.setLevel2Count(v1.getLevel2Count() + v2.getLevel2Count());
                    v1.setLevel3Count(v1.getLevel3Count() + v2.getLevel3Count());
                    return v1;
                });
        logger.info("------------------------->>>>>" + pairs.count());
        List<Document> documents = new ArrayList<>();
        //类型转换,以便持久化到DB(mongo或mysql)
        for (Tuple2<String, AlarmStatisticBean> tuple2 : pairs.collect()) {
            Document document = new Document("day", tuple2._2.getDay())
                    .append("level1Count", tuple2._2.getLevel1Count())
                    .append("level2Count", tuple2._2.getLevel2Count())
                    .append("level3Count", tuple2._2.getLevel3Count())
                    .append("orgId", tuple2._2.getOrgId());
            documents.add(document);
        }
        MongoManager.saveToMongo(documents, BaseConstant.ALARM_TARGET_TABLE);
    }


直接运行上述main方法会将AlarmStatisticBean对象保存到MongoDB中


二、spark-submit脚本


提交到集群时,需要注释掉local

SparkConf conf = new SparkConf()
                .setAppName("离线统计")
                .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                //.setMaster("local")
...

//带*的是公司名或项目名称,不影响阅读
spark-submit --class com.*.*.meter.alarm.AlarmStatisticService --master spark://master:7077 /opt/middleware/*-alarm-task-1.0-jar-with-dependencies.ja


--master spark:// 表示会使用独立模式,也就是使用spark自带的独立集群管理器。提交时使用的主机名和端口精确匹配用户页面中的URL,这里建议直接从http://172...6:8080页面上复制URL,避免不必要的麻烦。


三、SparkLauncher提交


SparkLauncher也提供了两种方式提交任务


3.1、launch


SparkLauncher实际上是根据JDK自带的ProcessBuilder构造了一个UNIXProcess子进程提交任务,提交的形式跟spark-submit一样。这个子进程会以阻塞的方式等待程序的运行结果。简单来看就是拼接spark-submit命令,并以子进程的方式启动。


代码中的process.getInputStream()实际上对应linux进程的标准输出stdout

process.getErrorStream()实际上对应linux进程的错误信息stderr

process.getOutputStream()实际上对应linux进程的输入信息stdin

@Scheduled(fixedRate = 5000 * 60)
    public void alarmStatistic() {
        logger.info("=====>>>>>离线统计定时任务!", System.currentTimeMillis());
        try {
            HashMap env = new HashMap();
            //这两个属性必须设置
            env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR);
            env.put("JAVA_HOME", CommonConfig.JAVA_HOME);
            SparkLauncher handle = new SparkLauncher(env)
                    .setSparkHome(SparkConfig.SPARK_HOME)
                    .setAppResource(CommonConfig.ALARM_JAR_PATH)
                    .setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS)
                    .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                    .setDeployMode(SparkConfig.SPARK_DEPLOY_MODE)
                    .setVerbose(SparkConfig.SPARK_VERBOSE)
                    .setConf("spark.app.id", CommonConfig.ALARM_APP_ID)
                    .setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY)
                    .setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE)
                    .setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY)
                    .setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES)
                    .setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES)
                    .setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM)
                    .setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
                    .setVerbose(true);
            Process process = handle.launch();
            InputStreamRunnable inputStream = new InputStreamRunnable(process.getInputStream(), "alarm task input");
            ExecutorUtils.getExecutorService().submit(inputStream);
            InputStreamRunnable errorStream = new InputStreamRunnable(process.getErrorStream(), "alarm task error");
            ExecutorUtils.getExecutorService().submit(errorStream);
            logger.info("Waiting for finish...");
            int exitCode = process.waitFor();
            logger.info("Finished! Exit code:" + exitCode);
        } catch (Exception e) {
            logger.error("submit spark task error", e);
        }
    }
运行过程示意图


image.png

运行过程


相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
2月前
|
存储 缓存 分布式计算
Spark任务OOM问题如何解决?
大家好,我是V哥。在实际业务中,Spark任务常因数据量过大、资源分配不合理或代码瓶颈导致OOM(Out of Memory)。本文详细分析了各种业务场景下的OOM原因,并提供了优化方案,包括调整Executor内存和CPU资源、优化内存管理策略、数据切分及减少宽依赖等。通过综合运用这些方法,可有效解决Spark任务中的OOM问题。关注威哥爱编程,让编码更顺畅!
201 3
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
47 5
|
6月前
|
分布式计算 运维 Serverless
EMR Serverless Spark PySpark流任务体验报告
阿里云EMR Serverless Spark是一款全托管的云原生大数据计算服务,旨在简化数据处理流程,降低运维成本。测评者通过EMR Serverless Spark提交PySpark流任务,体验了从环境准备、集群创建、网络连接到任务管理的全过程。通过这次测评,可以看出阿里云EMR Serverless Spark适合有一定技术基础的企业,尤其是需要高效处理大规模数据的场景,但新用户需要投入时间和精力学习和适应。
7187 43
EMR Serverless Spark PySpark流任务体验报告
|
3月前
|
SQL 机器学习/深度学习 分布式计算
Spark适合处理哪些任务?
【9月更文挑战第1天】Spark适合处理哪些任务?
194 3
|
4月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
|
5月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
426 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
5月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
285 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
219 0
|
6月前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
43 1