DataWorks_数据开发_EMR Spark节点_计算Pi和对接MaxCompute案例

简介: DataWorks_数据开发_EMR Spark节点 1)计算Pi; 2)对接MaxCompute。
+关注继续查看

示例一:spark自带示例项目SparkPi:计算Pi

本文以Spark自带示例项目计算Pi为例测试当前EMR Spark环境是否可用,示例详情请参见EMR示例项目使用说明


准备工作:

获取spark自带example的jar包spark-examples_2.11-2.4.5.jar存放路径,spark组件安装在/usr/lib/spark-current路径下,登录EMR集群可查询全路径/usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情可参见EMR常用文件路径

44CC3525-315D-4449-B30B-0373623B391A.png


执行任务:

新建EMR Spark节点,提交运行代码。仅需填写spark-submit后面部分的内容,作业提交会自动补全。

提交代码:

--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100

实际执行

# spark-submit [options] --class [MainClass] xxx.jar args
spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100


查看结果:

返回结果1097: Pi is roughly 3.1415547141554714,运行成功,环境可用。

image.png


示例二:Spark对接MaxCompute

以Spark对接MaxCompute为例,实现通过Spark统计MaxCompute表行数。更多应用场景可见EMR Spark开发指南

本示例涉及云产品:绑定EMR引擎和MaxCompute引擎的DataWorks项目、OSS。


准备测试数据:

在DataWorks数据开发新建odps sql节点,执行建表和插入数据语句,第一列为bigint类型,插入2条记录。

DROP TABLE IF EXISTS emr_spark_read_odpstable ;
CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable 
(
    id BIGINT
    ,name STRING
)
;
INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi') ;


本地开发:

创建Maven工程,添加pom依赖,详情请参见Spark准备工作

<dependency>
        <groupId>com.aliyun.emr</groupId>
        <artifactId>emr-maxcompute_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>


插件部分仅供参考。

<build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
          
           <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


编写内容:

