1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
DataFrame持久存储
3. 实验学时:
4. 实验原理:
DataFrame数据经过计算以后,可以持久到外部存储中,如关系型数据库和HDFS中。Spark对此提供了支持。
5. 实验目的:
掌握DataFrame存储操作。
6. 实验内容:
将DataFrame持久存储。具体包含如下内容:
- 写入MySQL
- 写入HDFS
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
8. 实验步骤:
8.1 环境准备
1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。
在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh 4. $ zeppelin-daemon.sh start
2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:
1. $ hdfs dfs -mkdir -p /data/dataset/batch 2. $ hdfs dfs -put /data/dataset/batch/Online-Retail.txt /data/dataset/batch/
3、因为后面的实验中需要访问MySQL数据库,所以先要将MySQL的jdbc驱动程序拷贝到Spark的jars目录下。在终端窗口,执行如下的命令:
1. $ cp /data/software/mysql-connector-java-5.1.45-bin.jar /opt/spark/jars/
4、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:
8.2 数据存储
1、读取数据,生成RDD,创建DataFrame。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.types._ 2. import org.apache.spark.sql._ 3. import org.apache.spark.sql.functions._ 4. 5. // 数据路径 6. var filePath = "/data/dataset/batch/Online-Retail.txt" 7. 8. // 加载RDD 9. var inFileRDD= sc.textFile(filePath) 10. // 以制表符进行分割 11. var allRowsRDD=inFileRDD.map(x=> x.split("\t")) 12. 13. // 获取RDD的第一条数据头标签 14. var header = allRowsRDD.first() 15. 16. // 去除标题行 17. var data = allRowsRDD.filter(x => x(0) != header(0)) 18. 19. // 创建Schema 20. var fields= List(StructField("invoiceNo", StringType, true), 21. StructField("stockCode", StringType, true), 22. StructField("description", StringType, true), 23. StructField("quantity", IntegerType, true), 24. StructField("invoiceDate", StringType, true), 25. StructField("unitPrice", DoubleType, true), 26. StructField("customerID", StringType, true), 27. StructField("country", StringType, true) 28. ) 29. val schema = StructType(fields) 30. 31. 32. // 将RDD中的每行数据转换为Row对象 33. var rowRDD = data.map( x => Row(x(0),x(1),x(2),x(3).toInt,x(4),x(5).toDouble,x(6),x(7))) 34. 35. // 创建DataFrame 36. var r1DF = spark.createDataFrame(rowRDD,schema) 37. // 显示DataFrame数据 38. r1DF.show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
1. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+ 2. |invoiceNo|stockCode| description|quantity| invoiceDate|unitPrice|customerID| country| 3. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+ 4. | 536365| 85123A|WHITE HANGING HEA...| 6|2010/12/1 8:26| 2.55| 17850|United Kingdom| 5. | 536365| 71053| WHITE METAL LANTERN| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom| 6. | 536365| 84406B|CREAM CUPID HEART...| 8|2010/12/1 8:26| 2.75| 17850|United Kingdom| 7. | 536365| 84029G|KNITTED UNION FLA...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom| 8. | 536365| 84029E|RED WOOLLY HOTTIE...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom| 9. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+ 10. only showing top 5 rows
2、数据类型转换,创建本地视图,调用sql语句进行查询。在zeppelin中执行如下代码:
1. // 将invoiceDate列强制转换为时间类型 2. var ts = unix_timestamp($"invoiceDate","yyyy/MM/dd HH:mm").cast("timestamp") 3. 4. // 为DataFrame添加一列 5. var r2DF = r1DF.withColumn("ts",ts) 6. // 显示添加后的数据 7. r2DF.show(5)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+ 2. |invoiceNo|stockCode| description|quantity| invoiceDate|unitPrice|customerID| country| ts| 3. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+ 4. | 536365| 85123A|WHITE HANGING HEA...| 6|2010/12/1 8:26| 2.55| 17850|United Kingdom|2010-12-01 08:26:00| 5. | 536365| 71053| WHITE METAL LANTERN| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:00| 6. | 536365| 84406B|CREAM CUPID HEART...| 8|2010/12/1 8:26| 2.75| 17850|United Kingdom|2010-12-01 08:26:00| 7. | 536365| 84029G|KNITTED UNION FLA...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:00| 8. | 536365| 84029E|RED WOOLLY HOTTIE...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:00| 9. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+ 10. only showing top 5 rows
3、对数据进行查询,选择满足条件的数据。在zeppelin中执行如下代码:
1. import java.util.Properties 2. 3. // 创建本地临时视图 4. r2DF.createOrReplaceTempView("retailTable") 5. // 查找时间小于2011-12-01的数据 6. var r3DF = spark.sql("select * from retailTable where ts<\"2011-12-01\"") 7. 8. // 查找时间大于等于2011-12-01的数据 9. var r4DF = spark.sql("select * from retailTable where ts>=\"2011-12-01\"") 10. 11. // 选取数据 12. var selectData = r4DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts") 13. 14. // 修改列的名字 15. var writeMySQL = selectData.withColumnRenamed("ts","invoiceDate") 16. // 显示修改后的DataFrame 17. writeMySQL.show(5)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+ 2. |invoiceNo|stockCode| description|quantity|unitPrice|customerID| country| invoiceDate| 3. +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+ 4. | C579889| 23245|SET OF 3 REGENCY ...| -8| 4.15| 13853|United Kingdom|2011-12-01 08:12:00| 5. | C579890| 84947|ANTIQUE SILVER TE...| -1| 1.25| 15197|United Kingdom|2011-12-01 08:14:00| 6. | C579890| 23374|RED SPOT PAPER GI...| -1| 0.82| 15197|United Kingdom|2011-12-01 08:14:00| 7. | C579890| 84945|MULTI COLOUR SILV...| -2| 0.85| 15197|United Kingdom|2011-12-01 08:14:00| 8. | C579891| 23485|BOTANICAL GARDENS...| -1| 25.0| 13644|United Kingdom|2011-12-01 08:18:00| 9. +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+ 10. only showing top 5 rows
4、将查询到的数据存储到MySQL中。
(1)首先启动MySQL服务器。在终端窗口中,执行以下命令:
1. $ service mysql start
(2)登录MySQL服务器。在终端窗口中,执行以下命令:
1. $ mysql -u root -p
然后根据提示,输入登录密码:root。
(3)执行以下SQL语句,创建测试表:
1. mysql> create database retailDB; 2. mysql> exit;
(4)在zeppelin中执行如下代码:
1. // 将DataFrame数据存储到数据库中 2. val prop = new Properties() 3. prop.setProperty("user", "root") 4. prop.setProperty("password", "root") 5. 6. writeMySQL.write.mode("append").jdbc("jdbc:mysql://localhost:3306/retailDB?characterEncoding=UTF-8", "transactions", prop)
验证保存成功。进入MySQL,通过【select count(*) from transactions;】来查看写入MySQL的数据条数。
5、将DataFrame存储到HDFS中。选择满足条件的数据,将数据写入到HDFS中。在zeppelin中执行如下代码:
1. var selectData = r3DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts") 2. var writeHDFS = selectData.withColumnRenamed("ts","invoiceDate") 3. writeHDFS.select("*").write.format("json").save("/Users/r3DF")
同时按下【shift+enter】对程序进行输出。
验证保存到HDFS中成功。在终端窗口下,执行以下命令,查看写入HDFS的json数据:
1. # hdfs dfs -ls /Users/r3DF
可以看到已经写入成功,如下图所示:
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过学习DataFrame持久存储,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。
12、 实验测试
1、数据写入MySQL中mode=’append’的意思是什么( A ){单选}
A、追加
B、覆盖
C、修改
D、删除
13、实验拓展
1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。