Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

简介: Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

前言

       昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。

一、数据准备

csv 文件内容部分数据展示:

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q

字段说明

• PassengerId : 乘客编号。

• Survived : 是否存活,0表示未能存活,1表示存活。

• Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。

• Name : 乘客姓名。

• Sex : 乘客性别。

• Age : 乘客年龄。

• SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。

• Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。

• Ticket : 乘客登船所使用的船票编号。

• Fare : 乘客上船的花费。

• Cabin : 乘客所住的船舱。

• Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。

二、Spark数据预处理

1、通过读取本地文件生成 DataFrame 对象。

// 创建 SparkSession 对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // 导入隐式转换相关依赖
    import spark.implicits._
    // 读取csv文件生成 DataFrame 对象
    val df = spark.read.format("csv")
      .option("header","true")
      .option("mode","DROPMALFORMED")
      .load("data/practice1/titanic.csv")

2、修改字段类型

DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。


       'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。


       cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。


withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。

// 修改字段数据类型
    val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
      .withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
      .withColumn("Age", df("Age").cast(DoubleType)) // 年龄
      .withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
      .withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
      .withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费

3、删除不必要的字段

// 删除不必要的字段
    val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")

4、缺失值处理

用到的函数:

DSL 语句中的 select、where函数,以及 count 、zip 函数。

涉及到的操作:

RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。

统计缺失值
// 缺失值处理
    val columns: Array[String] = df1.columns  //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
    // 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
    val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
    // 通过zip方法将两个集合数组合并成一个元组
    val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
    // 把生成的元组读取为RDD对象再转为DataFrame对象
    val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
    result_df.show()  // 统计缺失值

统计结果展示:

+-----------+-----------+
|missing_cnt|column_name|
+-----------+-----------+
|          0|   Survived|
|          0|     Pclass|
|          0|        Sex|
|        177|        Age|
|          0|      SibSp|
|          0|      Parch|
|          0|       Fare|
|          2|   Embarked|
+-----------+-----------+
缺失值处理
// 处理缺失值函数
    def meanAge(dataFrame: DataFrame): Double = {
      dataFrame.select("Age")
        .na.drop() //删除 Age 为空的行
        //'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
        .agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
        .first()  //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
        .getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
    }

处理:

val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
    df2.show()

处理结果展示:

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       S|
|       1|     2|female|55.0|    0|    0|   16.0|       S|
|       0|     3|  male| 2.0|    4|    1| 29.125|       Q|
|       1|     2|  male|30.0|    0|    0|   13.0|       S|
|       0|     3|female|31.0|    1|    0|   18.0|       S|
|       1|     3|female|30.0|    0|    0|  7.225|       C|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 20 rows

三、Spark 数据分析

1、891人当中,共多少人生还?

// 1.891人当中,共多少人生还?
    val survived_count: DataFrame = df2.groupBy("Survived").count()
    survived_count.show()
//保存结果到本地 
survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")

运行结果:

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

2.不同上船港口生还情况

// 2.不同上船港口生还情况
    val survived_embark = df2.groupBy("Embarked", "Survived").count()
    survived_embark.show()
    survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")

运行结果:

+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
|       Q|       1|   30|
|       S|       0|  427|
|       S|       1|  219|
|       C|       1|   93|
|       Q|       0|   47|
|       C|       0|   75|
+--------+--------+-----+

3.存活/未存活的男女数量及比例

 // 3.存活/未存活的男女数量及比例
    val survived_sex_count=df2.groupBy("Sex","Survived").count()
    val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    survived_sex_percent.show()
    survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")

运行结果:

+------+--------+-----+--------+
|   Sex|Survived|count| percent|
+------+--------+-----+--------+
|  male|       0|  468|52.52525|
|female|       1|  233|26.15039|
|female|       0|   81| 9.09091|
|  male|       1|  109|12.23345|
+------+--------+-----+--------+

4. 不同级别乘客生还人数和占总生还人数的比例

