示例一:spark自带示例项目SparkPi:计算Pi
本文以Spark自带示例项目计算Pi为例测试当前EMR Spark环境是否可用,示例详情请参见EMR示例项目使用说明。
准备工作:
获取spark自带example的jar包spark-examples_2.11-2.4.5.jar存放路径,spark组件安装在/usr/lib/spark-current路径下,登录EMR集群可查询全路径/usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情可参见EMR常用文件路径。

执行任务:
新建EMR Spark节点,提交运行代码。仅需填写spark-submit后面部分的内容,作业提交会自动补全。
提交代码:
--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
实际执行:
spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
查看结果:
返回结果1097: Pi is roughly 3.1415547141554714,运行成功,环境可用。

示例二:Spark对接MaxCompute
以Spark对接MaxCompute为例,实现通过Spark统计MaxCompute表行数。更多应用场景可见EMR Spark开发指南。
本示例涉及云产品:绑定EMR引擎和MaxCompute引擎的DataWorks项目、OSS。
准备测试数据:
在DataWorks数据开发新建odps sql节点,执行建表和插入数据语句,第一列为bigint类型,插入2条记录。
DROP TABLE IF EXISTS emr_spark_read_odpstable ;
CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable
(
id BIGINT
,name STRING
)
;
INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi') ;
本地开发:
创建Maven工程,添加pom依赖,详情请参见Spark准备工作。
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-maxcompute_2.11</artifactId>
<version>1.9.0</version>
</dependency>
插件部分仅供参考。
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
编写内容:
实现在Spark对MaxCompute表第一列Bigint类型行数统计,详情请参见Spark对接MaxCompute。完成后打jar包,有关odps的依赖都属于第三方包,所以也需要一起打包上传到集群。
package com.aliyun.emr.example.spark
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}
object SparkMaxComputeDemo {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
|
|Arguments:
|
| accessKeyId Aliyun Access Key ID.
| accessKeySecret Aliyun Key Secret.
| envType 0 or 1
| 0: Public environment.
| 1: Aliyun internal environment, i.e. Aliyun ECS etc.
| project Aliyun ODPS project
| table Aliyun ODPS table
| numPartitions the number of RDD partitions
""".stripMargin)
System.exit(1)
}
val accessKeyId = args(0)
val accessKeySecret = args(1)
val envType = args(2).toInt
val project = args(3)
val table = args(4)
val numPartitions = args(5).toInt
val urls = Seq(
Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"),
Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
)
val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
val sc = new SparkContext(conf)
val odpsOps = envType match {
case 0 =>
OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
case 1 =>
OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
}
val odpsData = odpsOps.readTable(project, table, read, numPartitions)
println(s"Count (odpsData): ${odpsData.count()}")
}
def read(record: Record, schema: TableSchema): Long = {
record.getBigint(0)
}
}
上传运行资源:
登录OSS控制台,在指定路径下上传jar资源(首次使用需要一键授权,详情请参见emr mr节点中的一键授权)。
本示例在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下上传emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar。
注:由于DataWorks EMR 资源上限是50M,而带依赖的包通常大于50m,所以直接在OSS控制台上传。如果您的资源小于50M也可以在DataWorks上操作创建和使用EMR JAR资源。

创建EMR JAR资源:
本示例创建emr_spark_demo-1.0-SNAPSHOT.jar资源,上传上文打好的jar包,存储在oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/路径下(首次使用需要一键授权),提交资源,提交后可前往OSS管控台查看。详情请参见创建和使用EMR JAR资源。

创建并执行EMR Spark节点:
本示例在业务流程的EMR数据开发模块下右键新建EMR Spark节点命名为emr_spark_odps,选择EMR引擎实例,提交如下代码,点击高级运行。
其中参数信息Arguments 需要替换为实际使用的相关信息。
提交代码:
--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1

查看结果:
查看日志,表记录数为2符合预期。
