开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:SparkSQL 读写_分区】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12055
SparkSQL 读写_分区
内容介绍:
一、Hive 分区表
二、分区发现
三、写入 Parquet 的时候可以指定分区
一、Hive 分区表
首先打开 idea,上个案例中写的是一个 json 文件,但是写出来是一个文件夹,原因是 mapreduce,当 reduce 执行完任务之后生成的文件是一个文件夹形式,里面的文件是 part-xxx 的形式,有文件夹的原因是 rdd 是支持分区的,而 dateframe 的底层也是 rdd。
rdd 支持分区不可能等所有数据处理完收集,统一写一个文件,它会直接把每一个 rdd 的分区写成一个文件,之所以会生成文件夹是因为 rdd 支持分区,这里面每一个文件对应一个 rdd。Succes 代表任务的写入成功,是一个状态。
1.概念
①Hive 的分区表可以按照某一列作为分区列,将数据放入不同的文件中。
②对应的文件所在的目录上就有分区的信息,例如某个文件夹叫做 year=2019。
③Hive 很重要,Spark 经常要和 Hive 配合,Spark 要支持这种形式的文件夹的读写。
我们在写的时候直接写成一个文件夹,因为要支持 rdd 的分区,在读的时候直接能读文件夹,是因为在读的时候也要和写配合,写的时候是文件夹,同一个框架,读也一样。
然后进入脑图,了解本节的分区,这个分区是类似于 Hive 分区表形式的分区,Hive 的分区表可以按照某一列作为分区列,将数据放入不同的文件中,就是常说的外部分区表。
Hive 的分一个表对应的数据都存在于 hdfs 上的文件当中的,这个文件可能在某个文件夹,Hive 分区表对应的文件所在的目录上就有我们分区的信息,例如某文件夹叫“year=2019”,按照 year 作为分区的列,然后这条列对应的数据是 2019。
Hive 很重要,Spart 经常和其配合,所以 Spart 要支持这种形式的文件夹的读写。
2.写文件时候分区
要了解的知识点,第一个是写文件的时候分区,然后读分区文件。
二、分区发现
在读取常见文件格式的时候。Spark 会自动的进行分区发现,分区自动发现的时候,会将文件名中的分区信息当作一列.例如如果按照性别分区,那么一般会生成两个文件夹 gender=male 和 gendermfemale ,那么在使用 Spark 读取的时候,会自动发现这个分区信息,并且当作列放入创建的 DataFrame 中。
使用代码证明有两个步骤,第一步先读取某个分区的单独一个文件并打印其 Schema 信息,第二步读取整个数据集所有分区并打印 Schema 信息,和第一步做比较就可以确定。
val spark = ...
val partDF = spark.read.load ( "dataset/beijing_pm/year=2010/month=1")
partDF.printSchema( )
把分区的数据集中的某一个区单做一整个数据集读取,没有分区信息,自然也不会进行分区发现。
三、写入 Parquet 的时候可以指定分区
Spark 在写入文件的时候是支持分区的,可以像 Hive 一样设置某个列为分区列。
val spark: SparkSession = new sql.SparkSession.Builder ()
.appName ( "hello")
.master( "local[6]")
.get0rCreate()
//从 CSV 中读取内容
val dfFromParquet = spark.read . option ( "header" , value =true).csv ( "dataset/BeijingPM20100101_20151231.csv" )
//保存为 Parquet 格式文件,不指定 format 默认就是 Parquet
dfFromParquet.write.partitionBy( "year " ,"month" ).save("dataset/beijing_pm")
进入 idea 中创建新的方法,叫 parquetPatition,同时表分区的概念不仅在 patquet 上有,其他格式的文件也可以指定分区。
1. 读取数据。
读出一个 df 文件,如图,指定 option,header 为 true,csv 文件位置在 BeijingPM 中。
l 代码如下:
def parquetpartitions(): unit = {
// 1.读取数据
val df = spark.read
.option("header", value = true)
.csv( path = "dataset/BeijingPM20100101_20151231.csv")
2.写文件,表分区。
df 可以 write,指定 partitionBy 带某一列,可以指定多个列,按照年和月进行分区,直接进行 save,指定一个位置 beijing_pm4,之后先运行。
//2.写文件,表分区
df.write
.partitionay colNames= "year","
.month.save( path- "dataset/beijing_pm4 )
beijing_pm4 打开后,发现文件夹是按照年对应的值,按照 year 这一列作为第一层的分区,打开任意一个 year 文件的分区,之后发现每一个 year 中按照 month 进行分区的,之后再分发现是 rdd 分区形式。
3.读文件,自动发现分区。
首先 spark.read,写成 parquet 格式,指定位置,然后直接 printSchema。
注意:写分区表的时候,分区列不会包含在生成的文件中,只包含在文件夹上,如果直接通过读取文件来读取,分区信息会丢失,对应的列也会丢失。
// 3.读文件。自动发现分区
//写分区表的时候,分区列不会包含在生成的文件
//直接通过文件来进行读取的话,分区信息会丢失
spark.read
.parquet(path="dataset/beijing_pm4/year=2010/month=1")
.printSchema()
如果把 year 和 month 去掉,直接读取文件夹信息。spark sql 读取文件夹的时候支持发现里面自动分区的。但在进行验证后发现操作没有问题,说明读取分区表文件夹时,读取最外面的文件夹,sparksql 会进行自动的发现,所以最后的代码完全可以去除 year 和 month。
l 最终代码如下:
spark.read
.parquet(path="dataset/beijing_pm4")//去除 year 和 moth
.printSchema()
最后强调,写的时候可以进行分区,读的时候可以自动读取分区表。