大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


SparkSQL介绍

SparkSQL特点

SparkSQL数据抽象

SparkSQL数据类型

SparkSession

在 Spark2.0 之前

  • SQLContext 是创建 DataFrame 和 执行SQL的入口
  • HiveContext 通过HiveSQL语句操作Hive数据,兼Hive操作,HiveContext继承自SQLContext
  • 在 Spark2.0 后

这些入口点统一到了SparkSession,SparkSession封装了SQLContext及HiveContext

实现了SQLContext即HiveContext所有功能

通过SparkSession可以获取到SparkContext

RDD(Resilient Distributed Dataset,弹性分布式数据集)

RDD 是 Spark 的基础抽象,它表示一个不可变的、分布式的数据集。


特点:


不可变性:RDD 是不可变的,一旦创建就不能修改。任何对 RDD 的操作都会生成一个新的 RDD。

弹性:RDD 可以自动从节点失败中恢复数据,通过将计算逻辑重新应用到原始数据来重建丢失的数据。

分布式:RDD 可以分布在多个节点上执行操作,充分利用集群的计算能力。

延迟计算:RDD 的操作是延迟执行的(lazy evaluation),即只有在触发行动操作(如 count()、collect())时,Spark 才会实际执行计算。

类型安全:RDD 是类型化的,但它的 API 是松散类型(loosely typed)的,这意味着编译器不会在编译时检查数据的类型,而是在运行时才会发现类型错误。

DataFrame

DataFrame 是一种基于 RDD 的分布式数据集,它具有命名的列。


特点:


结构化数据:DataFrame 是一个二维表格,具有命名的列和行,类似于关系数据库中的表或 Pandas 的 DataFrame。

优化引擎:DataFrame 受益于 Spark SQL 引擎的优化,如 Catalyst 优化器,可以自动优化查询并生成高效的执行计划。

丰富的 API:DataFrame 提供了一个高层次的 API,支持复杂的查询、过滤、聚合和连接操作。

类型不安全:与 RDD 不同,DataFrame 是动态类型(dynamic typing)的,数据类型检查是在运行时进行的,因此它在编译时不进行类型检查。

DataSet

DataSet 是 Spark 1.6 引入的一个新的数据抽象,它结合了 RDD 的强类型优势和 DataFrame 的优化能力。


特点:


类型安全:DataSet 是强类型的,它利用编译时类型检查,确保在编译时检测类型错误。

优化和性能:DataSet 受益于 Catalyst 优化器和 Tungsten 执行引擎,提供与 DataFrame 相同的优化能力,同时保留了类型安全性。

更丰富的 API:DataSet 提供了 RDD 的大部分 API,如 map、filter 等,同时也支持 SQL 查询。

统一 API:DataSet API 统一了 RDD 和 DataFrame,提供了一种更具表现力和安全性的编程模型。

DataFrame & Dataset 创建

不要刻意区分: DF & DS,DF是一种特殊的DS:ds.transformation => ds


由 Range 生成 Dataset

在 spark-shell 中进行测试

val numDS = spark.range(5, 100, 5)
// orderBy 转换操作 
numDS.orderBy(desc("id")).show(5)
// 统计信息
numDS.describe().show
// 显示 Schema 信息
numDS.printSchema
// 使用RDD执行同样的操作
numDS.rdd.map(_.toInt).stats
// 检查分区数
numDS.rdd.getNumPartitions

运行测试的过程如下图所示:

有集合生成Dataset

Dataset = RDD[case class],在 spark-shell 中进行测试

case class Person(name: String, age: Int, height: Int)

// 注意 Seq 中元素的类型
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))

val ds1 = spark.createDataset(seq1)
ds1.printSchema
ds1.show

执行的结果:

再来一个测试:

val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val ds2 = spark.createDataset(seq2)
ds2.printSchema
ds2.show

执行的结果:

由集合生成DataFrame

DataFrame = RDD[Row] + Schema

继续进行测试:

val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val df1 = spark.createDataFrame(lst).withColumnRenamed("_1", "name1").withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1")
df1.orderBy("age1").show(10)

