1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
DataFrame的操作-使用SQL
3. 实验学时:
4. 实验原理:
在SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。
5. 实验目的:
掌握Spark SQL临时视图的创建。
掌握Spark SQL查询。
6. 实验内容:
使用标准SQL对DataFrame进行操作。具体包含如下内容:
1、临时表创建
- createGlobalTempView(name)
- createOrReplaceGlobalTempView(name)
- createOrReplaceTempView(name)
- createTempView(name)
- registerTempTable(name)
- dropTempView(name)
2、Spark SQL查询
- 加载数据集及处理
- 创建临时表并查询
- 创建永久表并查询
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
8. 实验步骤:
8.1 环境准备
1、在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh 4. $ zeppelin-daemon.sh start
2. 将本实验用到的数据集上传到HDFS上。在终端窗口中,执行以下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/batch 2. $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/ 3. $ hdfs dfs -put /data/dataset/batch/wc.txt /data/dataset/batch/
3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:
8.2 临时视图的创建
1、使用createGlobalTempView(name)方法为DataFrame创建一个全局的临时表,其生命周期和启动的app的周期一致,既启动的spark应用存在则这个临时的表就一直能访问,直道sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库中。
在zeppelin中执行如下代码:
1. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv") 2. df.createGlobalTempView("xx") 3. 4. // 查询 5. spark.sql("select * from global_temp.xx").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 1| Male| 34| 2000| 5. | 2|Female| 23| 3500| 6. | 3|Female| 26| 2500| 7. | 4|Female| 27| 4500| 8. | 5| Male| 24| 5500| 9. +---+------+---+-------------+ 10. only showing top 5 rows
2、使用createOrReplaceGlobalTempView(name)方法创建或替换视图。上面的方法当遇到已经创建了的临时表名的话会报错。而这个方法遇到已经存在的临时表会进行替换,没有则创建。在zeppelin中执行如下代码:
1. // 创建临时表,存在则覆盖 2. df.createOrReplaceGlobalTempView("XX") 3. 4. // 查询 5. spark.sql("select * from global_temp.XX").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 1| Male| 34| 2000| 5. | 2|Female| 23| 3500| 6. | 3|Female| 26| 2500| 7. | 4|Female| 27| 4500| 8. | 5| Male| 24| 5500| 9. +---+------+---+-------------+ 10. only showing top 5 rows
3、使用createOrReplaceTempView(name)方法为DataFrame创建本地的临时视图,其生命周期只限于当前的SparkSession,当调用了SparkSession的stop方法停止SparkSession后,其生命周期就到此为止了。
在zeppelin中执行如下代码:
1. 2. // 创建本地的临时视图 3. df.createOrReplaceTempView("TT") 4. 5. // 查询 6. spark.sql("select * from TT").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 1| Male| 34| 2000| 5. | 2|Female| 23| 3500| 6. | 3|Female| 26| 2500| 7. | 4|Female| 27| 4500| 8. | 5| Male| 24| 5500| 9. +---+------+---+-------------+ 10. only showing top 5 rows
4、使用createTempView(name)方法创建临时视图。这个方法在创建临时视图时若遇到已经创建过的视图的名字,会报错。因此需要指定另外的名字。在zeppelin中执行如下代码:
1. 2. // 创建视图 3. df.createTempView("T2") 4. 5. // 查询 6. spark.sql("select * from T2").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 1| Male| 34| 2000| 5. | 2|Female| 23| 3500| 6. | 3|Female| 26| 2500| 7. | 4|Female| 27| 4500| 8. | 5| Male| 24| 5500| 9. +---+------+---+-------------+ 10. only showing top 5 rows
8.3 执行Spark SQL查询
1、创建数据,从HDFS上进行数据读取,数据转换为DataFrame类型。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.types._ 2. import org.apache.spark.sql._ 3. 4. // 数据路径 5. var filePath = "/data/dataset/batch/wc.txt" 6. 7. // 创建RDD 8. var rdd1 = sc.textFile(filePath) 9. 10. // RDD进行flatMap操作后进行map操作 11. var rdd2 = rdd1.flatMap( x=>x.split(" ")).map( word=>Row(word,1)) 12. 13. // 指定schema 14. var schema = StructType(List(StructField("word",StringType,true),StructField("count",IntegerType,true))) 15. 16. // 创建DataFrame 17. var wordDF = spark.createDataFrame(rdd2,schema) 18. // 查看df的数据结构信息 19. wordDF.printSchema
【shift+enter】对程序进行输出。输出内容如下所示:
1. root 2. |-- word: string (nullable = true) 3. |-- count: integer (nullable = true)
2、创建临时表,数据查询。在zeppelin中执行如下代码:
1. // 注册临时表 2. wordDF.createOrReplaceTempView("wc") 3. 4. // 从临时表wc中执行sql查询 5. var resultDF = spark.sql("select * from wc") 6. // 展示DF 7. resultDF.show 8. 9. // 从临时表中执行sql查询 10. var resultDF2 = spark.sql("select word,count(*) as total from wc group by word") 11. // 展示DF 12. resultDF2.show
【shift+enter】对程序进行输出。输出内容如下所示:
1. +-----+-----+ 2. | word|count| 3. +-----+-----+ 4. | good| 1| 5. | good| 1| 6. |study| 1| 7. | day| 1| 8. | day| 1| 9. | up| 1| 10. +-----+-----+ 11. 12. +-----+-----+ 13. | word|total| 14. +-----+-----+ 15. | day| 2| 16. |study| 1| 17. | up| 1| 18. | good| 2| 19. +-----+-----+
3、创建永久表,数据查询。在zeppelin中执行如下代码:
1. // 注册为永久表 2. wordDF.write.saveAsTable("wccc") 3. 4. // 从永久表中执行查询 5. var resultDF3 = spark.sql("select * from wccc") 6. // 数据展示 7. resultDF3.show() 8. 9. // 从永久表中执行wccc查询 10. var resultDF4 = spark.sql("select word,count(*) as total from wccc group by word") 11. // 数据展示 12. resultDF4.show()
【shift+enter】对程序进行输出。输出内容如下所示:
1. +-----+-----+ 2. | word|count| 3. +-----+-----+ 4. | good| 1| 5. | good| 1| 6. |study| 1| 7. | day| 1| 8. | day| 1| 9. | up| 1| 10. +-----+-----+ 11. 12. +-----+-----+ 13. | word|total| 14. +-----+-----+ 15. | day| 2| 16. |study| 1| 17. | up| 1| 18. | good| 2| 19. +-----+-----+
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过DataFrame的操作-使用SQL,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。
12、 实验知识测试
1、创建本地临时视图表成功的方法是( C ){单选}
A、createGlobalTempView(name)
B、createOrReplaceGlobalTempView(name)
C、createOrReplaceTempView(name)
D、createTempView(name)
13、实验拓展
给定本地一个文本数据,并上传至HDFS,读取后创建为DataFrame,并创建临时表对数据进行查询。