实现在Spark对MaxCompute表第一列Bigint类型行数统计,详情请参见Spark对接MaxCompute。完成后打jar包,有关odps的依赖都属于第三方包,所以也需要一起打包上传到集群。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.aliyun.emr.example.spark
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}
object SparkMaxComputeDemo {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
          |
          |Arguments:
          |
          |    accessKeyId      Aliyun Access Key ID.
          |    accessKeySecret  Aliyun Key Secret.
          |    envType          0 or 1
          |                     0: Public environment.
          |                     1: Aliyun internal environment, i.e. Aliyun ECS etc.
          |    project          Aliyun ODPS project
          |    table            Aliyun ODPS table
          |    numPartitions    the number of RDD partitions
        """.stripMargin)
      System.exit(1)
    }
    val accessKeyId = args(0)
    val accessKeySecret = args(1)
    val envType = args(2).toInt
    val project = args(3)
    val table = args(4)
    val numPartitions = args(5).toInt
    val urls = Seq(
      Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment
      Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
    )
    val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
    val sc = new SparkContext(conf)
    val odpsOps = envType match {
      case 0 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
      case 1 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
    }
    val odpsData = odpsOps.readTable(project, table, read, numPartitions)
    println(s"Count (odpsData): ${odpsData.count()}")
  }
  def read(record: Record, schema: TableSchema): Long = {
    record.getBigint(0)
  }
}


上传运行资源:

登录OSS控制台,在指定路径下上传jar资源(首次使用需要一键授权,详情请参见emr mr节点中的一键授权)。

本示例在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下上传emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar。

注:由于DataWorks EMR 资源上限是50M,而带依赖的包通常大于50m,所以直接在OSS控制台上传。如果您的资源小于50M也可以在DataWorks上操作创建和使用EMR JAR资源

60817947-ABA2-45e8-A261-205B76187381.png


创建EMR JAR资源:

本示例创建emr_spark_demo-1.0-SNAPSHOT.jar资源,上传上文打好的jar包,存储在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下(首次使用需要一键授权),提交资源,提交后可前往OSS管控台查看。详情请参见创建和使用EMR JAR资源

3ED0B27C-A7A2-4dd5-B54F-BAA68BB6E847.png


创建并执行EMR Spark节点:

本示例在业务流程的EMR数据开发模块下右键新建EMR Spark节点命名为emr_spark_odps,选择EMR引擎实例,提交如下代码,点击高级运行。

其中参数信息Arguments 需要替换为实际使用的相关信息。

提交代码:

--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1

08D2575F-BC12-440c-BDC5-5B7287A83DCF.png


查看结果:

查看日志,表记录数为2符合预期。

F2F2D4D1-E44A-40b5-BE55-8D7BDBBF4A8D.png




相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
相关文章
|
9月前
|
分布式计算 Kubernetes Cloud Native
《从Spark到Kubernetes MaxCompute 的云原生开源生态实践之路》电子版地址
从Spark到Kubernetes MaxCompute 的云原生开源生态实践之路
71 0
《从Spark到Kubernetes MaxCompute 的云原生开源生态实践之路》电子版地址
|
分布式计算 Java Linux
如何搭建MaxCompute Spark开发环境。
如何搭建MaxCompute Spark开发环境。
350 0
|
分布式计算 关系型数据库 分布式数据库
MaxCompute Spark
MaxCompute Spark
107 0
|
弹性计算 分布式计算 资源调度
模拟IDC spark读写MaxCompute实践
现有湖仓一体架构是以 MaxCompute 为中心读写 Hadoop 集群数据,有些线下 IDC 场景,客户不愿意对公网暴露集群内部信息,需要从 Hadoop 集群发起访问云上的数据。本文以 EMR (云上 Hadoop)方式模拟本地 Hadoop 集群访问 MaxCompute数据。
543 0
|
分布式计算 DataWorks Java
MaxCompute Spark 使用及常见问题|学习笔记
快速学习 MaxCompute Spark 使用及常见问题
403 0
MaxCompute Spark 使用及常见问题|学习笔记
|
存储 分布式计算 DataWorks
【MaxCompute 常见问题】 MaxCompute Spark
如何将开源 Spark 代码迁移到 Spark on MaxCompute?分以下三种情形: 作业无需访问 MaxCompute 表和 OSS。您的 Jar 包可直接运行,具体步骤请参见搭建开发环境。注意,对于 Spark 或 Hadoop 的依赖必须设成 provided。
【MaxCompute 常见问题】 MaxCompute Spark
|
传感器 SQL 分布式计算
MaxCompute Spark 资源使用优化详解
本文主要讲解MaxCompute Spark资源调优,目的在于在保证Spark任务正常运行的前提下,指导用户更好地对Spark作业资源使用进行优化,极大化利用资源,降低成本。
1440 0
MaxCompute Spark 资源使用优化详解
|
分布式计算 DataWorks Java
MaxCompute Spark 使用和常见问题
本文将就MaxCompute Spark开发环境搭建、常用配置、作业迁移注意事项以及常见问题进行深入介绍。
3528 2
|
分布式计算 DataWorks Java
Spark On MaxCompute如何访问Phonix数据
如何使用Spark On MaxCompute连接Phonix,将Hbase的数据写入到MaxCompute的对应表中,目前没有对应的案例,为了满足用户的需求。本文主要讲解使用Spark连接Phonix访问Hbase的数据再写入到MaxCompute方案实践。该方案的验证是使用hbase1.1对应Phonix为4.12.0。本文从阿里云Hbase版本的选择、确认VPC、vswitchID、设置白名单和访问方式,Phonix4.12.0的客户端安装,在客户端实现Phonix表的创建和写入,Spark代码在本地IDEA的编写以及pom文件以及vpcList的配置,打包上传jar包并进行冒烟测试。
7979 0
Spark On MaxCompute如何访问Phonix数据
|
分布式计算 安全 Shell
Maxcompute Spark 访问 阿里云 Hbase
引子 本来这个东西是没啥好写的,但是在帮客户解决问题的时候,发现链路太长,不能怪客户弄不出来,记录一下 需求列表 MaxCompute Spark包 (写文章时刻为版本 0.32.1, 请自行更新,本文不是文档) Spark 配置 spark.
Maxcompute Spark 访问 阿里云 Hbase
推荐文章
更多