执行的结果如下图所示:

RDD转成DataFrame

DataFrame = RDD[Row] + Schema

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr).map(f => Row(f._1, f._2, f._3))

val schema = StructType(
  StructField("name", StringType, false) ::
  StructField("age", IntegerType, false) ::
  StructField("height", IntegerType, false) ::
  Nil
)

val schema1 = (new StructType).add("name", "string", false).add("age", "int", false).add("height", "int", false)
val rddToDF = spark.createDataFrame(rdd1, schema)
rddToDF.orderBy(desc("name")).show(false)

执行的结果如下图:

RDD转Dataset

Dataset = RDD[case class]

DataFrame = RDD[Row] + Schema

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr)
val ds2 = spark.createDataset(rdd1)
ds2.show(10)

执行的结果如下图:

文件创建DataFrame

CSV文件

我们生成了一个CSV文件,大致内容如下:

运行测试

val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
df1.printSchema()
df1.show()

运行结果如下图所示:

三者转换

Spark SQL 提供了一个领域特定语言(DSL)以方便操作结构化数据,核心思想还是SQL,仅仅是一个语法问题。


RDD 与 DataFrame 之间的转换

RDD 转换为 DataFrame

将 RDD 转换为 DataFrame 需要提供数据的模式信息。通常你会使用 toDF() 方法将 RDD 转换为 DataFrame。

这里有两种主要方法:


使用隐式转换:需要导入 spark.implicits._,这允许你在不显式提供模式的情况下将常见的 RDD(如元组)转换为 DataFrame。

使用 StructType 定义模式:如果 RDD 的数据结构比较复杂,或者你需要精确控制 DataFrame 的模式,可以使用 StructType 和 Row。

DataFrame 转换为 RDD:


将 DataFrame 转换为 RDD 非常简单,只需调用 rdd 方法即可

DataFrame 与 DataSet 之间的转换

DataFrame 转换为 DataSet

DataFrame 是无类型的,而 DataSet 是类型化的。为了将 DataFrame 转换为 DataSet,你需要定义一个对应的数据类型(通常是一个 case class)并使用 as[T] 方法

DataSet 转换为 DataFrame

将 DataSet 转换为 DataFrame 非常简单,只需调用 toDF() 方法即可

RDD 与 DataSet 之间的转换

RDD 转换为 DataSet

将 RDD 转换为 DataSet 需要将 RDD 的元素类型与 DataSet 的类型一致。与将 RDD 转换为 DataFrame 类似,通常使用隐式转换或显式提供模式信息

DataSet 转换为 RDD

DataSet 本质上是类型化的 RDD,因此转换为 RDD 非常直接,只需调用 rdd 方法

最终汇总

RDD 转换为 DataFrame:使用 toDF(),或使用 createDataFrame() 提供模式。

DataFrame 转换为 RDD:使用 rdd 方法,转换后元素类型为 Row。

DataFrame 转换为 DataSet:使用 as[T] 方法,需提供对应的 case class。

DataSet 转换为 DataFrame:使用 toDF() 方法。

RDD 转换为 DataSet:使用 toDS(),需提供对应的 case class。

DataSet 转换为 RDD:使用 rdd 方法。


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
122 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
19天前
|
SQL 算法 大数据
为什么大数据平台会回归SQL
在大数据领域,尽管非结构化数据占据了大数据平台80%以上的存储空间,结构化数据分析依然是核心任务。SQL因其广泛的应用基础和易于上手的特点成为大数据处理的主要语言,各大厂商纷纷支持SQL以提高市场竞争力。然而,SQL在处理复杂计算时表现出的性能和开发效率低下问题日益凸显,如难以充分利用现代硬件能力、复杂SQL优化困难等。为了解决这些问题,出现了像SPL这样的开源计算引擎,它通过提供更高效的开发体验和计算性能,以及对多种数据源的支持,为大数据处理带来了新的解决方案。
|
1月前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
73 6
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
103 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
72 1
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
64 1
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
29天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
234 7
|
29天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
42 2