DataWorks_数据开发_EMR Spark节点_计算Pi和对接MaxCompute案例

本文涉及的产品
对象存储 OSS,20GB 3个月
大数据开发治理平台DataWorks,Serverless资源组抵扣包300CU*H
对象存储 OSS,恶意文件检测 1000次 1年
简介: DataWorks_数据开发_EMR Spark节点 1)计算Pi;2)对接MaxCompute。

示例一:spark自带示例项目SparkPi:计算Pi

本文以Spark自带示例项目计算Pi为例测试当前EMR Spark环境是否可用,示例详情请参见EMR示例项目使用说明


准备工作:

获取spark自带example的jar包spark-examples_2.11-2.4.5.jar存放路径,spark组件安装在/usr/lib/spark-current路径下,登录EMR集群可查询全路径/usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情可参见EMR常用文件路径

44CC3525-315D-4449-B30B-0373623B391A.png


执行任务:

新建EMR Spark节点,提交运行代码。仅需填写spark-submit后面部分的内容,作业提交会自动补全。

提交代码:

--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100

实际执行

# spark-submit [options] --class [MainClass] xxx.jar argsspark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100


查看结果:

返回结果1097: Pi is roughly 3.1415547141554714,运行成功,环境可用。

image.png


示例二:Spark对接MaxCompute

以Spark对接MaxCompute为例,实现通过Spark统计MaxCompute表行数。更多应用场景可见EMR Spark开发指南

本示例涉及云产品:绑定EMR引擎和MaxCompute引擎的DataWorks项目、OSS。


准备测试数据:

在DataWorks数据开发新建odps sql节点,执行建表和插入数据语句,第一列为bigint类型,插入2条记录。

DROPTABLE IF EXISTS emr_spark_read_odpstable ;CREATETABLE IF NOT EXISTS emr_spark_read_odpstable 
(    id BIGINT,name STRING
);INSERTINTOTABLE emr_spark_read_odpstable VALUES(111,'zhangsan'),(222,'lisi');


本地开发:

创建Maven工程,添加pom依赖,详情请参见Spark准备工作

<dependency><groupId>com.aliyun.emr</groupId><artifactId>emr-maxcompute_2.11</artifactId><version>1.9.0</version></dependency>


插件部分仅供参考。

<build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><configuration><recompileMode>incremental</recompileMode></configuration><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin></plugins></build>


编写内容:

实现在Spark对MaxCompute表第一列Bigint类型行数统计,详情请参见Spark对接MaxCompute。完成后打jar包,有关odps的依赖都属于第三方包,所以也需要一起打包上传到集群。

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/packagecom.aliyun.emr.example.sparkimportcom.aliyun.odps.TableSchemaimportcom.aliyun.odps.data.Recordimportorg.apache.spark.aliyun.odps.OdpsOpsimportorg.apache.spark.{SparkConf, SparkContext}
objectSparkMaxComputeDemo {
defmain(args: Array[String]): Unit= {
if (args.length<6) {
System.err.println(
"""Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>||Arguments:
||accessKeyIdAliyunAccessKeyID.
|accessKeySecretAliyunKeySecret.
|envType0or1|0: Publicenvironment.
|1: Aliyuninternalenvironment, i.e. AliyunECSetc.
|projectAliyunODPSproject|tableAliyunODPStable|numPartitionsthenumberofRDDpartitions""".stripMargin)System.exit(1)
    }
valaccessKeyId=args(0)
valaccessKeySecret=args(1)
valenvType=args(2).toIntvalproject=args(3)
valtable=args(4)
valnumPartitions=args(5).toIntvalurls=Seq(
Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environmentSeq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment    )
valconf=newSparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
valsc=newSparkContext(conf)
valodpsOps=envTypematch {
case0=>OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
case1=>OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
    }
valodpsData=odpsOps.readTable(project, table, read, numPartitions)
println(s"Count (odpsData): ${odpsData.count()}")
  }
defread(record: Record, schema: TableSchema): Long= {
record.getBigint(0)
  }
}


上传运行资源:

登录OSS控制台,在指定路径下上传jar资源(首次使用需要一键授权,详情请参见emr mr节点中的一键授权)。

本示例在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下上传emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar。

注:由于DataWorks EMR 资源上限是50M,而带依赖的包通常大于50m,所以直接在OSS控制台上传。如果您的资源小于50M也可以在DataWorks上操作创建和使用EMR JAR资源

60817947-ABA2-45e8-A261-205B76187381.png


创建EMR JAR资源:

本示例创建emr_spark_demo-1.0-SNAPSHOT.jar资源,上传上文打好的jar包,存储在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下(首次使用需要一键授权),提交资源,提交后可前往OSS管控台查看。详情请参见创建和使用EMR JAR资源

3ED0B27C-A7A2-4dd5-B54F-BAA68BB6E847.png


创建并执行EMR Spark节点:

本示例在业务流程的EMR数据开发模块下右键新建EMR Spark节点命名为emr_spark_odps,选择EMR引擎实例,提交如下代码,点击高级运行。

其中参数信息Arguments     需要替换为实际使用的相关信息。

提交代码:

--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1

08D2575F-BC12-440c-BDC5-5B7287A83DCF.png


查看结果:

查看日志,表记录数为2符合预期。

F2F2D4D1-E44A-40b5-BE55-8D7BDBBF4A8D.png




