Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

简介: Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

前言

       昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。

一、数据准备

csv 文件内容部分数据展示:

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q

字段说明

• PassengerId : 乘客编号。

• Survived : 是否存活,0表示未能存活,1表示存活。

• Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。

• Name : 乘客姓名。

• Sex : 乘客性别。

• Age : 乘客年龄。

• SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。

• Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。

• Ticket : 乘客登船所使用的船票编号。

• Fare : 乘客上船的花费。

• Cabin : 乘客所住的船舱。

• Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。

二、Spark数据预处理

1、通过读取本地文件生成 DataFrame 对象。

// 创建 SparkSession 对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // 导入隐式转换相关依赖
    import spark.implicits._
    // 读取csv文件生成 DataFrame 对象
    val df = spark.read.format("csv")
      .option("header","true")
      .option("mode","DROPMALFORMED")
      .load("data/practice1/titanic.csv")

2、修改字段类型

DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。


       'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。


       cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。


withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。

// 修改字段数据类型
    val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
      .withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
      .withColumn("Age", df("Age").cast(DoubleType)) // 年龄
      .withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
      .withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
      .withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费

3、删除不必要的字段

// 删除不必要的字段
    val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")

4、缺失值处理

用到的函数:

DSL 语句中的 select、where函数,以及 count 、zip 函数。

涉及到的操作:

RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。

统计缺失值
// 缺失值处理
    val columns: Array[String] = df1.columns  //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
    // 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
    val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
    // 通过zip方法将两个集合数组合并成一个元组
    val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
    // 把生成的元组读取为RDD对象再转为DataFrame对象
    val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
    result_df.show()  // 统计缺失值

统计结果展示:

+-----------+-----------+
|missing_cnt|column_name|
+-----------+-----------+
|          0|   Survived|
|          0|     Pclass|
|          0|        Sex|
|        177|        Age|
|          0|      SibSp|
|          0|      Parch|
|          0|       Fare|
|          2|   Embarked|
+-----------+-----------+
缺失值处理
// 处理缺失值函数
    def meanAge(dataFrame: DataFrame): Double = {
      dataFrame.select("Age")
        .na.drop() //删除 Age 为空的行
        //'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
        .agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
        .first()  //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
        .getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
    }

处理:

val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
    df2.show()

处理结果展示:

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       S|
|       1|     2|female|55.0|    0|    0|   16.0|       S|
|       0|     3|  male| 2.0|    4|    1| 29.125|       Q|
|       1|     2|  male|30.0|    0|    0|   13.0|       S|
|       0|     3|female|31.0|    1|    0|   18.0|       S|
|       1|     3|female|30.0|    0|    0|  7.225|       C|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 20 rows

三、Spark 数据分析

1、891人当中,共多少人生还?

// 1.891人当中,共多少人生还?
    val survived_count: DataFrame = df2.groupBy("Survived").count()
    survived_count.show()
//保存结果到本地 
survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")

运行结果:

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

2.不同上船港口生还情况

// 2.不同上船港口生还情况
    val survived_embark = df2.groupBy("Embarked", "Survived").count()
    survived_embark.show()
    survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")

运行结果:

+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
|       Q|       1|   30|
|       S|       0|  427|
|       S|       1|  219|
|       C|       1|   93|
|       Q|       0|   47|
|       C|       0|   75|
+--------+--------+-----+

3.存活/未存活的男女数量及比例

 // 3.存活/未存活的男女数量及比例
    val survived_sex_count=df2.groupBy("Sex","Survived").count()
    val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    survived_sex_percent.show()
    survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")

运行结果:

+------+--------+-----+--------+
|   Sex|Survived|count| percent|
+------+--------+-----+--------+
|  male|       0|  468|52.52525|
|female|       1|  233|26.15039|
|female|       0|   81| 9.09091|
|  male|       1|  109|12.23345|
+------+--------+-----+--------+

4. 不同级别乘客生还人数和占总生还人数的比例

// 4. 不同级别乘客生还人数和占总生还人数的比例
    val survived_df = df2.filter(col("Survived")===1)
    val pclass_survived_count=survived_df.groupBy("Pclass").count()
    val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    pclass_survived_percent.show()
    pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")

运行结果:

+------+-----+--------+
|Pclass|count| percent|
+------+-----+--------+
|     1|  136|39.76608|
|     3|  119|34.79532|
|     2|   87|25.43860|
+------+-----+--------+

5. 有无同行父母/孩子的生还情况

 // 5.有无同行父母/孩子的生还情况
    val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
    val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
    parch_survived_count.show()
    parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")

运行结果:

+-----------+--------+-----+
|Parch_label|Survived|count|
+-----------+--------+-----+
|          1|       0|  104|
|          1|       1|  109|
|          0|       0|  445|
|          0|       1|  233|
+-----------+--------+-----+

6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况

    // 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
    val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
    val age_survived=df3.groupBy("Age_label","Survived").count()
    age_survived.show()
    age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")

运行结果:

+---------+--------+-----+
|Age_label|Survived|count|
+---------+--------+-----+
|    young|       1|  189|
|    older|       1|   12|
|    minor|       1|   70|
|   middle|       1|   71|
+---------+--------+-----+

