DataFrame的操作-使用DSL

简介: DataFrame的操作-使用DSL

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

DataFrame的操作-使用DSL

3. 实验学时:

4. 实验原理:

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。


0a5145d6532f4495b11a9606f2903194.png


5. 实验目的:

掌握操作DataFrame的各种方法(转换、过滤、排序等)。

 掌握DataFrame存储操作。


6. 实验内容:

操作DataFrame的转换、过滤、排序等方法。具体包含如下内容:

 - select(cols)

 - selectExpr(expr)

 - drop(cols)

 - dropDuplicates(subset=None)

 - dropna(how=’any’,thresh=None,subset=None)

 - filter(condition)

 - where(condition)

 - limit(num)

 - withColumn(colName,col)

 - withColumnRename(existing,new)

 - orderBy(cols,kwargs)


7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


8. 实验步骤:

8.1 环境准备

1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。

 在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh
4.  $ zeppelin-daemon.sh start

2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:

1.  $ hdfs dfs -mkdir -p /data/dataset/batch
2.  $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/

3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:

0fbc9b83740b4317b594e3cfc065fbe2.png


8.2 DataFrame的方法

1、select(*cols)通过表达式选取DataFRame中符合条件的数据,返回新的DataFrame。在zeppelin中执行如下代码:

1.  // 读取csv数据
2.  var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
3.       
4.  // 通过select统计共有多少条数据
5.  df.select("*").count()

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  df: org.apache.spark.sql.DataFrame = [ID: string, Gener: string ... 2 more fields]
2.  res196: Long = 40

通过表达式选取DataFRame中符合条件的数据。在zeppelin中执行如下代码:

1.  // 选取Age和Gener列,获取前十条数据
2.  df.select("Age","Gener").show(10)
3.       
4.  // 查找Age列,并重命名为age,输出前十条数据
5.  df.select($"Age".alias("age")).show(10)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---+------+
2.  |Age| Gener|
3.  +---+------+
4.  | 34|  Male|
5.  | 23|Female|
6.  | 26|Female|
7.  | 27|Female|
8.  | 24|  Male|
9.  | 33|  Male|
10. | 34|  Male|
11. | 23|  Male|
12. | 26|  Male|
13. | 27|Female|
14. +---+------+
15. only showing top 10 rows
16. 
17. +---+
18. |age|
19. +---+
20. | 34|
21. | 23|
22. | 26|
23. | 27|
24. | 24|
25. | 33|
26. | 34|
27. | 23|
28. | 26|
29. | 27|
30. +---+
31. only showing top 10 rows

2、selectExpr(*expr),这个方法是select方法的一个变体,可以接受一个SQL表达式,返回新的DataFrame。在zeppelin中执行如下代码:

1.  // 分别求取Age的乘积,Age的平方根。显示前十条数据
2.  df.selectExpr("Age * 2","sqrt(Age)").show(10)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---------+-------------------------+
2.  |(Age * 2)|SQRT(CAST(Age AS DOUBLE))|
3.  +---------+-------------------------+
4.  |     68.0|        5.830951894845301|
5.  |     46.0|        4.795831523312719|
6.  |     52.0|       5.0990195135927845|
7.  |     54.0|        5.196152422706632|
8.  |     48.0|        4.898979485566356|
9.  |     66.0|        5.744562646538029|
10. |     68.0|        5.830951894845301|
11. |     46.0|        4.795831523312719|
12. |     52.0|       5.0990195135927845|
13. |     54.0|        5.196152422706632|
14. +---------+-------------------------+
15. only showing top 10 rows

3、drop(*cols)按照列名删除DataFrame中的列,返回新的DataFrame。在zeppelin中执行如下代码:

1.  // 打印所有的列名DataFrame.columns.values.tolist()
2.  println(df.columns.toList)
3.       
4.  // 删除Age列名
5.  var df1 = df.drop("Age")
6.       
7.  // 打印删除后的列名
8.  println(df1.columns.toList)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  List(ID, Gener, Age, Annual Income)
2.  List(ID, Gener, Annual Income)

4、dropDuplicates(subset=None)删除重复行,subset用于指定删除重复行的时候考虑那几列。在zeppelin中执行如下代码:

