本文介绍大数据分析引擎spark 基于EMR集群,利用OSS云存储数据,实现一个简单的分析案例。
前提条件
• 已注册阿里云账号,详情请参见注册云账号。
• 已开通E-MapReduce服务和OSS服务。
• 已完成云账号的授权,详情请参见角色授权。
• 已创建Haoop集群,且带有spark组件, 配置好相关的OSS数据源。
步骤一:数据上传至oss
hadoop fs -put course2.csv oss://your-bucket-name/
步骤二:编写处理代码,及打包
1、分析代码
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
object OSSExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("OSSExample")
.getOrCreate()
val data=spark.read.format("csv").option("header","true").load("oss://your-bucket-name/course2.csv")
val data1 = data.groupBy("subject", "level").count()
val window = Window.partitionBy("subject").orderBy(org.apache.spark.sql.functions.col("count").desc)
val data2 = data1.withColumn("topn", row_number().over(window)).where("topn <= 1" )
data2.show(false)
}
}
2、IDEA打包
IDEA Build -> Build Artifact ->Build
步骤三:上传jar包到Hadoop 或者oss
在本例中,我们把jar上传至OSS中
把jar 上传到集群header节点,然后使用以下命令
hadoop fs -put OSSExample.jar oss://your-bucket-name/
步骤四:创建作业job,运行作业
--class OSSExample --master yarn --deploy-mode client --driver-memory 3g --num-executors 10 --executor-memory 3g --executor-cores 3 --conf spark.default.parallelism=50 --conf spark.yarn.am.memoryOverhead=1g --conf spark.yarn.am.memory=2g oss://your-bucket-name/OSSExample.jar
步骤五:查看作业运行是否成功及查看运行结果
总结
通过本次实践,实现了从OSS上读取数据,并在EMR集群上进行简单统计分析的Spark作业运行;通过本次实践,能够了解如何利用Spark对OSS进行分析的具体过程,有助于后续其他复杂作业的开发以及实践。