DataFrame的操作-使用SQL

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: DataFrame的操作-使用SQL

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

DataFrame的操作-使用SQL

3. 实验学时:

4. 实验原理:

在SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。


5. 实验目的:

掌握Spark SQL临时视图的创建。

 掌握Spark SQL查询。


6. 实验内容:

使用标准SQL对DataFrame进行操作。具体包含如下内容:

 1、临时表创建

   - createGlobalTempView(name)

   - createOrReplaceGlobalTempView(name)

   - createOrReplaceTempView(name)

   - createTempView(name)

   - registerTempTable(name)

   - dropTempView(name)

 2、Spark SQL查询

   - 加载数据集及处理

   - 创建临时表并查询

   - 创建永久表并查询


7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


8. 实验步骤:

8.1 环境准备

1、在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:


1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh
4.  $ zeppelin-daemon.sh start

2. 将本实验用到的数据集上传到HDFS上。在终端窗口中,执行以下命令:


1.  $ hdfs dfs -mkdir -p /data/dataset/batch
2.  $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
3.  $ hdfs dfs -put /data/dataset/batch/wc.txt /data/dataset/batch/

3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:


6220747b92934d40b8fb3a853c18ef5a.png


8.2 临时视图的创建

1、使用createGlobalTempView(name)方法为DataFrame创建一个全局的临时表,其生命周期和启动的app的周期一致,既启动的spark应用存在则这个临时的表就一直能访问,直道sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库中。

 在zeppelin中执行如下代码:


1.  var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
2.  df.createGlobalTempView("xx")
3.       
4.  // 查询
5.  spark.sql("select * from global_temp.xx").show(5)

【shift+enter】对程序进行输出。输出内容如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  1|  Male| 34|         2000|
5.  |  2|Female| 23|         3500|
6.  |  3|Female| 26|         2500|
7.  |  4|Female| 27|         4500|
8.  |  5|  Male| 24|         5500|
9.  +---+------+---+-------------+
10. only showing top 5 rows

2、使用createOrReplaceGlobalTempView(name)方法创建或替换视图。上面的方法当遇到已经创建了的临时表名的话会报错。而这个方法遇到已经存在的临时表会进行替换,没有则创建。在zeppelin中执行如下代码:

1.  // 创建临时表,存在则覆盖
2.  df.createOrReplaceGlobalTempView("XX")
3.       
4.  // 查询
5.  spark.sql("select * from global_temp.XX").show(5)

【shift+enter】对程序进行输出。输出内容如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  1|  Male| 34|         2000|
5.  |  2|Female| 23|         3500|
6.  |  3|Female| 26|         2500|
7.  |  4|Female| 27|         4500|
8.  |  5|  Male| 24|         5500|
9.  +---+------+---+-------------+
10. only showing top 5 rows

3、使用createOrReplaceTempView(name)方法为DataFrame创建本地的临时视图,其生命周期只限于当前的SparkSession,当调用了SparkSession的stop方法停止SparkSession后,其生命周期就到此为止了。

 在zeppelin中执行如下代码:

1.  
2.  // 创建本地的临时视图
3.  df.createOrReplaceTempView("TT")
4.       
5.  // 查询
6.  spark.sql("select * from TT").show(5)

【shift+enter】对程序进行输出。输出内容如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  1|  Male| 34|         2000|
5.  |  2|Female| 23|         3500|
6.  |  3|Female| 26|         2500|
7.  |  4|Female| 27|         4500|
8.  |  5|  Male| 24|         5500|
9.  +---+------+---+-------------+
10. only showing top 5 rows

4、使用createTempView(name)方法创建临时视图。这个方法在创建临时视图时若遇到已经创建过的视图的名字,会报错。因此需要指定另外的名字。在zeppelin中执行如下代码:

1.  
2.  // 创建视图
3.  df.createTempView("T2")
4.       
5.  // 查询
6.  spark.sql("select * from T2").show(5)

【shift+enter】对程序进行输出。输出内容如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  1|  Male| 34|         2000|
5.  |  2|Female| 23|         3500|
6.  |  3|Female| 26|         2500|
7.  |  4|Female| 27|         4500|
8.  |  5|  Male| 24|         5500|
9.  +---+------+---+-------------+
10. only showing top 5 rows

8.3 执行Spark SQL查询

1、创建数据,从HDFS上进行数据读取,数据转换为DataFrame类型。在zeppelin中执行如下代码:

1.  import org.apache.spark.sql.types._
2.  import org.apache.spark.sql._
3.       
4.  // 数据路径
5.  var filePath = "/data/dataset/batch/wc.txt"
6.       
7.  // 创建RDD
8.  var rdd1 = sc.textFile(filePath)
9.       
10. // RDD进行flatMap操作后进行map操作
11. var rdd2 = rdd1.flatMap( x=>x.split(" ")).map( word=>Row(word,1))
12.      
13. // 指定schema
14. var schema = StructType(List(StructField("word",StringType,true),StructField("count",IntegerType,true)))
15.      
16. // 创建DataFrame
17. var wordDF = spark.createDataFrame(rdd2,schema)
18. // 查看df的数据结构信息
19. wordDF.printSchema

【shift+enter】对程序进行输出。输出内容如下所示:


1.  root
2.   |-- word: string (nullable = true)
3.   |-- count: integer (nullable = true)

2、创建临时表,数据查询。在zeppelin中执行如下代码:

1.  // 注册临时表
2.  wordDF.createOrReplaceTempView("wc")
3.       
4.  // 从临时表wc中执行sql查询
5.  var resultDF = spark.sql("select * from wc")
6.  // 展示DF
7.  resultDF.show
8.       
9.  // 从临时表中执行sql查询
10. var resultDF2 = spark.sql("select word,count(*) as total from wc group by word")
11. // 展示DF
12. resultDF2.show

【shift+enter】对程序进行输出。输出内容如下所示:

1.  +-----+-----+
2.  | word|count|
3.  +-----+-----+
4.  | good|    1|
5.  | good|    1|
6.  |study|    1|
7.  |  day|    1|
8.  |  day|    1|
9.  |   up|    1|
10. +-----+-----+
11.      
12. +-----+-----+
13. | word|total|
14. +-----+-----+
15. |  day|    2|
16. |study|    1|
17. |   up|    1|
18. | good|    2|
19. +-----+-----+

3、创建永久表,数据查询。在zeppelin中执行如下代码:

1.  // 注册为永久表
2.  wordDF.write.saveAsTable("wccc")
3.       
4.  // 从永久表中执行查询
5.  var resultDF3 = spark.sql("select * from wccc")
6.  // 数据展示
7.  resultDF3.show()
8.       
9.  // 从永久表中执行wccc查询
10. var resultDF4 = spark.sql("select word,count(*) as total from wccc group by word")
11. // 数据展示
12. resultDF4.show()

【shift+enter】对程序进行输出。输出内容如下所示:

1.  +-----+-----+
2.  | word|count|
3.  +-----+-----+
4.  | good|    1|
5.  | good|    1|
6.  |study|    1|
7.  |  day|    1|
8.  |  day|    1|
9.  |   up|    1|
10. +-----+-----+
11.      
12. +-----+-----+
13. | word|total|
14. +-----+-----+
15. |  day|    2|
16. |study|    1|
17. |   up|    1|
18. | good|    2|
19. +-----+-----+

9. 实验结果及分析:

实验结果运行准确,无误


10. 实验结论:

经过本节实验的学习,通过DataFrame的操作-使用SQL,进一步巩固了我们的Spark基础。


11. 总结及心得体会:

SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。


12、 实验知识测试

1、创建本地临时视图表成功的方法是( C ){单选}

 A、createGlobalTempView(name)

 B、createOrReplaceGlobalTempView(name)

 C、createOrReplaceTempView(name)

 D、createTempView(name)


13、实验拓展

给定本地一个文本数据,并上传至HDFS,读取后创建为DataFrame,并创建临时表对数据进行查询。


相关文章
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之SQL错误[0A000],通常是什么造成的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
54 1
|
11天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
54 0
|
3月前
|
SQL 存储 分布式计算
|
4月前
|
SQL 分布式计算 MaxCompute
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
|
4月前
|
SQL 分布式计算 DataWorks
MaxCompute操作报错合集之使用sql查询一个表的分区数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之在执行SQL查询时遇到报错,代码为[XX000],该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
SQL 分布式计算 资源调度
MaxCompute操作报错合集之执行SQL Union All操作时,数据类型产生报错,该怎么解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
132 1
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之执行多条SQL语句时,使用同一个实例来运行,遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL DataWorks NoSQL
DataWorks操作报错合集之在执行带with语句的SQL查询时出现了错误,是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。