1.  // 导入所需依赖
2.  import org.apache.spark.sql.Row
3.  import org.apache.spark.sql._
4.  import org.apache.spark.sql.types._
5.       
6.  // 创建RDD,转换为DataFrame
7.  var df = sc.parallelize(List(Row("simple",15,175),Row("simple",15,175),Row("simpleBDP",2,180)),3)
8.  val schema = StructType(
9.        List(
10.         StructField("name", StringType, true),
11.         StructField("age", IntegerType, true),
12.         StructField("height", IntegerType, true)
13.       )
14.     )
15.      
16. var df1 = spark.createDataFrame(df, schema)
17.      
18. // 展示数据
19. // df.show
20.      
21. // 删除重复的数据
22. df1.dropDuplicates().show()
23.      
24. // 指定重复的列进行删除
25.  df1.dropDuplicates(List("age","name")).show()

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---------+---+------+
2.  |     name|age|height|
3.  +---------+---+------+
4.  |simpleBDP|  2|   180|
5.  |   simple| 15|   175|
6.  +---------+---+------+
7.  
8.  +---------+---+------+
9.  |     name|age|height|
10. +---------+---+------+
11. |simpleBDP|  2|   180|
12. |   simple| 15|   175|
13. +---------+---+------+

5、na.drop删除DataFrame中的空数据,加入”any”和”all”指定如何删除控制,加入数字参数指定有多少个空值进行删除,加入字段名删除指定字段中的空值。在zeppelin中执行如下代码:

1.  var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
2.       
3.  val schema = StructType(
4.        List(
5.          StructField("luck", DoubleType, true),
6.          StructField("age", DoubleType, true),
7.          StructField("weight", DoubleType, true)
8.        )
9.      )
10.      
11. // 创建DataFrame
12. var df = spark.createDataFrame(dfs,schema)
13.      
14. // 显示DataFrame
15. df.show()
16.      
17. // 有任意一个为na进行删除
18. df.na.drop("any").show
19.      
20. // 全部为na进行删除
21. df.na.drop("all").show
22.      
23. // 删除有两个na值的数据
24. df.na.drop(2).show()

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +----+----+------+
2.  |luck| age|weight|
3.  +----+----+------+
4.  |null|27.0| 170.0|
5.  |44.0|27.0| 170.0|
6.  |null|null|  null|
7.  +----+----+------+
8.  
9.  +----+----+------+
10. |luck| age|weight|
11. +----+----+------+
12. |44.0|27.0| 170.0|
13. +----+----+------+
14. 
15. +----+----+------+
16. |luck| age|weight|
17. +----+----+------+
18. |null|27.0| 170.0|
19. |44.0|27.0| 170.0|
20. +----+----+------+
21. 
22. +----+----+------+
23. |luck| age|weight|
24. +----+----+------+
25. |null|27.0| 170.0|
26. |44.0|27.0| 170.0|
27. +----+----+------+

6、filter(condition)按照传入的条件进行过滤,其实where方法就是filter方法的一个别名而已。。在zeppelin中执行如下代码:

1.  var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
2.       
3.  val schema = StructType(
4.        List(
5.          StructField("luck", DoubleType, true),
6.          StructField("age", DoubleType, true),
7.          StructField("weight", DoubleType, true)
8.        )
9.      )
10.      
11. // 创建DataFrame
12. var df = spark.createDataFrame(dfs,schema)
13.      
14. // 过滤数据
15. // df.filter($"luck"==44.0).show
16. df.filter("luck is not null").show

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +----+----+------+
2.  |luck| age|weight|
3.  +----+----+------+
4.  |44.0|27.0| 170.0|
5.  +----+----+------+

7、where(condition),这个方法和filter方法类似,更具传入的条件作出选择。在zeppelin中执行如下代码:


1.  // 加载csv数据
2.  var df  =  spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
3.       
4.  // 打印数据
5.  // df.show
6.       
7.  // 数据筛选
8.  df.where("Age >= 30").show

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  1|  Male| 34|         2000|
5.  |  6|  Male| 33|         2000|
6.  |  7|  Male| 34|         3500|
7.  | 12|Female| 33|         3500|
8.  | 13|Female| 34|         2500|
9.  | 18|  Male| 33|         2500|
10. | 19|Female| 34|         4500|
11. | 24|  Male| 33|         4500|
12. | 25|  Male| 34|         5500|
13. | 30|Female| 33|         5500|
14. | 31|  Male| 34|         2000|
15. | 36|Female| 33|         2000|
16. | 37|Female| 34|         3500|
17. +---+------+---+-------------+

8、limit(num)限制返回的数据的条数,防止返回到driver节点的数据过大造成OOM。在zeppelin中执行如下代码:


1.  // 限制返回10条数据
2.  df.limit(10).show

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  1|  Male| 34|         2000|
5.  |  2|Female| 23|         3500|
6.  |  3|Female| 26|         2500|
7.  |  4|Female| 27|         4500|
8.  |  5|  Male| 24|         5500|
9.  |  6|  Male| 33|         2000|
10. |  7|  Male| 34|         3500|
11. |  8|  Male| 23|         2500|
12. |  9|  Male| 26|         4500|
13. | 10|Female| 27|         5500|
14. +---+------+---+-------------+

9、withColumn(colName,col),返回一个新的DataFrame,这个DataFrame中新增加的colNAme的列,如果原来本身就有colName的列,就替换掉。在zeppelin中执行如下代码:

1.  // 添加Age列,Age本身存在,替换掉
2.  df.withColumn("Age",$"Age"*$"Age").show(10)
3.       
4.  // 添加Age2列
5.  df.withColumn("Age2",$"Age"*$"Age").show(10)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---+------+------+-------------+
2.  | ID| Gener|   Age|Annual Income|
3.  +---+------+------+-------------+
4.  |  1|  Male|1156.0|         2000|
5.  |  2|Female| 529.0|         3500|
6.  |  3|Female| 676.0|         2500|
7.  |  4|Female| 729.0|         4500|
8.  |  5|  Male| 576.0|         5500|
9.  |  6|  Male|1089.0|         2000|
10. |  7|  Male|1156.0|         3500|
11. |  8|  Male| 529.0|         2500|
12. |  9|  Male| 676.0|         4500|
13. | 10|Female| 729.0|         5500|
14. +---+------+------+-------------+
15. only showing top 10 rows
16.      
17. +---+------+---+-------------+------+
18. | ID| Gener|Age|Annual Income|  Age2|
19. +---+------+---+-------------+------+
20. |  1|  Male| 34|         2000|1156.0|
21. |  2|Female| 23|         3500| 529.0|
22. |  3|Female| 26|         2500| 676.0|
23. |  4|Female| 27|         4500| 729.0|
24. |  5|  Male| 24|         5500| 576.0|
25. |  6|  Male| 33|         2000|1089.0|
26. |  7|  Male| 34|         3500|1156.0|
27. |  8|  Male| 23|         2500| 529.0|
28. |  9|  Male| 26|         4500| 676.0|
29. | 10|Female| 27|         5500| 729.0|
30. +---+------+---+-------------+------+
31. only showing top 10 rows

10、withColumnRename(existing,new),对已经存在的列明重命名为new,若名称不存在则这个操作不做任何事情。在zeppelin中执行如下代码:

1.  // 修改Age的列名
2.  df.withColumnRenamed("Age","age").show(10)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|age|Annual Income|
3.  +---+------+---+-------------+
4.  |  1|  Male| 34|         2000|
5.  |  2|Female| 23|         3500|
6.  |  3|Female| 26|         2500|
7.  |  4|Female| 27|         4500|
8.  |  5|  Male| 24|         5500|
9.  |  6|  Male| 33|         2000|
10. |  7|  Male| 34|         3500|
11. |  8|  Male| 23|         2500|
12. |  9|  Male| 26|         4500|
13. | 10|Female| 27|         5500|
14. +---+------+---+-------------+
15. only showing top 10 rows

11、orderBy(cols,*kwargs),返回按照指定列排好序的新的DataFrame。在zeppelin中执行如下代码:

1.  // 对年龄进行排序
2.  df.orderBy("Age").show(5)
3.       
4.  // 降序排序
5.  df.orderBy($"Age".desc).show(5)

同时按下【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  8|  Male| 23|         2500|
5.  | 14|  Male| 23|         4500|
6.  |  2|Female| 23|         3500|
7.  | 32|  Male| 23|         3500|
8.  | 20|Female| 23|         5500|
9.  +---+------+---+-------------+
10. only showing top 5 rows
11.      
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. |  7|  Male| 34|         3500|
16. | 13|Female| 34|         2500|
17. |  1|  Male| 34|         2000|
18. | 31|  Male| 34|         2000|
19. | 19|Female| 34|         4500|
20. +---+------+---+-------------+
21. only showing top 5 rows

使用ascending关键字参数指定升降序排列。除了这种方式,还可以通过pyspark.sql.functions中定义好的desc降序和asc方法来排序。在zeppelin中执行如下代码:

1.  import org.apache.spark.sql.functions._
2.       
3.  // 对年龄进行排序
4.  df.orderBy(desc("Age")).show(5)
5.       
6.  // 降序排序
7.  df.orderBy($"Age".desc).show(5)
8.       
9.  // 对年龄进行排序
10. df.orderBy(asc("Age")).show(5)
11.      
12. // 生序排序
13. df.orderBy($"Age".asc).show(5)

