DataWorks_数据开发_EMR Spark节点_计算Pi和对接MaxCompute案例

本文涉及的产品
对象存储 OSS,20GB 3个月
大数据开发治理平台DataWorks,资源组抵扣包 750CU*H
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: DataWorks_数据开发_EMR Spark节点 1)计算Pi;2)对接MaxCompute。

示例一:spark自带示例项目SparkPi:计算Pi

本文以Spark自带示例项目计算Pi为例测试当前EMR Spark环境是否可用,示例详情请参见EMR示例项目使用说明


准备工作:

获取spark自带example的jar包spark-examples_2.11-2.4.5.jar存放路径,spark组件安装在/usr/lib/spark-current路径下,登录EMR集群可查询全路径/usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情可参见EMR常用文件路径

44CC3525-315D-4449-B30B-0373623B391A.png


执行任务:

新建EMR Spark节点,提交运行代码。仅需填写spark-submit后面部分的内容,作业提交会自动补全。

提交代码:

--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100

实际执行

# spark-submit [options] --class [MainClass] xxx.jar argsspark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100


查看结果:

返回结果1097: Pi is roughly 3.1415547141554714,运行成功,环境可用。

image.png


示例二:Spark对接MaxCompute

以Spark对接MaxCompute为例,实现通过Spark统计MaxCompute表行数。更多应用场景可见EMR Spark开发指南

本示例涉及云产品:绑定EMR引擎和MaxCompute引擎的DataWorks项目、OSS。


准备测试数据:

在DataWorks数据开发新建odps sql节点,执行建表和插入数据语句,第一列为bigint类型,插入2条记录。

DROPTABLE IF EXISTS emr_spark_read_odpstable ;CREATETABLE IF NOT EXISTS emr_spark_read_odpstable 
(    id BIGINT,name STRING
);INSERTINTOTABLE emr_spark_read_odpstable VALUES(111,'zhangsan'),(222,'lisi');


本地开发:

创建Maven工程,添加pom依赖,详情请参见Spark准备工作

<dependency><groupId>com.aliyun.emr</groupId><artifactId>emr-maxcompute_2.11</artifactId><version>1.9.0</version></dependency>


插件部分仅供参考。

<build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><configuration><recompileMode>incremental</recompileMode></configuration><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin></plugins></build>


编写内容:

实现在Spark对MaxCompute表第一列Bigint类型行数统计,详情请参见Spark对接MaxCompute。完成后打jar包,有关odps的依赖都属于第三方包,所以也需要一起打包上传到集群。

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/packagecom.aliyun.emr.example.sparkimportcom.aliyun.odps.TableSchemaimportcom.aliyun.odps.data.Recordimportorg.apache.spark.aliyun.odps.OdpsOpsimportorg.apache.spark.{SparkConf, SparkContext}
objectSparkMaxComputeDemo {
defmain(args: Array[String]): Unit= {
if (args.length<6) {
System.err.println(
"""Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>||Arguments:
||accessKeyIdAliyunAccessKeyID.
|accessKeySecretAliyunKeySecret.
|envType0or1|0: Publicenvironment.
|1: Aliyuninternalenvironment, i.e. AliyunECSetc.
|projectAliyunODPSproject|tableAliyunODPStable|numPartitionsthenumberofRDDpartitions""".stripMargin)System.exit(1)
    }
valaccessKeyId=args(0)
valaccessKeySecret=args(1)
valenvType=args(2).toIntvalproject=args(3)
valtable=args(4)
valnumPartitions=args(5).toIntvalurls=Seq(
Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environmentSeq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment    )
valconf=newSparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
valsc=newSparkContext(conf)
valodpsOps=envTypematch {
case0=>OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
case1=>OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
    }
valodpsData=odpsOps.readTable(project, table, read, numPartitions)
println(s"Count (odpsData): ${odpsData.count()}")
  }
defread(record: Record, schema: TableSchema): Long= {
record.getBigint(0)
  }
}


上传运行资源:

登录OSS控制台,在指定路径下上传jar资源(首次使用需要一键授权,详情请参见emr mr节点中的一键授权)。

本示例在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下上传emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar。

注:由于DataWorks EMR 资源上限是50M,而带依赖的包通常大于50m,所以直接在OSS控制台上传。如果您的资源小于50M也可以在DataWorks上操作创建和使用EMR JAR资源

60817947-ABA2-45e8-A261-205B76187381.png


创建EMR JAR资源:

本示例创建emr_spark_demo-1.0-SNAPSHOT.jar资源,上传上文打好的jar包,存储在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下(首次使用需要一键授权),提交资源,提交后可前往OSS管控台查看。详情请参见创建和使用EMR JAR资源

3ED0B27C-A7A2-4dd5-B54F-BAA68BB6E847.png


创建并执行EMR Spark节点:

本示例在业务流程的EMR数据开发模块下右键新建EMR Spark节点命名为emr_spark_odps,选择EMR引擎实例,提交如下代码,点击高级运行。

其中参数信息Arguments     需要替换为实际使用的相关信息。

提交代码:

--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1

08D2575F-BC12-440c-BDC5-5B7287A83DCF.png


查看结果:

查看日志,表记录数为2符合预期。

F2F2D4D1-E44A-40b5-BE55-8D7BDBBF4A8D.png