相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
相关文章
|
1月前
|
数据采集 运维 DataWorks
DataWorks on EMR StarRocks,打造标准湖仓新范式
本文整理自阿里云计算平台产品专家周硕(簌篱)在阿里云DataWorks on EMR StarRocks解决方案介绍中的分享。介绍了阿里云DataWorks与EMR Serverless StarRocks的结合使用,详细阐述了在数据同步、数据消费、数据治理三大场景中的核心能力。DataWorks作为大数据开发治理平台,提供了从数据建模、数据集成、数据开发到数据治理的全链路解决方案,结合StarRocks的高性能分析能力,帮助企业实现OLAP分析、湖仓一体开发及数据综合治理,满足复杂业务场景下的需求,提升数据处理和分析效率。
|
1月前
|
SQL 人工智能 DataWorks
DataWorks:新一代 Data+AI 数据开发与数据治理平台演进
本文介绍了阿里云 DataWorks 在 DA 数智大会 2024 上的最新进展,包括新一代智能数据开发平台 DataWorks Data Studio、全新升级的 DataWorks Copilot 智能助手、数据资产治理、全面云原生转型以及更开放的开发者体验。这些更新旨在提升数据开发和治理的效率,助力企业实现数据价值最大化和智能化转型。
|
1月前
|
分布式计算 DataWorks 数据处理
"DataWorks高级技巧揭秘:手把手教你如何在PyODPS节点中将模型一键写入OSS,实现数据处理的完美闭环!"
【10月更文挑战第23天】DataWorks是企业级的云数据开发管理平台,支持强大的数据处理和分析功能。通过PyODPS节点,用户可以编写Python代码执行ODPS任务。本文介绍了如何在DataWorks中训练模型并将其保存到OSS的详细步骤和示例代码,包括初始化ODPS和OSS服务、读取数据、训练模型、保存模型到OSS等关键步骤。
117 3
|
3月前
|
SQL 人工智能 DataWorks
【云栖实录】DataWorks:新一代智能湖仓一体数据开发与治理平台
在9月21日的云栖大会上,DataWorks发布了新一代智能湖仓一体数据开发与治理平台。DataWorks历经Kubernetes改造与云原生调度系统的优化,实现了资源组全面Serverless化,降低了使用成本,最高可节省40%。新推出的DataWorks Data Studio,支持多种计算引擎,提供更开放的云原生WebIDE,提升开发效率。DataWorks Copilot智能助手也得到升级,支持多种SQL方言和Python代码生成,平均提升数据开发效率35%。此外,DataWorks还推出了全方位的数据资产治理体系,涵盖业务和技术视角,助力企业实现数据智能化管理和转型。
【云栖实录】DataWorks:新一代智能湖仓一体数据开发与治理平台
|
3月前
|
SQL 机器学习/深度学习 分布式计算
dataworks节点任务
在DataWorks中,你可以通过拖拽节点以及连线来构建复杂的工作流,这样可以方便地管理多个任务之间的依赖关系。此外,DataWorks还提供了调度功能,使得这些任务可以在设定的时间自动执行。这对于构建自动化、定时的数据处理管道非常有用。
74 5
|
4月前
|
DataWorks 关系型数据库 MySQL
DataWorks产品使用合集之mysql节点如何插入数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
分布式计算 DataWorks NoSQL
DataWorks产品使用合集之怎么设置在归并节点传递参数
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之如何计算两个时间点的表行数差异
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理在DI节点同步到OceanBase数据库时,出现SQLException: Not supported feature or function
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之CDH节点上传jar包时遇到报错,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。

热门文章

最新文章

  • 1
    DataWorks操作报错合集之DataWorks任务异常 报错: GET_GROUP_SLOT_EXCEPTION 该怎么处理
    123
  • 2
    DataWorks操作报错合集之DataWorksUDF 报错:evaluate for user defined function xxx cannot be loaded from any resources,该怎么处理
    123
  • 3
    DataWorks操作报错合集之在DataWorks中,任务流在调度时间到达时停止运行,是什么原因导致的
    117
  • 4
    DataWorks操作报错合集之DataWorks ODPS数据同步后,timesramp遇到时区问题,解决方法是什么
    100
  • 5
    DataWorks操作报错合集之DataWorks配置参数在开发环境进行调度,参数解析不出来,收到了 "Table does not exist" 的错误,该怎么处理
    110
  • 6
    DataWorks操作报错合集之DataWorks中udf开发完后,本地和在MaxCompute的工作区可以执行函数查询,但是在datawork里报错FAILED: ODPS-0130071:[2,5],是什么原因
    120
  • 7
    DataWorks操作报错合集之DataWorks提交失败: 提交节点的源码内容到TSP(代码库)失败:"skynet_packageid is null,该怎么解决
    125
  • 8
    DataWorks操作报错合集之DataWorks在同步mysql时报错Code:[Framework-02],mysql里面有个json类型字段,是什么原因导致的
    169
  • 9
    DataWorks操作报错合集之DataWorks集成实例绑定到同一个vpc下面,也添加了RDS的IP白名单报错:数据源配置有误,请检查,该怎么处理
    93
  • 10
    DataWorks操作报错合集之在 DataWorks 中运行了一个 Hologres 表的任务并完成了执行,但是在 Hologres 表中没有看到数据,该怎么解决
    133