EMR Studio 大数据处理最佳实践课程_存储服务_技术课程_开发者学堂

阿里云
为了无法计算的价值
打开APP
阿里云APP内打开
开发者社区> 开发者学堂> 全部课程> EMR Studio 大数据处理最佳实践课程

EMR Studio 大数据处理最佳实践课程

2课时 |
125人已学 |
免费
课程介绍
EMR Studio 是 阿里云 E-MapReduce 最新推出的用于开源大数据开发场景的集群类型,提供交互式开发、作业提交、作业调试和工作流一站式数据开发体验。能够无缝对接 EMR 计算集群,自动适配多种计算引擎协同工作。覆盖了大数据处理ETL、交互式数据分析、机器学习和实时计算等多种应用场景。

本系列课程旨在让你对EMR Studio 有初步的了解,同时通过一些落地的最佳实践,助你摆脱技术小白称号!

Spark大数据处理最佳实践

 

目录

一.大数据概览

二.如何摆脱技术小白

三.Spark SQL学习框架
四. EMR Studio上的大数据最佳实践

五.Demo

 

  • 大数据概览
    大数据处理ETL (Data > Data)
    ●大数据分析BI (Data > Dashboard)
    ●机器学习AI       (Data > Model)

 

二、如何摆脱技术小白

技术小白是什么:

只懂表面,不懂本质。

只懂得参考别人的Spark代码,不懂得Spark的内在机制,不懂得如何调优Spark Job

摆脱技术小白药方:
              懂得运行机制

如何配置

如何看Log

 

3.Spark SQL学习框架

3.1Spark SQL Archi tecture

 

3.2如何配置Spark App

  • 配置Driver

spark.driver.memory

spark.driver.coresCÏ

  • 配置Executor
    executor.memory
    spark.executor.cores
  • 配置Runtime
    files
    spark.jars

val df1 = spark. createDataFrame(Seq((1, "andy", 20, "USA")
(2, "jeff", 23, "China"),(3,"james",18, "USA")))

.toDF("id","name","age","country")

//select can accept a list of string of the column names

val df2 = df1. select("id", "name")

df2. show()

 

  • 配置DAE

观察一个Spark Log

 

 

 

三、.3Spark SQL学习框架(结合图形/几何)

  • Select Rows
  • Select Columns

val df1 = spark.createDataF rame(Seq((1,"andy",20,"USA"),
(2, "jeff"23, "China")

(3, "james", 18, "USA")))

.toDF("id", "name", "age", "country")
// filter accept a Column
val df2 = df1. filter($"age" >= 20)

df2. show()

  • Transform Column

//withColumn could be used to add/replace new Column
val df1 = spark. createDataFrame(Seq((1, "andy", 20, "USA"),
                                        (2, "jeff", 23, "China")

(3, "james", 18, "USA")))
.toDF("id", "name", "age" , " country")
val df2 = df1. wi thColumn("name", upper($ " name"))

df2. show()

  • Group Вy

val df1 = spark . createDataFrame(Seq((1, "andy", 20, "USA"),
(2, "jeff", 23, "China")(3, "james", 18, "USA"))

.toDF("id", "name", "age", "country")

//You can call agg function after groupBy directly,

such as count/min/max/avg/sumval df2=df1. roupBy(" country"). 

count()

df2. show()


// Pass a Map if you want to do multiple aggregation

val df3=df1.groupBy("country").agg(Map("age"->"avg","id"->"count")

)df3.show()

 

  • Join

val df1.spark.createDataFrame(Seq((1,"andy",20,1),
(2, "jeff", 23, 2)

(3, "james", 18, 3)))

.toDF("id", "name", "age", "c. _id")
val df2 = spark. createDataFrame(Seq((1, "USA"),
                                        (2, "China")))
.toDF("c. id", "c. _name")
val df4-df1.join(df2,df1("c._id")--df2("c._id"))df4.show()

3.4Spark SQL执行计划

  • Where

val df - spark. read. json("file:///usr/lib/spark- current/examples/src/main/resources/people. json")df .registerTempTable("people")
select.from people where name'jeff'
= Physical Plan =
*Project ([age#6L, name#7]
+- *Filter (isnotnull(name#7) && (name#7 . jeff))
+- *FileScan json [age#6L ,name#7] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/usr/lib/spark-current/examples/src/main/resources/people. json], PartitionFilters: O, PushedFilters: [IsNotNull(name), EqualTo(name , jeff)], ReadSchema: struct <age:bigint ,name:string>I

 

  • Group by

val df = spark. read . json(" file:///usr/lib/spark- current/examples/src/main/resources/people.json")df.registerTempTable("people")
select age, count(1) from people group by age
-- Physical Plan --
*(2) HashAggregate(keys- [age#26L], functions=[count(1)])
+- Exchange hashpartitioning(age#26L, 200)
+- *(1) HashAggregate(keys=[age#26L], functions=[partial. .count(1)])
+- *(1) FileScan json [age#26L] Batched:false,Format:JSONLocation:InMemoryFileIndex[file:/usr/lib/spar
k-current/examples/src/main/resources/people. json], PartitionFilters:ẞ,PushedFilters:O,ReadSchema:struct<age:blgint»r

  • Order by

val df - spark. read. json("file:///usr/lib/spark-current/examples/src/main/resources/people. json")df . registerTempTable("people")
select * from people order by age
= Physical Plan =
*(2) Sort [age#64L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(age#64L ASC NULLS FIRST, 200)
+- *(1) FileScan json [age#64L , name#65] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/usr/lib/spark- current/examples/src/main/resources/people. json], PartitionFilters: [, PushedFilters: [, ReadSchema: structcage:bigint , name:string> I

 

四、EMR Studio上的大数据最佳实践

  • EMR Studio特性
  • 兼容开源组件
  • 支持连接多个集群

 

  • 适配多个计算引擎
  • 通过界面化的方式进行交互式开发和作业调度

Notebook + Airflow:无缝衔接开发环节和生产调度环节

1.利用交互式开发模式可以快速验证作业的正确性。

2.在Airflow里 调度Notebook作业,最大程度得保证开发环境和生产环境的一致性,防止由于开发阶段和生产阶段环境不一致而导致的问题。

  • 适用多种大数据应用场景
  • 大数据处理ETL
  • 交互式数据分析
  • 机器学习
  • 实时计算
  • 计算存储分离

所有数据都保存在00S上,包括

  • 用户Notebook代码
    调度作业Log
  • 即使集群销毁,也可以重建集群轻松恢复数据

5.Demo

简要介绍 EMR Studio工具栏的三个主键以及关联集群,大致演示如何关联集群。在spark教程中用Spark SQL 教程(Scala)首先要选择任一一个集群,然后跑一下进程进行观察。交互式的开发好处是能立马看到结果。