8. 统计操作
groupBy()
和count()
操作
计算所有年龄和年龄的数量:
teacherDF.groupBy("age").count().show()
0x03 执行 SQL 语句
SparkSession
提供了 SparkSession.sql()
方法,SQL 语句可以直接作为字符串
传入 sql()
方法中,具体可以学习相关的 SQL 知识。
首先需要将 DataFrame
注册为临时表
才可以在该表上执行 SQL 语句:
teacherDF.createOrReplaceTempView("teacher")
查询身高在 170-190 之间的老师:
val sqlDF = spark.sql("SELECT name,height FROM teacher WHERE height >= 170 and height <= 190") sqlDF.show()
如果在同一个应用的不同 session 会话中需要重用一个临时表,可以把它注册成为全局临时表
,全局临时表
会一直存在并在所有会话中共享直到应用程序
终止。
// 注册成为全局临时表 teacherDF.createGlobalTempView("GlobalTeacher") // newSession()返回一个新的spark对象,引用全局临时表需要 global_temp 标识 spark.newSession().sql("SELECT name,height FROM global_temp.GlobalTeacher WHERE height >= 170 and height <= 190").show()
如果重复创建表的话会报错,提示表已经存在,此时就可以使用:createOrReplaceGlobalTempView()
:
teacherDF.createOrReplaceGlobalTempView("GlobalTeacher")
0x04 保存 DataFrame 为其他格式
1. 默认为Parquet格式
parquet
是 Spark SQL 读取的默认
数据文件格式,我们把先前从 JSON 中读取的 DataFrame 保存为这种格式,只保存名称和身高两项数据:
teacherDF.select("name", "height").write.format("parquet").save("/home/hadoop-sny/datas/teacher.parquet")
/home/hadoop-sny/datas/teacher.parquet
文件夹被会被创建并存入名称和身高。另开一个终端,可以查看文件夹下的内容:
2. 保存为其他格式
此外,你也可以保存成一份json文件:
teacherDF.select("name", "height").write.format("json").save("/home/hadoop-sny/datas/teacher-test.json")
查看内容,如图:
3. 保存模式
保存操作可以选择使用多种存储模式: SaveMode
, 它可以指定如何处理现有数据。比如当执行 Overwrite
时, 在写入新数据之前,原有的数据将被删除。
比如:使用overwrite
方式以parquet
形式写出去:
teacherDF.select("name", "height").write.format("json").mode("overwrite").save("/home/hadoop-sny/datas/teacher-test.json")
退出原本的目录,重新进入查看一下文件,生成的时间变了,因为重新生成了文件,并且覆盖了以前生成的文件,其他的模式也是类似的,此处不再反复截图。
0x05 支持多种数据源
1. 通用 load、save 函数
Spark SQL
的默认数据源格式为 parquet
格式。当文件是 parquet
格式时,Spark SQL
可以直接在该文件上执行查询操作。
代码示例如下:
val usersDF = spark.read.load("路径/users.parquet") usersDF.select("name", "age").write.save("namesAndAge.parquet")
2. 指定其他格式数据源
当数据源不是 parquet
文件却是内置格式
的时候,使用指定简称(json, jdbc, orc, libsvm, csv, text)
即可。同时还可以对 DataFrame 进行类型转换
。
代码示例如下:
val usersDF = spark.read.format("json").load("路径/users.json") usersDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
0xFF 总结
- 本文的前置教程课程为: Spark SQL快速入门(基础),关于Spark SQL的操作还有很多知识,此处仅仅是入门教程,有机会再写相应的教程。
- 关于RDD、DataFrame、DataSet的互相转换是非常重要的知识点,请留意我的博客,点赞、评论、关注,有时间分享给大家,谢谢!