DataFrame持久存储

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: DataFrame持久存储

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

DataFrame持久存储

3. 实验学时:

4. 实验原理:

DataFrame数据经过计算以后,可以持久到外部存储中,如关系型数据库和HDFS中。Spark对此提供了支持。

5. 实验目的:

掌握DataFrame存储操作。

6. 实验内容:

将DataFrame持久存储。具体包含如下内容:

  - 写入MySQL

  - 写入HDFS

7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1

8. 实验步骤:

8.1 环境准备

1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。

 在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:


1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh
4.  $ zeppelin-daemon.sh start

2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:

1.  $ hdfs dfs -mkdir -p /data/dataset/batch
2.  $ hdfs dfs -put /data/dataset/batch/Online-Retail.txt /data/dataset/batch/

3、因为后面的实验中需要访问MySQL数据库,所以先要将MySQL的jdbc驱动程序拷贝到Spark的jars目录下。在终端窗口,执行如下的命令:

1.  $ cp /data/software/mysql-connector-java-5.1.45-bin.jar /opt/spark/jars/

4、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:


6c94bea2b8894e36b4507ea58ee68394.png

8.2 数据存储

1、读取数据,生成RDD,创建DataFrame。在zeppelin中执行如下代码:

1.  import org.apache.spark.sql.types._
2.  import org.apache.spark.sql._
3.  import org.apache.spark.sql.functions._
4.       
5.  // 数据路径
6.  var filePath = "/data/dataset/batch/Online-Retail.txt"
7.       
8.  // 加载RDD
9.  var inFileRDD= sc.textFile(filePath)
10. // 以制表符进行分割
11. var allRowsRDD=inFileRDD.map(x=> x.split("\t"))
12.      
13. // 获取RDD的第一条数据头标签
14. var header = allRowsRDD.first()
15.      
16. // 去除标题行
17. var data = allRowsRDD.filter(x => x(0) != header(0))
18.      
19. // 创建Schema
20. var fields= List(StructField("invoiceNo", StringType, true),
21.                  StructField("stockCode", StringType, true),
22.                  StructField("description", StringType, true),
23.                  StructField("quantity", IntegerType, true),
24.                  StructField("invoiceDate", StringType, true),
25.                  StructField("unitPrice", DoubleType, true),
26.                  StructField("customerID", StringType, true),
27.                  StructField("country", StringType, true)
28.       )
29. val schema = StructType(fields)
30.      
31. 
32. // 将RDD中的每行数据转换为Row对象
33. var rowRDD = data.map( x => Row(x(0),x(1),x(2),x(3).toInt,x(4),x(5).toDouble,x(6),x(7)))
34.      
35. // 创建DataFrame
36. var r1DF = spark.createDataFrame(rowRDD,schema)
37. // 显示DataFrame数据
38.  r1DF.show(5)

同时按下【shift+enter】对程序进行输出。输出内容如下所示:

1.  +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
2.  |invoiceNo|stockCode|         description|quantity|   invoiceDate|unitPrice|customerID|       country|
3.  +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
4.  |   536365|   85123A|WHITE HANGING HEA...|       6|2010/12/1 8:26|     2.55|     17850|United Kingdom|
5.  |   536365|    71053| WHITE METAL LANTERN|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|
6.  |   536365|   84406B|CREAM CUPID HEART...|       8|2010/12/1 8:26|     2.75|     17850|United Kingdom|
7.  |   536365|   84029G|KNITTED UNION FLA...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|
8.  |   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|
9.  +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
10. only showing top 5 rows

2、数据类型转换,创建本地视图,调用sql语句进行查询。在zeppelin中执行如下代码:

1.  // 将invoiceDate列强制转换为时间类型
2.  var ts = unix_timestamp($"invoiceDate","yyyy/MM/dd HH:mm").cast("timestamp")
3.       
4.  // 为DataFrame添加一列
5.  var r2DF = r1DF.withColumn("ts",ts)
6.  // 显示添加后的数据
7.  r2DF.show(5)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
2.  |invoiceNo|stockCode|         description|quantity|   invoiceDate|unitPrice|customerID|       country|                 ts|
3.  +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
4.  |   536365|   85123A|WHITE HANGING HEA...|       6|2010/12/1 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
5.  |   536365|    71053| WHITE METAL LANTERN|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
6.  |   536365|   84406B|CREAM CUPID HEART...|       8|2010/12/1 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
7.  |   536365|   84029G|KNITTED UNION FLA...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
8.  |   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
9.  +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
10. only showing top 5 rows

3、对数据进行查询,选择满足条件的数据。在zeppelin中执行如下代码:

1.  import java.util.Properties
2.       
3.  // 创建本地临时视图
4.  r2DF.createOrReplaceTempView("retailTable")
5.  // 查找时间小于2011-12-01的数据
6.  var r3DF = spark.sql("select * from retailTable where ts<\"2011-12-01\"")
7.       
8.  // 查找时间大于等于2011-12-01的数据
9.  var r4DF = spark.sql("select * from retailTable where ts>=\"2011-12-01\"")
10.      
11. // 选取数据
12. var selectData =  r4DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts")
13.      
14. // 修改列的名字
15. var writeMySQL = selectData.withColumnRenamed("ts","invoiceDate")
16. // 显示修改后的DataFrame
17. writeMySQL.show(5)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
2.  |invoiceNo|stockCode|         description|quantity|unitPrice|customerID|       country|        invoiceDate|
3.  +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
4.  |  C579889|    23245|SET OF 3 REGENCY ...|      -8|     4.15|     13853|United Kingdom|2011-12-01 08:12:00|
5.  |  C579890|    84947|ANTIQUE SILVER TE...|      -1|     1.25|     15197|United Kingdom|2011-12-01 08:14:00|
6.  |  C579890|    23374|RED SPOT PAPER GI...|      -1|     0.82|     15197|United Kingdom|2011-12-01 08:14:00|
7.  |  C579890|    84945|MULTI COLOUR SILV...|      -2|     0.85|     15197|United Kingdom|2011-12-01 08:14:00|
8.  |  C579891|    23485|BOTANICAL GARDENS...|      -1|     25.0|     13644|United Kingdom|2011-12-01 08:18:00|
9.  +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
10. only showing top 5 rows

4、将查询到的数据存储到MySQL中。

 (1)首先启动MySQL服务器。在终端窗口中,执行以下命令:

1.  $ service mysql start

(2)登录MySQL服务器。在终端窗口中,执行以下命令:

1.  $ mysql -u root -p

然后根据提示,输入登录密码:root。

 (3)执行以下SQL语句,创建测试表:

1.  mysql> create database retailDB;
2.  mysql> exit;

(4)在zeppelin中执行如下代码:


1.  // 将DataFrame数据存储到数据库中
2.  val prop = new Properties()
3.  prop.setProperty("user", "root")
4.  prop.setProperty("password", "root")
5.       
6.  writeMySQL.write.mode("append").jdbc("jdbc:mysql://localhost:3306/retailDB?characterEncoding=UTF-8", "transactions", prop)

验证保存成功。进入MySQL,通过【select count(*) from transactions;】来查看写入MySQL的数据条数。


5、将DataFrame存储到HDFS中。选择满足条件的数据,将数据写入到HDFS中。在zeppelin中执行如下代码:

1.  var selectData = r3DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts")
2.  var writeHDFS = selectData.withColumnRenamed("ts","invoiceDate")
3.  writeHDFS.select("*").write.format("json").save("/Users/r3DF")

同时按下【shift+enter】对程序进行输出。

 验证保存到HDFS中成功。在终端窗口下,执行以下命令,查看写入HDFS的json数据:

1.  # hdfs dfs -ls /Users/r3DF

可以看到已经写入成功,如下图所示:


1013f4ddbc2040558b6b69c9ff11f890.png


9. 实验结果及分析:

实验结果运行准确,无误


10. 实验结论:

经过本节实验的学习,通过学习DataFrame持久存储,进一步巩固了我们的Spark基础。


11. 总结及心得体会:

SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。


12、 实验测试

1、数据写入MySQL中mode=’append’的意思是什么( A ){单选}

 A、追加

 B、覆盖

 C、修改

 D、删除


13、实验拓展

1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。


34f2257444904fb4be7d73efd02e4438.png

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
机器学习/深度学习 JSON 数据格式
CatBoost模型部署与在线预测教程
CatBoost模型部署与在线预测教程【2月更文挑战第16天】
600 2
|
编解码 数据挖掘 开发者
Pandas数据导出:CSV文件
Pandas是Python中强大的数据分析库,提供了灵活的数据结构如DataFrame和Series。通过`to_csv()`函数可轻松将数据保存为CSV文件。本文介绍了基本用法、常见问题(如编码、索引、分隔符等)及解决方案,并涵盖大文件处理和报错解决方法,帮助用户高效导出数据。
1275 83
|
机器学习/深度学习 供应链 监控
ERP系统中的供应链可视化与智能预测解析
【7月更文挑战第25天】 ERP系统中的供应链可视化与智能预测解析
577 5
|
边缘计算 运维 5G
|
传感器 人工智能 监控
|
Python
使用Python pandas的sort_values()方法可按一个或多个列对DataFrame排序
【5月更文挑战第2天】使用Python pandas的sort_values()方法可按一个或多个列对DataFrame排序。示例代码展示了如何按'Name'和'Age'列排序 DataFrame。先按'Name'排序,再按'Age'排序。sort_values()的by参数接受列名列表,ascending参数控制排序顺序(默认升序),inplace参数决定是否直接修改原DataFrame。
1589 1
|
索引 Python
Pandas学习笔记之Dataframe
Pandas学习笔记之Dataframe
1760 2
|
缓存 分布式计算 资源调度
MapReduce入门(一篇就够了)
MapReduce入门(一篇就够了)
10450 1
MapReduce入门(一篇就够了)
|
API 索引 Python
【Pandas】已完美解决:AttributeError: ‘DataFrame‘ object has no attribute ‘ix‘
【Pandas】已完美解决:AttributeError: ‘DataFrame‘ object has no attribute ‘ix‘
913 0
|
机器学习/深度学习 传感器 算法
基于Mediapipe深度学习算法的手势识别系统【含python源码+PyqtUI界面+原理详解】-python手势识别 深度学习实战项目
基于Mediapipe深度学习算法的手势识别系统【含python源码+PyqtUI界面+原理详解】-python手势识别 深度学习实战项目

热门文章

最新文章