一、实验目的
掌握Spark SQL的探索性数据分析技术。
二、实验内容
1、进行探索性数据分析。
2、探索性数据可视化。
3、对数据进行采样。
4、创建数据透视表。
三、实验原理
Exploratory Data Analysis (EDA),或Initial Data Analysis (IDA),是一种数据分析方法,试图最大限度地洞察数据。这包括评估数据的质量和结构,计算汇总或描述性统计,以及绘制适当的图表。它可以揭示底层结构,并建议如何建模数据。此外,EDA帮助我们检测数据中的异常值、错误和异常,并且决定如何处理这些数据通常比其他更复杂的分析更重要。EDA使我们能够测试我们的基本假设,发现数据中的集群和其他模式,并识别各种变量之间可能的关系。仔细的EDA过程对于理解数据是至关重要的,而且有时足以揭示如此糟糕的以至于使用更复杂的基于模型的分析是不合理的数据质量。
在Spark SQL中,DataFrame上定义有一个summary()函数。这个函数将返回DataFrame中一列数值的记录的数量(count)、均值(mean)、标准差(stdev)、最小值(min)、最大值(max)。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
五、实验步骤
5.1 启动HDFS集群、Spark集群和Zeppelin服务器
在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh 4. $ zeppelin-daemon.sh start
然后使用jps命令查看进程,确保已经正确地启动了HDFS集群、Spark集群和Zeppelin服务器。
5.2 准备实验数据
将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:
1. $ hdfs dfs -mkdir -p /data/dataset/ 2. $ hdfs dfs -put /data/dataset/batch/chuangxin.csv /data/dataset/ 3. $ hdfs dfs -put /data/dataset/bank-full.csv /data/dataset/
执行以下命令,查看数据文件是否已经上传到HDFS中:
1. $ hdfs dfs -ls /data/dataset/
5.3 进行探索性数据分析
在探索性数据分析中,我们研究给定的数据。研究数据意味着统计记录的数量,然后寻找有意义的模式。
1、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页。点击”Create new note”链接,创建一个新的笔记本,命名为”analy_demo”,如下图所示:
2、 加载数据。在zeppelin中输入如下代码:
1. // 加载数据到DataFrame 2. val filePath = "hdfs://localhost:9000/data/dataset/chuangxin.csv" 3. val df1 = spark.read.option("header","true").option("inferSchema","true").csv(filePath) 4. 5. // 查看schema 6. df1.printSchema 7. 8. // 查看前5条数据 9. df1.show(5)
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
3、 统计总共有多少条记录。在zeppelin中执行如下代码:
1. // 统计总共有多少条记录 2. df1.count
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
可以看出,这个数据集中总共有1525条记录。
4、 识别缺失值。在zeppelin中执行如下代码:
1. // 识别缺失值:分析样本数据集中缺少数据字段的记录数量 2. df1.groupBy("SQJE").count().show 3. df1.groupBy("FFJE").count().show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
在这里,我们分别对”SQJE”(申请金额)列和”FFJE”(发放金额)列进行判断,看这两列是否有缺失值。从结果可以看出,”SQJE”列没有缺失值,而”FFJE”列有一个缺失值。
5、 找出有缺失值的记录。在zeppelin中执行如下代码:
1. df1.where($"FFJE".isNull).show 2. // df1.filter($"FFJE".isNull).show // 等价上一句
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
由此可知,申请编号(SQBH)为”201703143002750”的记录没有发放金额。
6、选择子集。因为不需要id列、XH列、QYCKMC列、SQQYLX列和SQBH列,所以将他们舍弃。在zeppelin中执行如下代码:
1. // 只保留NF(年份)列、SQJE(申请金额)列、FFJE(发放金额)列 2. val df2 = df1.select('NF,'SQJE,'FFJE) 3. df2.show(5)
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
7、计算一些基本统计信息,以提高对数据的理解,Spark SQL提供了describe()函数。这个函数计算数字列和字符串列的基本统计信息,包括count、mean、stddev、min和max。在zeppelin中执行如下代码:
1. df2.describe().show() 2. // df2.describe("SQJE","FFJE").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
8、汇总统计信息,使用summary()函数。这个函数为数值列和字符串列计算指定的统计信息。可用的统计信息包括:- count - mean - stddev - min - max -以百分比形式指定的任意近似百分位数(如75%)。如果没有给出参数(即要统计的信息),这个函数将计算count、mean、stddev、min、近似四分位数(25%、50%和75%的百分位数)和max。在zeppelin中执行如下代码:
1. // 汇总统计信息 2. df2.summary().show 3. 4. // 要对特定列执行摘要,首先选择它们 5. // df2.select("SQJE","FFJE").summary().show 6. // df2.select("SQJE","FFJE").summary("count", "min", "25%", "75%", "max").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
9、计算申请申请金额列和发放金额列的协方差。在zeppelin中执行如下代码:
1. // 协方差 2. df1.stat.cov("SQJE","FFJE")
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
由结果可知,申请金额列和发放金额列的变化趋势一致。通俗地说,申请金额越多,实际发放的金额就越多。
10、下面我们计算申请金额和发放金额之间的相关性。在zeppelin中执行如下代码:
1. // 相关性 2. df1.stat.corr("SQJE","FFJE")
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
由结果可知,申请金额列和发放金额列是正相关的,而且是显著性相关的。(一般相关性在0.4~0.7之间,我们称为显著性相关)
11、我们可以在两个变量之间创建交叉表格或交叉标记,以评估它们之间的相互关系。在zeppelin中执行如下代码:
1. df1.stat.crosstab("SQJE", "FFJE").show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
12、提取数据列中最常出现的项。在zeppelin中执行如下代码:
1. // 找出SQQYLX列中的频繁项:哪些企业类型申请得最多 2. df1.stat.freqItems(Seq("SQQYLX")).show 3. 4. df1.stat.freqItems(Seq("SQQYLX"),0.5).show // 第2个参数指定阈值
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
13、使用典型的聚合函数来总结数据,以便更好地理解它。在zeppelin中执行如下代码:
1. // 按申请企业的类型(SQQYLX)进行分类 2. df1.groupBy($"SQQYLX").agg(count("SQJE").as("申请数量"),count("FFJE").as("发放数量"), 3. avg("SQJE").as("平均申请金额"),avg("FFJE").as("平均发放金额")).show
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
5.4 探索性数据可视化
Apache Zeppelin是一个基于web的工具,支持交互式数据分析和可视化。它支持多种语言解释器,并带有内置的Spark集成。使用Apache Zeppelin进行探索性数据分析非常简单快捷。
1、加载数据。在zeppelin中执行如下代码:
1. val filePath = "hdfs://localhost:9000/data/dataset/bank-full.csv" 2. val bankText = sc.textFile(filePath) 3. bankText.cache
2、定义类型。在zeppelin中执行如下代码:
1. // 创建 case class 2. case class Bank(age:Integer, 3. job:String, 4. marital:String, 5. education:String, 6. balance:Integer 7. )
同时按下”【Shift + Enter】”键,执行以上代码
3、数据提炼。拆分每一行,过滤掉第一行(以age开头的标题行),并映射到 Bank case class。在zeppelin中执行如下代码:
1. val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(s => 2. Bank(s(0).replaceAll("\"","").replaceAll(" ", "").toInt, 3. s(1).replaceAll("\"",""), 4. s(2).replaceAll("\"",""), 5. s(3).replaceAll("\"",""), 6. s(5).replaceAll("\"","").toInt) 7. )
7
同时按下”【Shift + Enter】”键,执行以上代码
4、从RDD转换为DataFrame。在zeppelin中执行如下代码:
1. val bankDF = bank.toDF() 2. bankDF.show(5)
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
5、注册临时视图。在zeppelin中执行如下代码:
1. bankDF.createOrReplaceTempView("bank_tb")
同时按下”【Shift + Enter】”键,执行以上代码。
6、然后执行sql可视化,查看不同年龄段的客户人数。在zeppelin中执行如下代码:
1. %sql 2. select age,count(age) as total_ages 3. from bank_tb 4. where age<${maxAge=30} 5. group by age 6. order by age
同时按下”【Shift + Enter】”键,执行以上代码。可以看到生成的饼状图如下:
还可以创建一个折线图,还可以读取每个绘制点的坐标值,如下图所示:
7、此外,我们可以创建一个接受输入值的文本框,使体验具有交互性。在下面的图中,我们创建了一个文本框,可以接受年龄参数的不同值,柱状图也随之更新。在zeppelin中执行如下代码:
1. %sql 2. select age, count(1) 3. from bank_tb 4. where marital="${marital=single,single(未婚)|divorced(离婚)|married(已婚)}" 5. group by age 6. order by age
同时按下”【Shift + Enter】”键,执行以上代码。可以看到生成的饼状图如下:
8、同样,我们也可以创建下拉列表,用户可以在其中选择适当的选项。例如,根据婚姻状况的不同显示对应的年龄分布。在zeppelin中执行如下代码:
1. %sql 2. select age, count(1) 3. from bank_tb 4. where marital="${marital=single,single(未婚)|divorced(离婚)|married(已婚)}" 5. group by age 6. order by age
同时按下”【Shift + Enter】”键,执行以上代码。可以看到生成的柱状图如下:
5.5 对数据进行抽样
统计人员广泛使用抽样技术进行数据分析。Spark支持近似和精确的样本生成。近似采样速度更快,而且满足大多数情况。
1、使用DataFrame/DatasetAPI进行抽样。下面的代码对银行客户数据进行抽样,并统计样本的大小和样本中每种婚姻类型的客户数量。在zeppelin中执行如下代码:
1. import scala.collection.immutable.Map 2. 3. // 指定不同婚姻类型的抽样比例 4. val fractions = Map("unknown" -> .10, "divorced" -> .15, "married" -> 0.5, "single" -> .25) 5. val dsStratifiedSample = bankDF.stat.sampleBy("marital", fractions, 36L) 6. 7. // 计算总样本数 8. dsStratifiedSample.count() 9. 10. // 计算样本中不同婚姻类型的客户数量 11. dsStratifiedSample.groupBy("marital").count().orderBy("marital").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
从输出结果中可以看出,抽取的样本总数为17577,其中离异的样本数770,已婚的样本数为13548,单身的样本数为3259。
2、下面的代码中使用DataFrame上定义的sample函数来进行抽样,使用随机种子选择部分行(占总记录的10%),同时也会列出在样本内每种记录的数量。sample函数需要三个参数:
- withReplacement:有放回或无放回抽样(true/false)
- fraction:要生成的行数的因子(根据所要求的样本量,0到1之间的任意数字)
- seed:用于采样的种子(任何随机种子)
在zeppelin中执行如下代码:
1. // 有替换抽样 2. val dsSampleWithReplacement = bankDF.sample(true, .10) // 使用随机种子 3. 4. // 计算样本总数 5. dsSampleWithReplacement.count() 6. 7. // 计算样本中不同婚姻类型的客户数量 8. dsSampleWithReplacement.groupBy("marital").count().orderBy("marital").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
使用sample不能保证提供数据集中记录总数的准确比例。
5.6 创建数据透视表
数据透视表创建数据的替代视图,通常在数据探索过程中使用。
1、以education为中心,并按婚姻状况进行统计。在zeppelin中执行如下代码:
1. bankDF.groupBy("marital").pivot("education").agg(count("education")).sort("marital").show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
2、我们为平均存款余额和平均年龄创建一个具有适当列名的DataFrame,并将空值填充为0.0。在zeppelin中执行如下代码:
1. bankDF.groupBy("job"). 2. pivot("marital", Seq("unknown", "divorced", "married", "single")). 3. agg(round(avg("balance"), 2), round(avg("age"), 2)). 4. sort("job").na.fill(0). 5. toDF("Job", "U-Bal", "U-Avg", "D-Bal", "D-Avg", "M-Bal", "M-Avg", "S-Bal", "S-Avg"). 6. show()
同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
六、 实验知识测试
1、在zeppelin中执行sql语句时需要指定正确的是( B ){单选}
A、%pyspark
B、%sql
C、%scala
D、%mysql
七、实验拓展
无