一、实验目的
掌握数据整合、数据清洗和数据转换方法。
二、实验内容
1、整合来自不同数据源的数据。
2、对数据进行清洗。
3、对数据进行转换。
三、实验原理
数据质量一直是业界普遍存在的问题。不正确或不一致的数据的存在可能会对分析产生误导。90%的时间,数据科学家们并非花时间在建立炫酷的模型上,而是花在数据准备上。做任何分析,最难也最花时间的部分都在数据准备。有一个行业术语叫做“数据工程”,指的是数据的来源和准备。
数据准备阶段是一个非常重要的阶段,不仅对于算法来说是正确的,而且还可以让我们更好地理解我们的数据,这样我们就可以在实现算法的同时采取正确的方法。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
五、实验步骤
5.1 启动HDFS集群、Spark集群和Zeppelin服务器
在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh 4. $ zeppelin-daemon.sh start
然后使用jps命令查看进程,确保已经正确地启动了HDFS集群、Spark集群和Zeppelin服务器。
5.2 准备实验数据
将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:
1. $ hdfs dfs -mkdir -p /data/dataset 2. $ hdfs dfs -put /data/dataset/batch/salary.json /data/dataset/ 3. $ hdfs dfs -put /data/dataset/batch/designation.json /data/dataset
执行以下命令,查看数据文件是否已经上传到HDFS中:
1. $ hdfs dfs -ls /data/dataset/
5.3 数据整合
假设这样的场景:员工数据分散存储在本地的RDDs、JSON文件和SQL数据库中。我们的任务是将这些数据加载到DataFrame中。一旦数据从不同的来源获得,接下来就是将它们全部合并,以便将数据作为一个整体进行清理、格式化,并转换为分析所需的格式。
1、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页。点击”Create new note”链接,创建一个新的笔记本,命名为”analy_demo”,如下图所示:
2、构造代表员工信息的DataFrame。在zeppelin中输入如下代码:
1. // 创建一个RDD并转换为DataFrame 2. val employeesDF = sc.parallelize(List((1, "陈柯宇", 25), (2, "陶心瑶", 35),(3, "楼一萱", 24), 3. (4, "张希", 28), (5, "王心凌", 26), (6, "庄妮", 35), 4. (7,"何洁", 38), (8, "成方圆", 32), (9, "孙玉", 29), 5. (10, "刘珂矣", 29),(11, "林忆莲", 28), (12, "蓝琪儿", 25), 6. (13, "白安",31))).toDF("emp_id","name","age") 7. employeesDF.show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
3、加载存储有薪资数据的json文件创建DataFrame。在zeppelin中执行如下代码:
1. val salaryFilePath = "hdfs://localhost:9000/data/dataset/salary.json" 2. val salaryDF = spark.read.json(salaryFilePath) 3. salaryDF.show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
4、加载存储有职务数据的json文件创建DataFrame。在zeppelin中执行如下代码:
1. val designationFilePath = "hdfs://localhost:9000/data/dataset/designation.json" 2. val designationDF = spark.read.json(designationFilePath) 3. designationDF.show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
5、数据整合:组合从各种数据源获得的数据。在zeppelin中执行如下代码:
1. val final_data = employeesDF.join(salaryDF,$"emp_id"===$"e_id"). 2. join(designationDF,$"emp_id"===$"id"). 3. select("emp_id","name","age","role","salary") 4. final_data.cache 5. final_data.show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
5.4 数据清洗
一旦把数据整合到一起,就必须花足够的时间和精力去整理它,然后再分析它。这是一个迭代的过程,因为我们必须验证对数据所采取的操作,并一直持续到我们对数据质量满意为止。
数据中某些不洁度通常存在于任何数据集中。数据可能有各种各样的问题,这里我们将处理一些常见的情况,比如缺失值、重复值、转换或格式化(从数字中添加或删除数字,将一个列分割成两个,合并两个列到一个)。
1、缺失值处理。在zeppelin中执行如下代码:
1. // 可以删除带有缺失值的行 2. var clean_data = final_data.na.drop() 3. clean_data.show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
可以看出,带有缺失值的”emp_id”为4的数据已经被删除了。
也可以使用平均值替换缺失值。在zeppelin中执行如下代码:
1. // 也可以使用平均值替换缺失值 2. val mean_salary = final_data.select(floor(avg("salary"))).first()(0).toString.toDouble 3. clean_data = final_data.na.fill(Map("salary" -> mean_salary)) 4. clean_data.show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
2、异常值处理:删除包含离群值的行,或者均值替代方法。在这里我们识别异常值并用均值替代。在zeppelin中执行如下代码:
1. // 计算每一行的偏差 2. val devs = clean_data.select((($"salary" - mean_salary) * ($"salary" - mean_salary)).alias("deviation")) 3. 4. // 计算标准偏差 5. val stddev = devs.select(sqrt(avg("deviation"))).first().getDouble(0) 6. 7. // 用平均工资替换超过2个标准差范围内的异常值(UDF) 8. val outlierfunc = udf((value: Long, mean: Double) => { 9. if (value > mean+(2*stddev) || value < mean-(2*stddev)) 10. mean 11. else 12. value 13. }) 14. val no_outlier = clean_data.withColumn("updated_salary",outlierfunc(col("salary"),lit(mean_salary))) 15. 16. // 观察修改后的值 17. no_outlier.filter($"salary" =!= $"updated_salary").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
可以看出,”emp_id”为13的记录中,薪资列属于异常值,我们用平均工资来代替它。
3、重复值处理:在数据集中处理重复记录有不同的方法,既可以删除重复的行,也可以基于某列的子集删除重复的行。在zeppelin中执行如下代码:
1. // 删除重复的行 2. // val no_outlier_no_duplicates = no_outlier.dropDuplicates() 3. // no_outlier_no_duplicates.show 4. 5. // 也可基于某列的子集删除重复的行 6. val test_df = no_outlier.dropDuplicates("role").show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
可以看出,当我们基于”role”列删除重复的行时,每种role值只保留一个。
5.5 数据转换
存在有各种各样的数据转换需求,这里我们将讨论一些基本类型的转换。
1、将两列合并成一列。在zeppelin中执行如下代码:
1. // 创建一个udf来连接两个列值 2. val concatfunc = udf((name: String, age: Integer) => {name + "_" + age}) 3. 4. // 应用该udf来创建合并的列 5. val concat_df = final_data.withColumn("name_age",concatfunc($"name", $"age")) 6. 7. // 显示 8. concat_df.show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
可以看出,在上面的转换中,我们将”name”列和”age”列合并成一个”name_age”列。
2、将字符/数字添加到现有的字符/数字。在zeppelin中执行如下代码:
1. // 向数据添加常量 2. val addconst = udf((age:Integer) => {age + 10}) 3. val data_new = concat_df.withColumn("age_incremented",addconst(col("age"))) 4. data_new.show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
当基于一个列中已经存在的值追加新的列时,如果新列与旧列同名,则会覆盖旧列。在zeppelin中执行如下代码:
1. concat_df.withColumn("age",addconst(col("age"))).show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
3、从现有的字符/数字中删除或替换字符/数字。在zeppelin中执行如下代码:
1. // 替换一个列中的值 2. final_data.na.replace("role",Map("合伙人" -> "同事")).show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
注:如果在replace中列名参数是”*”,那么替换应用到所有的列。
4、更改日期的格式。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.functions.udf 2. // 日期转换 3. // 构造数据集 4. case class Book (title: String, author: String, pubtime: String) 5. val books = Seq(Book("浮生六记","[清]沈复","2018/7/1"), 6. Book("云边有个小卖部","张嘉佳","2018/07/12"), 7. Book("菊与刀","[美]本尼迪克特",null), 8. Book("苏菲的世界","乔斯坦·贾德","2017,10 12"), 9. Book("罗生门",null,null) 10. ) 11. 12. val ds1 = sc.parallelize(books).toDS() 13. 14. // 定义udf: 将传入的字符串转换成YYYY-MM-DD格式 15. def toDateUDF = udf((s: String) => { 16. var (year, month, day) = ("","","") 17. 18. // 格式化日期 19. if(s != null) { 20. var x = s.split(" |/|,") // 拆分 21. year = "%04d".format(x(0).toInt) 22. month = "%02d".format(x(1).toInt) 23. day = "%02d".format(x(2).toInt) 24. 25. year + "-" + month + "-" + day 26. } else { 27. null 28. } 29. }) 30. 31. // 应用udf并将日期字符串转换为标准的形式YYYY-MM-DD 32. ds1.withColumn("pubtime",toDateUDF(ds1("pubtime"))).show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
在这里,我们看到的是日期列有许多不同日期格式的数据点的情况。我们需要将所有不同的日期格式标准化成一种格式。要做到这一点,我们首先必须创建一个用户定义的函数(udf),它可以处理不同的格式,并将不同的日期格式转换为一种通用格式。
六、 实验知识测试
无
七、实验拓展
下面是一个简单的数据清洗的示例。
1. 构造一个带缺失值的Dataset。在zeppelin中执行如下代码:
1. // 构造一个带缺失值的Dataset 2. case class Author (name: String, dynasty: String, dob: String) 3. 4. val authors = Seq(Author("曹雪芹","清代","1724年"), 5. Author("施耐庵","元末明初","1296年"), 6. Author("罗贯中","元末明初",null), 7. Author("吴承恩",null,null) 8. ) 9. 10. val ds1 = sc.parallelize(authors).toDS() 11. ds1.show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
2. 删除带有缺失值的行。在zeppelin中执行如下代码:
1. ds1.na.drop().show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
3. 删除带有至少两个缺失值的行。在zeppelin中执行如下代码:
1. ds1.na.drop(minNonNulls = ds1.columns.length - 1).show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
4. 使用一个给定的字符串填充所有缺失的值。在zeppelin中执行如下代码:
1. ds1.na.fill("不详").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
5. 在不同列使用不同的字符串填充缺失值。在zeppelin中执行如下代码:
1. ds1.na.fill(Map("dynasty"->"--", "dob"->"不详")).show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
6. 删除重复的值。在zeppelin中执行如下代码:
1. // 删除重复的行 2. val authors = Seq(Author("曹雪芹","清代","1724年"), 3. Author("曹雪芹","清代","1724年"), 4. Author("施耐庵","元末明初","1296年"), 5. Author("曹雪芹","清朝","1724年"), 6. Author("罗贯中","元末明初",null), 7. Author("吴承恩",null,null) 8. ) 9. 10. val ds1 = sc.parallelize(authors).toDS() 11. ds1.show() 12. 13. // 删除重复的行 14. ds1.dropDuplicates().show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
7. 基于一个列的子集删除重复。在zeppelin中执行如下代码:
1. // 基于一个列的子集删除重复 2. ds1.dropDuplicates("name").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
8. 基于一个子集删除重复。在zeppelin中执行如下代码:
1. // 基于一个子集删除重复 2. ds1.dropDuplicates(Array("dynasty","dob")).show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
— end —