Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

简介: Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

前言

       昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。

一、数据准备

csv 文件内容部分数据展示:

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q

字段说明

• PassengerId : 乘客编号。

• Survived : 是否存活,0表示未能存活,1表示存活。

• Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。

• Name : 乘客姓名。

• Sex : 乘客性别。

• Age : 乘客年龄。

• SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。

• Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。

• Ticket : 乘客登船所使用的船票编号。

• Fare : 乘客上船的花费。

• Cabin : 乘客所住的船舱。

• Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。

二、Spark数据预处理

1、通过读取本地文件生成 DataFrame 对象。

// 创建 SparkSession 对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // 导入隐式转换相关依赖
    import spark.implicits._
    // 读取csv文件生成 DataFrame 对象
    val df = spark.read.format("csv")
      .option("header","true")
      .option("mode","DROPMALFORMED")
      .load("data/practice1/titanic.csv")

2、修改字段类型

DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。


       'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。


       cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。


withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。

// 修改字段数据类型
    val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
      .withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
      .withColumn("Age", df("Age").cast(DoubleType)) // 年龄
      .withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
      .withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
      .withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费

3、删除不必要的字段

// 删除不必要的字段
    val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")

4、缺失值处理

用到的函数:

DSL 语句中的 select、where函数,以及 count 、zip 函数。

涉及到的操作:

RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。

统计缺失值
// 缺失值处理
    val columns: Array[String] = df1.columns  //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
    // 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
    val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
    // 通过zip方法将两个集合数组合并成一个元组
    val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
    // 把生成的元组读取为RDD对象再转为DataFrame对象
    val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
    result_df.show()  // 统计缺失值

统计结果展示:

+-----------+-----------+
|missing_cnt|column_name|
+-----------+-----------+
|          0|   Survived|
|          0|     Pclass|
|          0|        Sex|
|        177|        Age|
|          0|      SibSp|
|          0|      Parch|
|          0|       Fare|
|          2|   Embarked|
+-----------+-----------+
缺失值处理
// 处理缺失值函数
    def meanAge(dataFrame: DataFrame): Double = {
      dataFrame.select("Age")
        .na.drop() //删除 Age 为空的行
        //'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
        .agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
        .first()  //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
        .getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
    }

处理:

val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
    df2.show()

处理结果展示:

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       S|
|       1|     2|female|55.0|    0|    0|   16.0|       S|
|       0|     3|  male| 2.0|    4|    1| 29.125|       Q|
|       1|     2|  male|30.0|    0|    0|   13.0|       S|
|       0|     3|female|31.0|    1|    0|   18.0|       S|
|       1|     3|female|30.0|    0|    0|  7.225|       C|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 20 rows

三、Spark 数据分析

1、891人当中,共多少人生还?

// 1.891人当中,共多少人生还?
    val survived_count: DataFrame = df2.groupBy("Survived").count()
    survived_count.show()
//保存结果到本地 
survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")

运行结果:

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

2.不同上船港口生还情况

// 2.不同上船港口生还情况
    val survived_embark = df2.groupBy("Embarked", "Survived").count()
    survived_embark.show()
    survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")

运行结果:

+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
|       Q|       1|   30|
|       S|       0|  427|
|       S|       1|  219|
|       C|       1|   93|
|       Q|       0|   47|
|       C|       0|   75|
+--------+--------+-----+

3.存活/未存活的男女数量及比例

 // 3.存活/未存活的男女数量及比例
    val survived_sex_count=df2.groupBy("Sex","Survived").count()
    val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    survived_sex_percent.show()
    survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")

运行结果:

+------+--------+-----+--------+
|   Sex|Survived|count| percent|
+------+--------+-----+--------+
|  male|       0|  468|52.52525|
|female|       1|  233|26.15039|
|female|       0|   81| 9.09091|
|  male|       1|  109|12.23345|
+------+--------+-----+--------+

4. 不同级别乘客生还人数和占总生还人数的比例

// 4. 不同级别乘客生还人数和占总生还人数的比例
    val survived_df = df2.filter(col("Survived")===1)
    val pclass_survived_count=survived_df.groupBy("Pclass").count()
    val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    pclass_survived_percent.show()
    pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")

运行结果:

+------+-----+--------+
|Pclass|count| percent|
+------+-----+--------+
|     1|  136|39.76608|
|     3|  119|34.79532|
|     2|   87|25.43860|
+------+-----+--------+

5. 有无同行父母/孩子的生还情况

 // 5.有无同行父母/孩子的生还情况
    val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
    val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
    parch_survived_count.show()
    parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")

运行结果:

+-----------+--------+-----+
|Parch_label|Survived|count|
+-----------+--------+-----+
|          1|       0|  104|
|          1|       1|  109|
|          0|       0|  445|
|          0|       1|  233|
+-----------+--------+-----+

6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况

    // 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
    val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
    val age_survived=df3.groupBy("Age_label","Survived").count()
    age_survived.show()
    age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")

运行结果:

+---------+--------+-----+
|Age_label|Survived|count|
+---------+--------+-----+
|    young|       1|  189|
|    older|       1|   12|
|    minor|       1|   70|
|   middle|       1|   71|
+---------+--------+-----+

7. 提取乘客等级和上船费用信息

    // 7.提取乘客等级和上船费用信息
    val sef = Seq("Pclass", "Fare")
    val df5 = df2.select(sef.head, sef.tail: _*)
    df5.show(5)
    df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")

运行结果:

+------+-------+
|Pclass|   Fare|
+------+-------+
|     3|   7.25|
|     1|71.2833|
|     3|  7.925|
|     1|   53.1|
|     3|   8.05|
+------+-------+
only showing top 5 rows

四、数据可视化

数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。

目录
打赏
0
0
0
0
37
分享
相关文章
Pandas数据应用:天气数据分析
本文介绍如何使用 Pandas 进行天气数据分析。Pandas 是一个强大的 Python 数据处理库,适合处理表格型数据。文章涵盖加载天气数据、处理缺失值、转换数据类型、时间序列分析(如滚动平均和重采样)等内容,并解决常见报错如 SettingWithCopyWarning、KeyError 和 TypeError。通过这些方法,帮助用户更好地进行气候趋势预测和决策。
170 71
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
273 92
Pandas数据应用:电子商务数据分析
本文介绍如何使用 Pandas 进行电子商务数据分析,涵盖数据加载、清洗、预处理、分析与可视化。通过 `read_csv` 等函数加载数据,利用 `info()` 和 `describe()` 探索数据结构和统计信息。针对常见问题如缺失值、重复记录、异常值等,提供解决方案,如 `dropna()`、`drop_duplicates()` 和正则表达式处理。结合 Matplotlib 等库实现数据可视化,探讨内存不足和性能瓶颈的应对方法,并总结常见报错及解决策略,帮助提升电商企业的数据分析能力。
184 73
数据团队必读:智能数据分析文档(DataV Note)五种高效工作模式
数据项目复杂,涉及代码、数据、运行环境等多部分。随着AI发展,数据科学团队面临挑战。协作式数据文档(如阿里云DataV Note)成为提升效率的关键工具。它支持跨角色协同、异构数据处理、多语言分析及高效沟通,帮助创建知识库,实现可重现的数据科学过程,并通过一键分享报告促进数据驱动决策。未来,大模型AI将进一步增强其功能,如智能绘图、总结探索、NLP2SQL/Python和AutoReport,为数据分析带来更多可能。
111 21
基于机器学习的数据分析:PLC采集的生产数据预测设备故障模型
本文介绍如何利用Python和Scikit-learn构建基于PLC数据的设备故障预测模型。通过实时采集温度、振动、电流等参数,进行数据预处理和特征提取,选择合适的机器学习模型(如随机森林、XGBoost),并优化模型性能。文章还分享了边缘计算部署方案及常见问题排查,强调模型预测应结合定期维护,确保系统稳定运行。
104 0
Pandas数据应用:医疗数据分析
Pandas是Python中强大的数据操作和分析库,广泛应用于医疗数据分析。本文介绍了使用Pandas进行医疗数据分析的常见问题及解决方案,涵盖数据导入、预处理、清洗、转换、可视化等方面。通过解决文件路径错误、编码不匹配、缺失值处理、异常值识别、分类变量编码等问题,结合Matplotlib等工具实现数据可视化,并提供了解决常见报错的方法。掌握这些技巧可以提高医疗数据分析的效率和准确性。
107 22
数据集中存在大量的重复值,会对后续的数据分析和处理产生什么影响?
数据集中存在大量重复值可能会对后续的数据分析和处理产生多方面的负面影响
247 56
Pandas数据应用:股票数据分析
本文介绍了如何使用Pandas库进行股票数据分析。首先,通过pip安装并导入Pandas库。接着,从本地CSV文件读取股票数据,并解决常见的解析错误。然后,利用head()、info()等函数查看数据基本信息,进行数据清洗,处理缺失值和重复数据。再者,结合Matplotlib和Seaborn进行数据可视化,绘制收盘价折线图。最后,进行时间序列分析,设置日期索引、重采样和计算移动平均线。通过这些步骤,帮助读者掌握Pandas在股票数据分析中的应用。
121 5
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
213 0