基于Spark的数据清洗与转换

简介: 基于Spark的数据清洗与转换

一、实验目的

掌握数据整合、数据清洗和数据转换方法。

二、实验内容

1、整合来自不同数据源的数据。

  2、对数据进行清洗。

  3、对数据进行转换。

三、实验原理

数据质量一直是业界普遍存在的问题。不正确或不一致的数据的存在可能会对分析产生误导。90%的时间,数据科学家们并非花时间在建立炫酷的模型上,而是花在数据准备上。做任何分析,最难也最花时间的部分都在数据准备。有一个行业术语叫做“数据工程”,指的是数据的来源和准备。

 数据准备阶段是一个非常重要的阶段,不仅对于算法来说是正确的,而且还可以让我们更好地理解我们的数据,这样我们就可以在实现算法的同时采取正确的方法。


四、实验环境

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


五、实验步骤

5.1 启动HDFS集群、Spark集群和Zeppelin服务器

在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh
4.  $ zeppelin-daemon.sh start

然后使用jps命令查看进程,确保已经正确地启动了HDFS集群、Spark集群和Zeppelin服务器。


5.2 准备实验数据

将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:

1.  $ hdfs dfs -mkdir -p /data/dataset
2.  $ hdfs dfs -put /data/dataset/batch/salary.json /data/dataset/
3.  $ hdfs dfs -put /data/dataset/batch/designation.json /data/dataset

执行以下命令,查看数据文件是否已经上传到HDFS中:

1.  $ hdfs dfs -ls /data/dataset/

5.3 数据整合

假设这样的场景:员工数据分散存储在本地的RDDs、JSON文件和SQL数据库中。我们的任务是将这些数据加载到DataFrame中。一旦数据从不同的来源获得,接下来就是将它们全部合并,以便将数据作为一个整体进行清理、格式化,并转换为分析所需的格式。

 1、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页。点击”Create new note”链接,创建一个新的笔记本,命名为”analy_demo”,如下图所示:

0c84ad85756a4795b87c24af3e0fca3b.png


2、构造代表员工信息的DataFrame。在zeppelin中输入如下代码:

1.  // 创建一个RDD并转换为DataFrame
2.  val employeesDF = sc.parallelize(List((1, "陈柯宇", 25), (2, "陶心瑶", 35),(3, "楼一萱", 24), 
3.                                        (4, "张希", 28), (5, "王心凌", 26), (6, "庄妮", 35), 
4.                                        (7,"何洁", 38), (8, "成方圆", 32), (9, "孙玉", 29), 
5.                                        (10, "刘珂矣", 29),(11, "林忆莲", 28), (12, "蓝琪儿", 25), 
6.                                        (13, "白安",31))).toDF("emp_id","name","age")
7.  employeesDF.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

8094a848e943456e8a61716dfae1cb27.png


3、加载存储有薪资数据的json文件创建DataFrame。在zeppelin中执行如下代码:

1.  val salaryFilePath = "hdfs://localhost:9000/data/dataset/salary.json"
2.  val salaryDF = spark.read.json(salaryFilePath)
3.  salaryDF.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


a5ba2e9d154b43d28289c3e00f7d4793.png

4、加载存储有职务数据的json文件创建DataFrame。在zeppelin中执行如下代码:

1.  val designationFilePath = "hdfs://localhost:9000/data/dataset/designation.json"
2.  val designationDF = spark.read.json(designationFilePath)
3.  designationDF.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

3f3ba9551e444aaf832f17ca458a6e42.png


5、数据整合:组合从各种数据源获得的数据。在zeppelin中执行如下代码:

1.  val final_data = employeesDF.join(salaryDF,$"emp_id"===$"e_id").
2.                               join(designationDF,$"emp_id"===$"id").
3.                               select("emp_id","name","age","role","salary")
4.  final_data.cache
5.  final_data.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


687aef1139cf4e1a96a4bdeaaa80aa8c.png

5.4 数据清洗

一旦把数据整合到一起,就必须花足够的时间和精力去整理它,然后再分析它。这是一个迭代的过程,因为我们必须验证对数据所采取的操作,并一直持续到我们对数据质量满意为止。

 数据中某些不洁度通常存在于任何数据集中。数据可能有各种各样的问题,这里我们将处理一些常见的情况,比如缺失值、重复值、转换或格式化(从数字中添加或删除数字,将一个列分割成两个,合并两个列到一个)。

 1、缺失值处理。在zeppelin中执行如下代码:

1.  // 可以删除带有缺失值的行
2.  var clean_data = final_data.na.drop()
3.  clean_data.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

48442b50f81e48628f383fefd378ba2b.png


