开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:课时名称】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12046
DataFrame 介绍_创建_toDF
内容介绍:
一、ETL 的三大步骤
二、Spark 代码编写的套路
三、DataFrame 如何创建
上章介绍了 DataFrame 是什么,本章深入介绍 DataFrame 首先要知道在一般情况下处理数据是否有纲领性的指引。
一、ETL 的三大步骤
一般处理数据都差不多是ETL这个步骤,此处的ETL不是狭义的ETL,大致模拟为先读再装载,ETL步骤中E代表抽取,T表示Transformation处理,转换。L即代表装载,落地,以上就代表ETL的三大步骤。Spark是怎么去处理数据的呢?代码如何编写的呢?
二、Spark代码编写的套路
第一步首先要将数据读出来,创建出来。之后就是通过 API 来进行数据处理,最后是进行数据落地,同时三个步骤不仅可以用于 DataFrame 也可以用于 DataSet 和 RDD。
1. 创造 DataFrame DataSet RDD 制造或读取数据
2. 通过 DataFrame DataSet RDD 的 API 来进行数据处理
3. 通过 DataFrame DataSet RDD 来进行数据落地
以上就是整个的处理步骤。
三、DataFrame如何创建
了解 DataFrame 怎么去处理数据之后第一步首先要了解数据读取的过程 DataFrame如何创建、DataFrame 支持什么操作和 DataFrame 如何落地,三大核心操作,其中 DataFrame 如何落地在下章节有详细介绍不再赘述。
DataFrame 如何创建有三种模式:第一种 toDF,第二种 createDataFrame,第三种还有新增的读写框架 DataFrameReader,就是通过读取外部处理集来创建 DataFrame。
1. 通过隐式转换创建 DataFrame
这种方式本质上是使用 SparkSession 中的隐式转换来进行的
val spark: SparkSession = new sql. SparkSession .Builder ()
.appName ( "hello")
.master( "local[6]")
.getOrCreate()
//必须要导入隐式转换
//注意: spark 在此处不是包,而是 SparkSession 对象
import spark.implicits._
val peopleDF: DataFrame = Seq(People("zhangsan",15),People("lisi",15)) .toDF()
根据源码可以知道,toDF 方法可以在 RDD 和 seq 中使用
通过集合创建 DataFrame 的时候,集合中不仅可以包含样例类,也可以只有普通数据类型,后通过指定列名来创建
val spark: SparkSession = new sql.SparkSession.Builder ().appName( "hello")
.master( "local[6]").getOrCreate()
import spark.implicits._
val df1: DataFrame = Seqr"nihao" , "hello" ).toDF( "text" )
/*
+-------+
| text l
+--------+
| nihao|
| hello |
+--------+
*/
df1.show()
val df2:DataFrame = Seq (("a”,1)).tODF ( "word", "count")
/*
+- - - -+- - - - - +
|word | count|
+- - - -+- - - - - +
| a | 1|
| b | 1|
+- - - -+- - - - -+
*/
df2.show( )
2. IDAE上的示例
(1)打开 IDAE 根据以下写好的方法进行
@Test
def dataframe2(): unit = {
//第一步:创建 SparkSession
val spark = Sparksession.builder()
.appName( name ="dataframe1")
.master( master = "loca1[6]")
.getorcreate()
//第二步:导入隐式转换
import spark.implicits._
//第三步:创建一个 list,而 list 当中放的是 person
val personList = seq(Person(" zhangsan",15),Person("lisi",20))
}
}
case class Person( name: string, age: Int)
(2)之后在其基础上介绍三种创建模式:①toDF ②createDataFrame ③DataFrameReader 将其简称为 read
@Test
def dataframe2(): unit = {
//第一步:创建 SparkSession
val spark = Sparksession.builder()
.appName( name ="dataframe1")
.master( master = "loca1[6]")
.getorcreate()
//第二步:导入隐式转换
import spark.implicits._
//第三步:创建一个 list,而 list当中放的是 person
val personList = seq(Person(" zhangsan",15),Person("lisi",20))
//1.toDF ,可以作用于 PersonList,因而可以直接使用 seq 来调用 toDF
val df1 = personList.toDF()
//同时也可以将其作为一个转换作用于RDD上,spark.sparkcontext.parallelize(personList)创建一个 RDD
val df2 = spark.sparkcontext.parallelize(personList).toDF()
//2.createDataFrame
//3.DataFrameReader将其简称为 read
}
}
case class Person( name: string, age: Int)
(3)toDF 在何处定义
如果将以上代码的val df1 = personList.toDF()和val df2 = spark.sparkcontext.parallelize(personList).toDF()两行注释之后,import spark.implicits._在 IDAE 中会变为灰色,代表它没有在任何地方使用,而取消注释又会变为使用状态说明 toDF 是来自于这个导入的 spark.implicits._。
因为说 toDF 是一个映式转换,首先就需要知道 toDF 将 personlist 和 RDD 都转为了什么类型,按住 ctrl 点击 toDF 就可以进行查看,点入后可以发现 toDF(),toDF()都是在 DatasetHolder 进行定义的,所以只需要找到 DatasetHolder 是在何处进行隐式转换的即可。
Ctrl+C 复制 DatasetHolder 这个类名,因为隐式转换是在 implicits._处导入的,所以 ctrl 点入会发现如下代码:
@Experimental
@Interfacestability.Evolving
object implicits extends SQLImplicits with serializable {
protected override def _SQLcontext: sQLContext = SparkSession.this.sqlcontext
}
得知它给的是一个 SQLcontext,再点入 SQLcontext 大致浏览后如果并未找到隐式转换相关,就可以在 IDAE 中直接搜索 implicits 得到如下代码,可以发现其中有一个 implicits 的 object 刚好继承了 SQLImplicits。
@Experimental
@Interfacestability.Evolving
object implicits extends SQLImplicits with serializable {
protected override def _sqlcontext:sQLcontext = self
}
再次点入 SQLImplicits,因为 toDF 是在 DatasetHolder 中定义的,所以在其中直接搜索 DatasetHolder,发现是在以下代码中进行了隐式转换,当在代码导入 import spark.implicits._后,使用对应类型时,它会传入到隐式转换的方法中来,去对比是否是该类型,是这个类型则会直接创建一个 DatasetHolder。
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]):DatasetHolder[T] ={
DatasetHolder(_sqlcontext.createDataset(rdd))
}
/**
*creates a [[Dataset]] from a local seq.
*since 1.6.0
*/
implicit def localseqToDatasetHolder[T : Encoder](s: seq[T])DatasetHolder[T] = {
DatasetHolder(_sqlcontext.createDataset(s))
}
以上就是整个 toDF 的流程,本章介绍了 toDF 在何处定义,同时查看了源码层面,同时源码无需完全掌握,知晓思路即可,掌握下回遇到同类情况,可以去看懂源码思路即可。
同时 Spark 的代码写的较好,在网上以及博客里搜集的资料大多不同,就可以采用看源码的形式。