7. 提取乘客等级和上船费用信息

    // 7.提取乘客等级和上船费用信息
    val sef = Seq("Pclass", "Fare")
    val df5 = df2.select(sef.head, sef.tail: _*)
    df5.show(5)
    df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")

运行结果:

+------+-------+
|Pclass|   Fare|
+------+-------+
|     3|   7.25|
|     1|71.2833|
|     3|  7.925|
|     1|   53.1|
|     3|   8.05|
+------+-------+
only showing top 5 rows

四、数据可视化

数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。

相关文章
|
6天前
|
数据采集 DataWorks 数据挖掘
提升数据分析效率:DataWorks在企业级数据治理中的应用
【8月更文第25天】本文将探讨阿里巴巴云的DataWorks平台如何通过建立统一的数据标准、规范以及实现数据质量监控和元数据管理来提高企业的数据分析效率。我们将通过具体的案例研究和技术实践来展示DataWorks如何简化数据处理流程,减少成本,并加速业务决策。
121 54
|
9天前
|
存储 数据挖掘 数据处理
DataFrame探索之旅:如何一眼洞察数据本质,提升你的数据分析能力?
【8月更文挑战第22天】本文通过电商用户订单数据的案例,展示了如何使用Python的pandas库查看DataFrame信息。首先导入数据并使用`head()`, `columns`, `shape`, `describe()`, 和 `dtypes` 方法来快速概览数据的基本特征。接着,通过对数据进行分组操作计算每位顾客的平均订单金额,以此展示初步数据分析的过程。掌握这些技能对于高效的数据分析至关重要。
22 2
|
11天前
|
数据采集 机器学习/深度学习 算法
"揭秘数据质量自动化的秘密武器:机器学习模型如何精准捕捉数据中的‘隐形陷阱’,让你的数据分析无懈可击?"
【8月更文挑战第20天】随着大数据成为核心资源,数据质量直接影响机器学习模型的准确性和效果。传统的人工审查方法效率低且易错。本文介绍如何运用机器学习自动化评估数据质量,解决缺失值、异常值等问题,提升模型训练效率和预测准确性。通过Python和scikit-learn示例展示了异常值检测的过程,最后强调在自动化评估的同时结合人工审查的重要性。
35 2
|
7天前
|
SQL 存储 NoSQL
数据模型与应用场景对比:SQL vs NoSQL
【8月更文第24天】随着大数据时代的到来,数据存储技术也在不断演进和发展。传统的SQL(Structured Query Language)数据库和新兴的NoSQL(Not Only SQL)数据库各有优势,在不同的应用场景中发挥着重要作用。本文将从数据模型的角度出发,对比分析SQL和NoSQL数据库的特点,并通过具体的代码示例来说明它们各自适用的场景。
20 0
|
8天前
|
SQL 关系型数据库 MySQL
4、SQL数据操纵
4、SQL数据操纵
23 0
|
8天前
|
SQL 存储 JSON
【Azure 存储服务】Blob中数据通过Stream Analytics导出到SQL/Cosmos DB
【Azure 存储服务】Blob中数据通过Stream Analytics导出到SQL/Cosmos DB
|
10天前
|
SQL JSON 关系型数据库
"SQL老司机大揭秘:如何在数据库中玩转数组、映射与JSON,解锁数据处理的无限可能,一场数据与技术的激情碰撞!"
【8月更文挑战第21天】SQL作为数据库语言,其能力不断进化,尤其是在处理复杂数据类型如数组、映射及JSON方面。例如,PostgreSQL自8.2版起支持数组类型,并提供`unnest()`和`array_agg()`等函数用于数组的操作。对于映射类型,虽然SQL标准未直接支持,但通过JSON数据类型间接实现了键值对的存储与查询。如在PostgreSQL中创建含JSONB类型的表,并使用`-&gt;&gt;`提取特定字段或`@&gt;`进行复杂条件筛选。掌握这些技巧对于高效管理现代数据至关重要,并预示着SQL在未来数据处理领域将持续扮演核心角色。
21 0
|
12天前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之sql查询如何导出全量数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
19天前
|
数据采集 数据可视化 数据挖掘
数据分析大神养成记:Python+Pandas+Matplotlib助你飞跃!
在数字化时代,数据分析至关重要,而Python凭借其强大的数据处理能力和丰富的库支持,已成为该领域的首选工具。Python作为基石,提供简洁语法和全面功能,适用于从数据预处理到高级分析的各种任务。Pandas库则像是神兵利器,其DataFrame结构让表格型数据的处理变得简单高效,支持数据的增删改查及复杂变换。配合Matplotlib这一数据可视化的魔法棒,能以直观图表展现数据分析结果。掌握这三大神器,你也能成为数据分析领域的高手!
41 2
|
23天前
|
机器学习/深度学习 数据采集 数据可视化
基于爬虫和机器学习的招聘数据分析与可视化系统,python django框架,前端bootstrap,机器学习有八种带有可视化大屏和后台
本文介绍了一个基于Python Django框架和Bootstrap前端技术,集成了机器学习算法和数据可视化的招聘数据分析与可视化系统,该系统通过爬虫技术获取职位信息,并使用多种机器学习模型进行薪资预测、职位匹配和趋势分析,提供了一个直观的可视化大屏和后台管理系统,以优化招聘策略并提升决策质量。

热门文章

最新文章

下一篇
云函数