1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
DataFrame的操作-使用DSL
3. 实验学时:
4. 实验原理:
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
5. 实验目的:
掌握操作DataFrame的各种方法(转换、过滤、排序等)。
掌握DataFrame存储操作。
6. 实验内容:
操作DataFrame的转换、过滤、排序等方法。具体包含如下内容:
- select(cols)
- selectExpr(expr)
- drop(cols)
- dropDuplicates(subset=None)
- dropna(how=’any’,thresh=None,subset=None)
- filter(condition)
- where(condition)
- limit(num)
- withColumn(colName,col)
- withColumnRename(existing,new)
- orderBy(cols,kwargs)
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
8. 实验步骤:
8.1 环境准备
1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。
在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh 4. $ zeppelin-daemon.sh start
2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:
1. $ hdfs dfs -mkdir -p /data/dataset/batch 2. $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:
8.2 DataFrame的方法
1、select(*cols)通过表达式选取DataFRame中符合条件的数据,返回新的DataFrame。在zeppelin中执行如下代码:
1. // 读取csv数据 2. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv") 3. 4. // 通过select统计共有多少条数据 5. df.select("*").count()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. df: org.apache.spark.sql.DataFrame = [ID: string, Gener: string ... 2 more fields] 2. res196: Long = 40
通过表达式选取DataFRame中符合条件的数据。在zeppelin中执行如下代码:
1. // 选取Age和Gener列,获取前十条数据 2. df.select("Age","Gener").show(10) 3. 4. // 查找Age列,并重命名为age,输出前十条数据 5. df.select($"Age".alias("age")).show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+ 2. |Age| Gener| 3. +---+------+ 4. | 34| Male| 5. | 23|Female| 6. | 26|Female| 7. | 27|Female| 8. | 24| Male| 9. | 33| Male| 10. | 34| Male| 11. | 23| Male| 12. | 26| Male| 13. | 27|Female| 14. +---+------+ 15. only showing top 10 rows 16. 17. +---+ 18. |age| 19. +---+ 20. | 34| 21. | 23| 22. | 26| 23. | 27| 24. | 24| 25. | 33| 26. | 34| 27. | 23| 28. | 26| 29. | 27| 30. +---+ 31. only showing top 10 rows
2、selectExpr(*expr),这个方法是select方法的一个变体,可以接受一个SQL表达式,返回新的DataFrame。在zeppelin中执行如下代码:
1. // 分别求取Age的乘积,Age的平方根。显示前十条数据 2. df.selectExpr("Age * 2","sqrt(Age)").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------+-------------------------+ 2. |(Age * 2)|SQRT(CAST(Age AS DOUBLE))| 3. +---------+-------------------------+ 4. | 68.0| 5.830951894845301| 5. | 46.0| 4.795831523312719| 6. | 52.0| 5.0990195135927845| 7. | 54.0| 5.196152422706632| 8. | 48.0| 4.898979485566356| 9. | 66.0| 5.744562646538029| 10. | 68.0| 5.830951894845301| 11. | 46.0| 4.795831523312719| 12. | 52.0| 5.0990195135927845| 13. | 54.0| 5.196152422706632| 14. +---------+-------------------------+ 15. only showing top 10 rows
3、drop(*cols)按照列名删除DataFrame中的列,返回新的DataFrame。在zeppelin中执行如下代码:
1. // 打印所有的列名DataFrame.columns.values.tolist() 2. println(df.columns.toList) 3. 4. // 删除Age列名 5. var df1 = df.drop("Age") 6. 7. // 打印删除后的列名 8. println(df1.columns.toList)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. List(ID, Gener, Age, Annual Income) 2. List(ID, Gener, Annual Income)
4、dropDuplicates(subset=None)删除重复行,subset用于指定删除重复行的时候考虑那几列。在zeppelin中执行如下代码:
1. // 导入所需依赖 2. import org.apache.spark.sql.Row 3. import org.apache.spark.sql._ 4. import org.apache.spark.sql.types._ 5. 6. // 创建RDD,转换为DataFrame 7. var df = sc.parallelize(List(Row("simple",15,175),Row("simple",15,175),Row("simpleBDP",2,180)),3) 8. val schema = StructType( 9. List( 10. StructField("name", StringType, true), 11. StructField("age", IntegerType, true), 12. StructField("height", IntegerType, true) 13. ) 14. ) 15. 16. var df1 = spark.createDataFrame(df, schema) 17. 18. // 展示数据 19. // df.show 20. 21. // 删除重复的数据 22. df1.dropDuplicates().show() 23. 24. // 指定重复的列进行删除 25. df1.dropDuplicates(List("age","name")).show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------+---+------+ 2. | name|age|height| 3. +---------+---+------+ 4. |simpleBDP| 2| 180| 5. | simple| 15| 175| 6. +---------+---+------+ 7. 8. +---------+---+------+ 9. | name|age|height| 10. +---------+---+------+ 11. |simpleBDP| 2| 180| 12. | simple| 15| 175| 13. +---------+---+------+
5、na.drop删除DataFrame中的空数据,加入”any”和”all”指定如何删除控制,加入数字参数指定有多少个空值进行删除,加入字段名删除指定字段中的空值。在zeppelin中执行如下代码:
1. var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null))) 2. 3. val schema = StructType( 4. List( 5. StructField("luck", DoubleType, true), 6. StructField("age", DoubleType, true), 7. StructField("weight", DoubleType, true) 8. ) 9. ) 10. 11. // 创建DataFrame 12. var df = spark.createDataFrame(dfs,schema) 13. 14. // 显示DataFrame 15. df.show() 16. 17. // 有任意一个为na进行删除 18. df.na.drop("any").show 19. 20. // 全部为na进行删除 21. df.na.drop("all").show 22. 23. // 删除有两个na值的数据 24. df.na.drop(2).show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +----+----+------+ 2. |luck| age|weight| 3. +----+----+------+ 4. |null|27.0| 170.0| 5. |44.0|27.0| 170.0| 6. |null|null| null| 7. +----+----+------+ 8. 9. +----+----+------+ 10. |luck| age|weight| 11. +----+----+------+ 12. |44.0|27.0| 170.0| 13. +----+----+------+ 14. 15. +----+----+------+ 16. |luck| age|weight| 17. +----+----+------+ 18. |null|27.0| 170.0| 19. |44.0|27.0| 170.0| 20. +----+----+------+ 21. 22. +----+----+------+ 23. |luck| age|weight| 24. +----+----+------+ 25. |null|27.0| 170.0| 26. |44.0|27.0| 170.0| 27. +----+----+------+
6、filter(condition)按照传入的条件进行过滤,其实where方法就是filter方法的一个别名而已。。在zeppelin中执行如下代码:
1. var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null))) 2. 3. val schema = StructType( 4. List( 5. StructField("luck", DoubleType, true), 6. StructField("age", DoubleType, true), 7. StructField("weight", DoubleType, true) 8. ) 9. ) 10. 11. // 创建DataFrame 12. var df = spark.createDataFrame(dfs,schema) 13. 14. // 过滤数据 15. // df.filter($"luck"==44.0).show 16. df.filter("luck is not null").show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +----+----+------+ 2. |luck| age|weight| 3. +----+----+------+ 4. |44.0|27.0| 170.0| 5. +----+----+------+
7、where(condition),这个方法和filter方法类似,更具传入的条件作出选择。在zeppelin中执行如下代码:
1. // 加载csv数据 2. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv") 3. 4. // 打印数据 5. // df.show 6. 7. // 数据筛选 8. df.where("Age >= 30").show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 1| Male| 34| 2000| 5. | 6| Male| 33| 2000| 6. | 7| Male| 34| 3500| 7. | 12|Female| 33| 3500| 8. | 13|Female| 34| 2500| 9. | 18| Male| 33| 2500| 10. | 19|Female| 34| 4500| 11. | 24| Male| 33| 4500| 12. | 25| Male| 34| 5500| 13. | 30|Female| 33| 5500| 14. | 31| Male| 34| 2000| 15. | 36|Female| 33| 2000| 16. | 37|Female| 34| 3500| 17. +---+------+---+-------------+
8、limit(num)限制返回的数据的条数,防止返回到driver节点的数据过大造成OOM。在zeppelin中执行如下代码:
1. // 限制返回10条数据 2. df.limit(10).show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 1| Male| 34| 2000| 5. | 2|Female| 23| 3500| 6. | 3|Female| 26| 2500| 7. | 4|Female| 27| 4500| 8. | 5| Male| 24| 5500| 9. | 6| Male| 33| 2000| 10. | 7| Male| 34| 3500| 11. | 8| Male| 23| 2500| 12. | 9| Male| 26| 4500| 13. | 10|Female| 27| 5500| 14. +---+------+---+-------------+
9、withColumn(colName,col),返回一个新的DataFrame,这个DataFrame中新增加的colNAme的列,如果原来本身就有colName的列,就替换掉。在zeppelin中执行如下代码:
1. // 添加Age列,Age本身存在,替换掉 2. df.withColumn("Age",$"Age"*$"Age").show(10) 3. 4. // 添加Age2列 5. df.withColumn("Age2",$"Age"*$"Age").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+------+-------------+ 2. | ID| Gener| Age|Annual Income| 3. +---+------+------+-------------+ 4. | 1| Male|1156.0| 2000| 5. | 2|Female| 529.0| 3500| 6. | 3|Female| 676.0| 2500| 7. | 4|Female| 729.0| 4500| 8. | 5| Male| 576.0| 5500| 9. | 6| Male|1089.0| 2000| 10. | 7| Male|1156.0| 3500| 11. | 8| Male| 529.0| 2500| 12. | 9| Male| 676.0| 4500| 13. | 10|Female| 729.0| 5500| 14. +---+------+------+-------------+ 15. only showing top 10 rows 16. 17. +---+------+---+-------------+------+ 18. | ID| Gener|Age|Annual Income| Age2| 19. +---+------+---+-------------+------+ 20. | 1| Male| 34| 2000|1156.0| 21. | 2|Female| 23| 3500| 529.0| 22. | 3|Female| 26| 2500| 676.0| 23. | 4|Female| 27| 4500| 729.0| 24. | 5| Male| 24| 5500| 576.0| 25. | 6| Male| 33| 2000|1089.0| 26. | 7| Male| 34| 3500|1156.0| 27. | 8| Male| 23| 2500| 529.0| 28. | 9| Male| 26| 4500| 676.0| 29. | 10|Female| 27| 5500| 729.0| 30. +---+------+---+-------------+------+ 31. only showing top 10 rows
10、withColumnRename(existing,new),对已经存在的列明重命名为new,若名称不存在则这个操作不做任何事情。在zeppelin中执行如下代码:
1. // 修改Age的列名 2. df.withColumnRenamed("Age","age").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|age|Annual Income| 3. +---+------+---+-------------+ 4. | 1| Male| 34| 2000| 5. | 2|Female| 23| 3500| 6. | 3|Female| 26| 2500| 7. | 4|Female| 27| 4500| 8. | 5| Male| 24| 5500| 9. | 6| Male| 33| 2000| 10. | 7| Male| 34| 3500| 11. | 8| Male| 23| 2500| 12. | 9| Male| 26| 4500| 13. | 10|Female| 27| 5500| 14. +---+------+---+-------------+ 15. only showing top 10 rows
11、orderBy(cols,*kwargs),返回按照指定列排好序的新的DataFrame。在zeppelin中执行如下代码:
1. // 对年龄进行排序 2. df.orderBy("Age").show(5) 3. 4. // 降序排序 5. df.orderBy($"Age".desc).show(5)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 8| Male| 23| 2500| 5. | 14| Male| 23| 4500| 6. | 2|Female| 23| 3500| 7. | 32| Male| 23| 3500| 8. | 20|Female| 23| 5500| 9. +---+------+---+-------------+ 10. only showing top 5 rows 11. 12. +---+------+---+-------------+ 13. | ID| Gener|Age|Annual Income| 14. +---+------+---+-------------+ 15. | 7| Male| 34| 3500| 16. | 13|Female| 34| 2500| 17. | 1| Male| 34| 2000| 18. | 31| Male| 34| 2000| 19. | 19|Female| 34| 4500| 20. +---+------+---+-------------+ 21. only showing top 5 rows
使用ascending关键字参数指定升降序排列。除了这种方式,还可以通过pyspark.sql.functions中定义好的desc降序和asc方法来排序。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.functions._ 2. 3. // 对年龄进行排序 4. df.orderBy(desc("Age")).show(5) 5. 6. // 降序排序 7. df.orderBy($"Age".desc).show(5) 8. 9. // 对年龄进行排序 10. df.orderBy(asc("Age")).show(5) 11. 12. // 生序排序 13. df.orderBy($"Age".asc).show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 7| Male| 34| 3500| 5. | 13|Female| 34| 2500| 6. | 1| Male| 34| 2000| 7. | 31| Male| 34| 2000| 8. | 19|Female| 34| 4500| 9. +---+------+---+-------------+ 10. only showing top 5 rows 11. 12. +---+------+---+-------------+ 13. | ID| Gener|Age|Annual Income| 14. +---+------+---+-------------+ 15. | 7| Male| 34| 3500| 16. | 13|Female| 34| 2500| 17. | 1| Male| 34| 2000| 18. | 31| Male| 34| 2000| 19. | 19|Female| 34| 4500| 20. +---+------+---+-------------+ 21. only showing top 5 rows 22. 23. +---+------+---+-------------+ 24. | ID| Gener|Age|Annual Income| 25. +---+------+---+-------------+ 26. | 8| Male| 23| 2500| 27. | 14| Male| 23| 4500| 28. | 2|Female| 23| 3500| 29. | 32| Male| 23| 3500| 30. | 20|Female| 23| 5500| 31. +---+------+---+-------------+ 32. only showing top 5 rows 33. 34. +---+------+---+-------------+ 35. | ID| Gener|Age|Annual Income| 36. +---+------+---+-------------+ 37. | 8| Male| 23| 2500| 38. | 14| Male| 23| 4500| 39. | 2|Female| 23| 3500| 40. | 32| Male| 23| 3500| 41. | 20|Female| 23| 5500| 42. +---+------+---+-------------+ 43. only showing top 5 rows
sort方法和orderBy方法类似。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.functions._ 2. 3. // 降序 4. df.sort(desc("Age")).show(5) 5. 6. // 升序 7. df.sort(asc("Age")).show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+ 2. | ID| Gener|Age|Annual Income| 3. +---+------+---+-------------+ 4. | 7| Male| 34| 3500| 5. | 13|Female| 34| 2500| 6. | 1| Male| 34| 2000| 7. | 31| Male| 34| 2000| 8. | 19|Female| 34| 4500| 9. +---+------+---+-------------+ 10. only showing top 5 rows 11. 12. +---+------+---+-------------+ 13. | ID| Gener|Age|Annual Income| 14. +---+------+---+-------------+ 15. | 8| Male| 23| 2500| 16. | 14| Male| 23| 4500| 17. | 2|Female| 23| 3500| 18. | 32| Male| 23| 3500| 19. | 20|Female| 23| 5500| 20. +---+------+---+-------------+ 21. only showing top 5 rows
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过DataFrame的操作-使用DSL,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
12、 实验测试
1、数据写入MySQL中mode=’append’的意思是什么( A ){单选}
A、追加
B、覆盖
C、修改
D、删除
13、实验拓展
1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。