可以看出,带有缺失值的”emp_id”为4的数据已经被删除了。

 也可以使用平均值替换缺失值。在zeppelin中执行如下代码:

1.  // 也可以使用平均值替换缺失值
2.  val mean_salary = final_data.select(floor(avg("salary"))).first()(0).toString.toDouble
3.  clean_data = final_data.na.fill(Map("salary" -> mean_salary))
4.  clean_data.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

f276618702fe48d7b5fbd37f9476a326.png


2、异常值处理:删除包含离群值的行,或者均值替代方法。在这里我们识别异常值并用均值替代。在zeppelin中执行如下代码:

1.  // 计算每一行的偏差
2.  val devs = clean_data.select((($"salary" - mean_salary) * ($"salary" - mean_salary)).alias("deviation"))
3.       
4.  // 计算标准偏差
5.  val stddev = devs.select(sqrt(avg("deviation"))).first().getDouble(0)
6.       
7.  // 用平均工资替换超过2个标准差范围内的异常值(UDF)
8.  val outlierfunc = udf((value: Long, mean: Double) => {
9.      if (value > mean+(2*stddev) || value < mean-(2*stddev)) 
10.         mean 
11.     else 
12.         value
13.   })  
14. val no_outlier = clean_data.withColumn("updated_salary",outlierfunc(col("salary"),lit(mean_salary)))
15.      
16. // 观察修改后的值
17. no_outlier.filter($"salary" =!= $"updated_salary").show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

3b15ef335b6b489f8da7cd52b6bca177.png


可以看出,”emp_id”为13的记录中,薪资列属于异常值,我们用平均工资来代替它。

 3、重复值处理:在数据集中处理重复记录有不同的方法,既可以删除重复的行,也可以基于某列的子集删除重复的行。在zeppelin中执行如下代码:

1.  // 删除重复的行
2.  // val no_outlier_no_duplicates = no_outlier.dropDuplicates()
3.  // no_outlier_no_duplicates.show
4.       
5.  // 也可基于某列的子集删除重复的行
6.  val test_df = no_outlier.dropDuplicates("role").show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

e58c2a7d0e2d4ae689aed3a9c1c0d690.png


可以看出,当我们基于”role”列删除重复的行时,每种role值只保留一个。


5.5 数据转换

存在有各种各样的数据转换需求,这里我们将讨论一些基本类型的转换。

 1、将两列合并成一列。在zeppelin中执行如下代码:

1.  // 创建一个udf来连接两个列值
2.  val concatfunc = udf((name: String, age: Integer) => {name + "_" + age})
3.       
4.  // 应用该udf来创建合并的列
5.  val concat_df = final_data.withColumn("name_age",concatfunc($"name", $"age"))
6.       
7.  // 显示 
8.  concat_df.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

86e675cc23004afebd9abeb0584a8108.png


可以看出,在上面的转换中,我们将”name”列和”age”列合并成一个”name_age”列。

 2、将字符/数字添加到现有的字符/数字。在zeppelin中执行如下代码:

1.  // 向数据添加常量
2.  val addconst = udf((age:Integer) => {age + 10})
3.  val data_new = concat_df.withColumn("age_incremented",addconst(col("age")))
4.  data_new.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

8b633280ce06433f9c0bac4c29283696.png


当基于一个列中已经存在的值追加新的列时,如果新列与旧列同名,则会覆盖旧列。在zeppelin中执行如下代码:

1.  concat_df.withColumn("age",addconst(col("age"))).show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

de2a5b64587a4ddc86c3c64c8d63a50d.png


3、从现有的字符/数字中删除或替换字符/数字。在zeppelin中执行如下代码:


1.  // 替换一个列中的值
2.  final_data.na.replace("role",Map("合伙人" -> "同事")).show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


88a9072ed83748c492e1fd22cd355815.png

注:如果在replace中列名参数是”*”,那么替换应用到所有的列。

 4、更改日期的格式。在zeppelin中执行如下代码:

1.  import org.apache.spark.sql.functions.udf
2.  // 日期转换
3.  // 构造数据集
4.  case class Book (title: String, author: String, pubtime: String)
5.  val books = Seq(Book("浮生六记","[清]沈复","2018/7/1"),
6.                  Book("云边有个小卖部","张嘉佳","2018/07/12"),
7.                  Book("菊与刀","[美]本尼迪克特",null),
8.                  Book("苏菲的世界","乔斯坦·贾德","2017,10 12"),
9.                  Book("罗生门",null,null)
10.             )
11.      
12. val ds1 = sc.parallelize(books).toDS()
13.      
14. // 定义udf: 将传入的字符串转换成YYYY-MM-DD格式
15. def toDateUDF = udf((s: String) => {
16.     var (year, month, day) = ("","","")
17.      
18.     // 格式化日期
19.     if(s != null) {
20.         var x = s.split(" |/|,")    // 拆分
21.         year = "%04d".format(x(0).toInt)
22.         month = "%02d".format(x(1).toInt)
23.         day = "%02d".format(x(2).toInt)
24.      
25.         year + "-" + month + "-" + day
26.     } else {
27.         null
28.     }
29. })
30.      
31. // 应用udf并将日期字符串转换为标准的形式YYYY-MM-DD
32. ds1.withColumn("pubtime",toDateUDF(ds1("pubtime"))).show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

4879577b2cec4018946ec08a9d2697f3.png


在这里,我们看到的是日期列有许多不同日期格式的数据点的情况。我们需要将所有不同的日期格式标准化成一种格式。要做到这一点,我们首先必须创建一个用户定义的函数(udf),它可以处理不同的格式,并将不同的日期格式转换为一种通用格式。


六、 实验知识测试


七、实验拓展

下面是一个简单的数据清洗的示例。

 1. 构造一个带缺失值的Dataset。在zeppelin中执行如下代码:

1.  // 构造一个带缺失值的Dataset
2.  case class Author (name: String, dynasty: String, dob: String)
3.  
4.  val authors = Seq(Author("曹雪芹","清代","1724年"),
5.                    Author("施耐庵","元末明初","1296年"),
6.                    Author("罗贯中","元末明初",null),
7.                    Author("吴承恩",null,null)
8.                 )
9.       
10. val ds1 = sc.parallelize(authors).toDS()
11. ds1.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


129d250a28484b3dbb70f5fa40c97447.png

2. 删除带有缺失值的行。在zeppelin中执行如下代码:

1.  ds1.na.drop().show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

08cd3e9d7d7c41deba4840957d142f16.png


3. 删除带有至少两个缺失值的行。在zeppelin中执行如下代码:

1.  ds1.na.drop(minNonNulls = ds1.columns.length - 1).show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

b73845993d7047918fd220d4fadb1f6f.png


4. 使用一个给定的字符串填充所有缺失的值。在zeppelin中执行如下代码:

1.  ds1.na.fill("不详").show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


1c3ffe4ce0f744a1b16fc5f5da9421c3.png

5. 在不同列使用不同的字符串填充缺失值。在zeppelin中执行如下代码:

1.  ds1.na.fill(Map("dynasty"->"--", "dob"->"不详")).show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


42910b2c7f6b4a54880adfadabe01a9b.png


6. 删除重复的值。在zeppelin中执行如下代码:

1.  // 删除重复的行
2.  val authors = Seq(Author("曹雪芹","清代","1724年"),
3.                    Author("曹雪芹","清代","1724年"),
4.                    Author("施耐庵","元末明初","1296年"),
5.                    Author("曹雪芹","清朝","1724年"),
6.                    Author("罗贯中","元末明初",null),
7.                    Author("吴承恩",null,null)
8.                  )
9.       
10. val ds1 = sc.parallelize(authors).toDS()
11. ds1.show()
12.      
13. // 删除重复的行
14. ds1.dropDuplicates().show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


555a101789fe4dff838c41b51b3ab8c2.png

082fcbc72af84042b53c9aa36cfe89d2.png

7. 基于一个列的子集删除重复。在zeppelin中执行如下代码:

1.  // 基于一个列的子集删除重复
2.  ds1.dropDuplicates("name").show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:

c1654a03ffb54fd285a8ecf63b6105bf.png


8. 基于一个子集删除重复。在zeppelin中执行如下代码:

1.  // 基于一个子集删除重复
2.  ds1.dropDuplicates(Array("dynasty","dob")).show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:


9c92439f219b4f75bf9b041c408c072f.png

— end —


相关文章
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
112 1
Spark快速大数据分析PDF下载读书分享推荐
|
1月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
139 3
|
19天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
31 3
|
23天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
41 3
|
28天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
3月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
162 59
|
20天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
37 0
|
1月前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
62 2
|
2月前
|
分布式计算 Hadoop 大数据
Hadoop与Spark在大数据处理中的对比
【7月更文挑战第30天】Hadoop和Spark在大数据处理中各有优势,选择哪个框架取决于具体的应用场景和需求。Hadoop适合处理大规模数据的离线分析,而Spark则更适合需要快速响应和迭代计算的应用场景。在实际应用中,可以根据数据处理的需求、系统的可扩展性、成本效益等因素综合考虑,选择适合的框架进行大数据处理。