Spark Big Data Processing Best Practices
Table of Contents
1. Big Data Overview
2. How to Stop Being a Technical Novice
3. A Framework for Learning Spark SQL
4. Big Data Best Practices on EMR Studio
5. Demo
1. Big Data Overview
- Big data processing, ETL (Data > Data)
- Big data analytics, BI (Data > Dashboard)
- Machine learning, AI (Data > Model)
2. How to Stop Being a Technical Novice
What a technical novice is:
- Understands only the surface, not the essence.
- Can only copy other people's Spark code; does not understand Spark's internal mechanisms or how to tune a Spark job.
The cure:
- Understand how Spark runs
- Know how to configure it
- Know how to read the logs
3. A Framework for Learning Spark SQL
3.1 Spark SQL Architecture
3.2 How to Configure a Spark App
- Configure Driver
  spark.driver.memory
  spark.driver.cores
- Configure Executor
  spark.executor.memory
  spark.executor.cores
- Configure Runtime
  spark.files
  spark.jars
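The driver, executor, and runtime settings above are usually passed to spark-submit with --conf, but they can also be set when the session is built. A minimal sketch (the app name, memory sizes, and jar path are placeholders, not values from the slides); note that in cluster mode the driver settings must be supplied at launch time (e.g. via spark-submit), since the driver JVM has already started by the time user code runs:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder values: tune memory/cores to your own cluster.
val spark = SparkSession.builder()
  .appName("config-demo")
  .config("spark.driver.memory", "2g")      // driver heap (effective only if set before launch)
  .config("spark.driver.cores", "1")        // driver CPU cores (cluster mode)
  .config("spark.executor.memory", "4g")    // heap per executor
  .config("spark.executor.cores", "2")      // cores per executor
  .config("spark.jars", "/path/to/dep.jar") // hypothetical path: extra jars shipped to executors
  .getOrCreate()
```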
val df1 = spark.createDataFrame(Seq((1, "andy", 20, "USA"),
  (2, "jeff", 23, "China"),
  (3, "james", 18, "USA")))
  .toDF("id", "name", "age", "country")
// select can accept a list of column name strings
val df2 = df1.select("id", "name")
df2.show()
Observe a Spark log
3.3 A Framework for Learning Spark SQL (with diagrams/geometry)
- Select Rows
- Select Columns
val df1 = spark.createDataFrame(Seq((1, "andy", 20, "USA"),
  (2, "jeff", 23, "China"),
  (3, "james", 18, "USA")))
  .toDF("id", "name", "age", "country")
// filter accepts a Column
val df2 = df1.filter($"age" >= 20)
df2.show()
// withColumn can be used to add or replace a Column
import org.apache.spark.sql.functions.upper
val df1 = spark.createDataFrame(Seq((1, "andy", 20, "USA"),
  (2, "jeff", 23, "China"),
  (3, "james", 18, "USA")))
  .toDF("id", "name", "age", "country")
val df2 = df1.withColumn("name", upper($"name"))
df2.show()
val df1 = spark.createDataFrame(Seq((1, "andy", 20, "USA"),
  (2, "jeff", 23, "China"),
  (3, "james", 18, "USA")))
  .toDF("id", "name", "age", "country")
// You can call an agg function such as count/min/max/avg/sum
// directly after groupBy
val df2 = df1.groupBy("country").count()
df2.show()
// Pass a Map if you want to do multiple aggregations
val df3 = df1.groupBy("country").agg(Map("age" -> "avg", "id" -> "count"))
df3.show()
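The Map form applies one function per column. When you need explicit result names, or two aggregates over the same column, the column functions in org.apache.spark.sql.functions can be used instead. A sketch, assuming a local session (the builder line is only needed outside spark-shell):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max}

val spark = SparkSession.builder().master("local[1]").appName("agg-demo").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "andy", 20, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA"))
  .toDF("id", "name", "age", "country")

// Each aggregate gets an explicit alias, and the same column (age)
// can be aggregated twice, which the Map form cannot express.
val df2 = df1.groupBy("country")
  .agg(count("id").as("cnt"), avg("age").as("avg_age"), max("age").as("max_age"))
df2.show()
```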
val df1 = spark.createDataFrame(Seq((1, "andy", 20, 1),
  (2, "jeff", 23, 2),
  (3, "james", 18, 3)))
  .toDF("id", "name", "age", "c_id")
val df2 = spark.createDataFrame(Seq((1, "USA"),
  (2, "China")))
  .toDF("c_id", "c_name")
val df4 = df1.join(df2, df1("c_id") === df2("c_id"))
df4.show()
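join with only a join expression performs an inner join, which is why the row whose c_id has no matching country disappears above. A third argument selects another join type; a self-contained sketch, assuming a local session:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("join-demo").getOrCreate()
import spark.implicits._

val people = Seq((1, "andy", 20, 1), (2, "jeff", 23, 2), (3, "james", 18, 3))
  .toDF("id", "name", "age", "c_id")
val countries = Seq((1, "USA"), (2, "China")).toDF("c_id", "c_name")

// "left_outer" keeps james even though c_id = 3 has no match;
// his c_name comes back as null.
val joined = people.join(countries, people("c_id") === countries("c_id"), "left_outer")
joined.show()
```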
3.4 Spark SQL Execution Plans
val df = spark.read.json("file:///usr/lib/spark-current/examples/src/main/resources/people.json")
df.registerTempTable("people")

select * from people where name = 'jeff'

== Physical Plan ==
*Project [age#6L, name#7]
+- *Filter (isnotnull(name#7) && (name#7 = jeff))
   +- *FileScan json [age#6L,name#7] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/usr/lib/spark-current/examples/src/main/resources/people.json], PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,jeff)], ReadSchema: struct<age:bigint,name:string>
val df = spark.read.json("file:///usr/lib/spark-current/examples/src/main/resources/people.json")
df.registerTempTable("people")

select age, count(1) from people group by age

== Physical Plan ==
*(2) HashAggregate(keys=[age#26L], functions=[count(1)])
+- Exchange hashpartitioning(age#26L, 200)
   +- *(1) HashAggregate(keys=[age#26L], functions=[partial_count(1)])
      +- *(1) FileScan json [age#26L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/usr/lib/spark-current/examples/src/main/resources/people.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint>
val df = spark.read.json("file:///usr/lib/spark-current/examples/src/main/resources/people.json")
df.registerTempTable("people")

select * from people order by age

== Physical Plan ==
*(2) Sort [age#64L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(age#64L ASC NULLS FIRST, 200)
   +- *(1) FileScan json [age#64L,name#65] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/usr/lib/spark-current/examples/src/main/resources/people.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint,name:string>
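Plans like the three above can be printed for any query with explain(); explain(true) also shows the parsed, analyzed, and optimized logical plans before the physical one. A sketch using a small in-memory table instead of people.json, so it runs without the example file:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("explain-demo").getOrCreate()
import spark.implicits._

val df = Seq(("jeff", 23), ("andy", 20)).toDF("name", "age")
df.createOrReplaceTempView("people")

// Prints parsed, analyzed, optimized, and physical plans to stdout.
spark.sql("select * from people where name = 'jeff'").explain(true)
```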
4. Big Data Best Practices on EMR Studio
- EMR Studio features
  - Compatible with open-source components
  - Supports connecting to multiple clusters
  - Works with multiple compute engines
  - Interactive development and job scheduling through a graphical interface
Notebook + Airflow: seamlessly connects the development and production-scheduling stages
1. Interactive development lets you quickly verify that a job is correct.
2. Scheduling notebook jobs in Airflow keeps the development and production environments as consistent as possible, preventing problems caused by environment mismatches between the two stages.
- Fits a variety of big data scenarios
  - Big data processing (ETL)
  - Interactive data analysis
  - Machine learning
  - Real-time computing
- Separation of compute and storage
  All data is stored on OSS, including:
  - Users' notebook code
  - Scheduled-job logs
- Even if the cluster is destroyed, you can recreate it and easily restore the data
5. Demo
Briefly introduce the three main controls on the EMR Studio toolbar and cluster association, and roughly demonstrate how to attach a cluster. Among the Spark tutorials, use the Spark SQL tutorial (Scala): first select any cluster, then run it and observe the process. The benefit of interactive development is that you can see results immediately.