相关实践学习
基于Hologres轻量实时的高性能OLAP分析
本教程基于GitHub Archive公开数据集,通过DataWorks将GitHub中的项⽬、行为等20多种事件类型数据实时采集至Hologres进行分析,同时使用DataV内置模板,快速搭建实时可视化数据大屏,从开发者、项⽬、编程语⾔等多个维度了解GitHub实时数据变化情况。
相关文章
|
4月前
|
SQL DataWorks 大数据
DataWorks x 婚礼纪:智能一站式数据开发治理平台让千万新人的幸福时刻“数智化”
婚礼纪是杭州火烧云科技推出的结婚服务平台,覆盖婚宴酒店、婚纱摄影等全产业链,年服务超2000万对新人。为应对海量数据处理挑战,婚礼纪选择阿里云DataWorks作为一站式大数据开发治理平台,解决数据血缘不清、指标口径混乱等问题。通过湖仓一体架构与全链路数据治理,实现多源异构数据高效整合,支撑精准营销、交易风控等核心场景。DataWorks新版数据开发Data Studio大幅提升开发效率,Copilot智能助手优化SQL代码生成与测试,助力婚礼纪构建数据驱动的结婚产业服务中枢。
|
5月前
|
人工智能 自然语言处理 DataWorks
DataWorks Copilot 集成Qwen3-235B-A22B混合推理模型,数据开发与分析效率再升级!
阿里云DataWorks平台正式接入Qwen3模型,支持最大235B参数量。用户可通过DataWorks Copilot智能助手调用该模型,以自然语言交互实现代码生成、优化、解释及纠错等功能,大幅提升数据开发与分析效率。Qwen3作为最新一代大语言模型,具备混合专家(MoE)和稠密(Dense)架构,适应多种应用场景,并支持MCP协议优化复杂任务处理。目前,用户可通过DataWorks Data Studio新版本体验此功能。
488 23
DataWorks Copilot 集成Qwen3-235B-A22B混合推理模型,数据开发与分析效率再升级!
|
5月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
8月前
|
SQL 分布式计算 DataWorks
活动实践 | DataWorks智能交互式数据开发与分析之旅
本指南介绍了如何使用阿里云平台进行大数据开发与分析。首先,在MaxCompute控制台创建项目并配置计算资源;接着,通过DataWorks控制台创建工作空间和独享资源组,并绑定工作空间。然后,创建个人开发环境,载入案例并新建Notebook实例。在Notebook中,通过SQL和Python Cell进行交互式开发和数据分析,体验智能助手Copilot的功能,如SQL改写、解释、生成注释及智能建表。最后,清理所有创建的资源,包括删除DataWorks资源、MaxCompute项目及网络配置,确保环境整洁。
|
7月前
|
人工智能 自然语言处理 DataWorks
DataWorks X DeepSeek : 用AI实现数据开发治理!
阿里云DataWorks正式接入DeepSeek-R1系列模型,用户可通过DataWorks Copilot智能助手,以自然语言交互完成代码操作,实现数据开发、分析与治理全流程。DataWorks内置阿里巴巴16年大数据建设方法论,支持多种大数据引擎和AI计算服务,助力“Data+AI”全生命周期管理。开通DataWorks后即可免费体验DataWorks Copilot。
|
8月前
|
数据采集 机器学习/深度学习 DataWorks
DataWorks产品评测:大数据开发治理的深度体验
DataWorks产品评测:大数据开发治理的深度体验
366 1
|
9月前
|
SQL 人工智能 自然语言处理
DataWorks年度发布:智能化湖仓一体数据开发与治理平台的演进
阿里云在过去15年中持续为268集团提供数据服务,积累了丰富的实践经验,并连续三年在IDC中国数据治理市场份额中排名第一。新一代智能数据开发平台DateWorks推出了全新的DateStudio IDE,支持湖仓一体化开发,新增Flink计算引擎和全面适配locs,优化工作流程系统和数据目录管理。同时,阿里云正式推出个人开发环境模式和个人Notebook,提升开发者体验和效率。此外,DateWorks Copilot通过自然语言生成SQL、代码补全等功能,显著提升了数据开发与分析的效率,已累计帮助开发者生成超过3200万行代码。
|
9月前
|
人工智能 Cloud Native 大数据
DataWorks深度技术解读:构建开放的云原生数据开发平台
Dateworks是一款阿里云推出的云原生数据处理产品,旨在解决数据治理和数仓管理中的挑战。它强调数据的准确性与一致性,确保商业决策的有效性。然而,严格的治理模式限制了开发者的灵活性,尤其是在面对多模态数据和AI应用时。为应对这些挑战,Dateworks进行了重大革新,包括云原生化、开放性增强及面向开发者的改进。通过Kubernetes作为资源底座,Dateworks实现了更灵活的任务调度和容器化支持,连接更多云产品,并提供开源Flowspec和Open API,提升用户体验。
|
9月前
|
SQL 分布式计算 DataWorks
DataWorks智能交互式数据开发与分析之旅
本次实验将带您进行DataWorks Notebook的快速入门,包含:Notebook新建、多引擎SQL开发与分析、Python开发、交互式分析等,同时,使用DataWorks Copilot体验智能数据开发,体验智能交互式数据探索之旅。
2851 11
|
9月前
|
SQL 数据采集 DataWorks
基于DataWorks的多场景实践及数据开发Data Studio最新体验测评
DataWorks是阿里云推出的一站式智能大数据开发治理平台,自2009年发布以来,历经多次迭代,成为企业数字化转型的重要工具。本文通过多个实践案例,如公共电影票房数据预处理,展示了DataWorks如何帮助企业高效处理大数据,涵盖数据集成、ETL开发、数据分析及治理等全流程。最新版DataWorks引入了智能助手Copilot,进一步提升了用户体验和工作效率。

热门文章

最新文章