// 4. 不同级别乘客生还人数和占总生还人数的比例
    val survived_df = df2.filter(col("Survived")===1)
    val pclass_survived_count=survived_df.groupBy("Pclass").count()
    val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    pclass_survived_percent.show()
    pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")

运行结果:

+------+-----+--------+
|Pclass|count| percent|
+------+-----+--------+
|     1|  136|39.76608|
|     3|  119|34.79532|
|     2|   87|25.43860|
+------+-----+--------+

5. 有无同行父母/孩子的生还情况

 // 5.有无同行父母/孩子的生还情况
    val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
    val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
    parch_survived_count.show()
    parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")

运行结果:

+-----------+--------+-----+
|Parch_label|Survived|count|
+-----------+--------+-----+
|          1|       0|  104|
|          1|       1|  109|
|          0|       0|  445|
|          0|       1|  233|
+-----------+--------+-----+

6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况

    // 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
    val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
    val age_survived=df3.groupBy("Age_label","Survived").count()
    age_survived.show()
    age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")

运行结果:

+---------+--------+-----+
|Age_label|Survived|count|
+---------+--------+-----+
|    young|       1|  189|
|    older|       1|   12|
|    minor|       1|   70|
|   middle|       1|   71|
+---------+--------+-----+

7. 提取乘客等级和上船费用信息

    // 7.提取乘客等级和上船费用信息
    val sef = Seq("Pclass", "Fare")
    val df5 = df2.select(sef.head, sef.tail: _*)
    df5.show(5)
    df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")

运行结果:

+------+-------+
|Pclass|   Fare|
+------+-------+
|     3|   7.25|
|     1|71.2833|
|     3|  7.925|
|     1|   53.1|
|     3|   8.05|
+------+-------+
only showing top 5 rows

四、数据可视化

数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。

相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
860 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
321 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL
SQL如何只让特定列中只显示一行数据
SQL如何只让特定列中只显示一行数据
|
9月前
|
SQL 自然语言处理 数据库
【Azure Developer】分享两段Python代码处理表格(CSV格式)数据 : 根据每列的内容生成SQL语句
本文介绍了使用Python Pandas处理数据收集任务中格式不统一的问题。针对两种情况:服务名对应多人拥有状态(1/0表示),以及服务名与人名重复列的情况,分别采用双层for循环和字典数据结构实现数据转换,最终生成Name对应的Services列表(逗号分隔)。此方法高效解决大量数据的人工处理难题,减少错误并提升效率。文中附带代码示例及执行结果截图,便于理解和实践。
259 4
|
5月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
5月前
|
SQL
SQL中如何删除指定查询出来的数据
SQL中如何删除指定查询出来的数据
|
5月前
|
SQL 关系型数据库 MySQL
SQL如何对不同表的数据进行更新
本文介绍了如何将表A的Col1数据更新到表B的Col1中,分别提供了Microsoft SQL和MySQL的实现方法,并探讨了多表合并后更新的优化方式,如使用MERGE语句提升效率。适用于数据库数据同步与批量更新场景。
|
7月前
|
SQL 数据挖掘 关系型数据库
【SQL 周周练】一千条数据需要做一天,怎么用 SQL 处理电表数据(如何动态构造自然月)
题目来自于某位发帖人在某 Excel 论坛的求助,他需要将电表缴费数据按照缴费区间拆开后再按月份汇总。当时用手工处理数据,自称一千条数据就需要处理一天。我将这个问题转化为 SQL 题目。
273 12
|
6月前
|
SQL DataWorks 数据管理
SQL血缘分析实战!数据人必会的3大救命场景
1. 开源工具:Apache Atlas(元数据管理)、Spline(血缘追踪) 2. 企业级方案:阿里DataWorks血缘分析、腾讯云CDW血缘引擎 3. 自研技巧:在ETL脚本中植入版本水印,用注释记录业务逻辑变更 📌 重点总结:
|
7月前
|
SQL 数据采集 资源调度
【SQL 周周练】爬取短视频发现数据缺失,如何用 SQL 填充
爬虫爬取抖音和快手的短视频数据时,如果遇到数据缺失的情况,如何使用 SQL 语句完成数据的补全。
201 5