同时按下【shift+enter】对程序进行输出。输出内容如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  7|  Male| 34|         3500|
5.  | 13|Female| 34|         2500|
6.  |  1|  Male| 34|         2000|
7.  | 31|  Male| 34|         2000|
8.  | 19|Female| 34|         4500|
9.  +---+------+---+-------------+
10. only showing top 5 rows
11.      
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. |  7|  Male| 34|         3500|
16. | 13|Female| 34|         2500|
17. |  1|  Male| 34|         2000|
18. | 31|  Male| 34|         2000|
19. | 19|Female| 34|         4500|
20. +---+------+---+-------------+
21. only showing top 5 rows
22.      
23. +---+------+---+-------------+
24. | ID| Gener|Age|Annual Income|
25. +---+------+---+-------------+
26. |  8|  Male| 23|         2500|
27. | 14|  Male| 23|         4500|
28. |  2|Female| 23|         3500|
29. | 32|  Male| 23|         3500|
30. | 20|Female| 23|         5500|
31. +---+------+---+-------------+
32. only showing top 5 rows
33.      
34. +---+------+---+-------------+
35. | ID| Gener|Age|Annual Income|
36. +---+------+---+-------------+
37. |  8|  Male| 23|         2500|
38. | 14|  Male| 23|         4500|
39. |  2|Female| 23|         3500|
40. | 32|  Male| 23|         3500|
41. | 20|Female| 23|         5500|
42. +---+------+---+-------------+
43. only showing top 5 rows

sort方法和orderBy方法类似。在zeppelin中执行如下代码:

1.  import org.apache.spark.sql.functions._
2.       
3.  // 降序
4.  df.sort(desc("Age")).show(5)
5.       
6.  // 升序
7.  df.sort(asc("Age")).show(5)

同时按下【shift+enter】对程序进行输出。输出内容如下所示:

1.  +---+------+---+-------------+
2.  | ID| Gener|Age|Annual Income|
3.  +---+------+---+-------------+
4.  |  7|  Male| 34|         3500|
5.  | 13|Female| 34|         2500|
6.  |  1|  Male| 34|         2000|
7.  | 31|  Male| 34|         2000|
8.  | 19|Female| 34|         4500|
9.  +---+------+---+-------------+
10. only showing top 5 rows
11.      
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. |  8|  Male| 23|         2500|
16. | 14|  Male| 23|         4500|
17. |  2|Female| 23|         3500|
18. | 32|  Male| 23|         3500|
19. | 20|Female| 23|         5500|
20. +---+------+---+-------------+
21. only showing top 5 rows

9. 实验结果及分析:

实验结果运行准确,无误


10. 实验结论:

经过本节实验的学习,通过DataFrame的操作-使用DSL,进一步巩固了我们的Spark基础。


11. 总结及心得体会:

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。


12、 实验测试

1、数据写入MySQL中mode=’append’的意思是什么( A ){单选}

 A、追加

 B、覆盖

 C、修改

 D、删除


13、实验拓展

1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。

879c95b0db3f402ea0694cdada2b6abe.png

相关文章
|
20天前
|
存储 数据挖掘 数据处理
掌握Pandas核心数据结构:Series与DataFrame的四种创建方式
本文介绍了 Pandas 库中核心数据结构 Series 和 DataFrame 的四种创建方法,包括从列表、字典、标量和 NumPy 数组创建 Series,以及从字典、列表的列表、NumPy 数组和 Series 字典创建 DataFrame,通过示例详细说明了每种创建方式的具体应用。
123 67
|
1月前
|
Python
|
7月前
|
数据处理 索引 Python
使用pandas的merge()和join()函数进行数据处理
使用pandas的merge()和join()函数进行数据处理
125 2
|
3月前
|
数据采集 机器学习/深度学习 数据处理
DataFrame 操作
DataFrame 操作
150 1
|
3月前
|
数据挖掘 数据处理 Python
Pandas中的数据聚合神器:agg 方法
Pandas中的数据聚合神器:agg 方法
112 0
|
JSON 数据可视化 数据挖掘
python数据可视化开发(2):pandas读取Excel的数据格式处理(数据读取、指定列数据、DataFrame转json、数学运算、透视表运算输出)
python数据可视化开发(2):pandas读取Excel的数据格式处理(数据读取、指定列数据、DataFrame转json、数学运算、透视表运算输出)
387 0
|
7月前
|
人工智能 程序员 数据处理
Pandas数据处理3、DataFrame去重函数drop_duplicates()详解
Pandas数据处理3、DataFrame去重函数drop_duplicates()详解
173 0
Pandas数据处理3、DataFrame去重函数drop_duplicates()详解
|
7月前
|
人工智能 程序员 数据处理
Pandas数据处理2、DataFrame的drop函数具体参数使用详情
Pandas数据处理2、DataFrame的drop函数具体参数使用详情
97 0
|
分布式计算 Spark
199 Spark DataFrame常用操作- DSL风格语法
199 Spark DataFrame常用操作- DSL风格语法
89 0
199 Spark DataFrame常用操作- DSL风格语法
|
Python
dataframe操作查询
Pandas提供了多种查询方法,以下是一些常见的方法: 使用df.loc方法,根据行、列的标签值查询。 使用df.iloc方法,根据行、列的数字位置查询。 使用df.where方法,根据条件过滤数据。 使用df.query方法,根据字符串表达式查询数据。
724 0