深入了解sparkSession

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 深入了解sparkSession

在上一篇文章中,我们简单了解了SparkSession,这篇文章将深入了解SparkSession的相关内容。

深入了解SparkSession

当我们深入了解SparkSession时,重要的一部分是了解其核心数据结构,其中包括DataFrame类、Row类和Dataset类。在Scala中,Spark使用这些类来处理和操作分布式数据。

DataFrame类

DataFrame是Spark中最基本和最常用的数据结构之一

它是一个分布式的数据表,类似于关系型数据库中的表。DataFrame由一系列的列组成,每列都有一个名称和数据类型。你可以将DataFrame看作是一个分布式的数据集,可以进行各种复杂的数据操作和转换。

在Scala中,我们可以使用SparkSession的createDataFrame方法创建DataFrame。以下是一个简单的例子:

import org.apache.spark.sql.{SparkSession, DataFrame}

val spark = SparkSession.builder.appName("example").getOrCreate()

// 通过Seq创建DataFrame
val data = Seq(("Alice", 25), ("Bob", 30), ("Charlie", 22))
val columns = Seq("Name", "Age")
val df: DataFrame = spark.createDataFrame(data).toDF(columns: _*)

// 显示DataFrame的内容
df.show()

上面的例子中,我们创建了一个DataFrame,其中包含两列:Name和Age。我们使用show方法来显示DataFrame的内容。

Row类

Row是DataFrame中的一行数据,它是一个泛型对象,可以包含不同类型的数据。在Scala中,我们可以通过Row类的实例来表示一行数据。以下是一个简单的例子:

import org.apache.spark.sql.Row

// 创建一个Row对象
val row: Row = Row("John", 28)

// 访问Row的数据
val name: String = row.getString(0)
val age: Int = row.getInt(1)

println(s"Name: $name, Age: $age")

在这个例子中,我们创建了一个包含名字和年龄的Row对象,并通过getStringgetInt方法访问其中的数据。

Dataset类

Dataset是Spark中引入的一个强类型的数据抽象,它结合了DataFrame和RDD的优点。Dataset提供了类型安全性和更丰富的API,使得在编译时能够捕获更多的错误。在Scala中,可以通过定义一个样例类(case class)来定义Dataset的结构。

以下是一个使用Dataset的简单例子:

import org.apache.spark.sql.{SparkSession, Dataset}

// 定义一个样例类表示数据结构
case class Person(name: String, age: Int)

val spark = SparkSession.builder.appName("example").getOrCreate()

// 创建一个Dataset
val data: Seq[Person] = Seq(Person("Alice", 25), Person("Bob", 30), Person("Charlie", 22))
val ds: Dataset[Person] = spark.createDataset(data)

// 使用Dataset的API进行操作
val filteredDS = ds.filter(person => person.age > 25)
filteredDS.show()

在这个例子中,我们定义了一个名为Person的样例类,表示数据的结构。然后,我们使用该样例类创建了一个Dataset,并使用filter方法对数据进行过滤。

以上是在Scala中深入了解SparkSession及其相关数据结构的一些基本概念。使用这些概念,你可以更好地理解和利用Spark进行大规模数据处理。在实际工作中,可以通过组合使用DataFrame、Row和Dataset等类来完成各种复杂的数据处理任务。

RDD

在Apache Spark中,RDD(Resilient Distributed Dataset)是一个基本的抽象数据类型,代表一个可以被分区的、可并行操作的元素集合。RDD是Spark的核心数据结构之一,它允许开发者在分布式计算环境中进行弹性、容错的数据处理。

以下是一个简单的Scala例子,展示如何创建和操作RDD:

import org.apache.spark.{SparkConf, SparkContext}

// 创建Spark配置和Spark上下文
val conf = new SparkConf().setAppName("RDDExample").setMaster("local[*]")
val sc = new SparkContext(conf)

// 创建一个RDD,从集合中并行化生成
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)

// 对RDD进行转换操作,例如将每个元素乘以2
val transformedRDD = rdd.map(x => x * 2)

// 执行一个行动操作,例如将RDD的元素打印出来
transformedRDD.collect().foreach(println)

// 关闭Spark上下文
sc.stop()

在这个例子中,首先创建了一个Spark配置(SparkConf)和一个Spark上下文(SparkContext)。接着,通过parallelize方法将一个本地集合转换成RDD,这个RDD包含了集合中的元素,并分布在Spark集群的各个节点上。然后,通过map转换操作对RDD中的每个元素进行乘以2的操作。最后,通过collect行动操作将RDD的元素收集到驱动程序中,并通过foreach打印出来。

RDD是弹性的,因为它可以自动从节点故障中进行恢复。当某个节点上的数据丢失时,Spark可以根据RDD的依赖关系重新计算数据,确保计算的鲁棒性。RDD还是不可变的,一旦创建就不能被修改。对RDD的所有操作都会生成一个新的RDD,而不会改变原始的RDD。

需要注意的是,从Spark 2.0版本开始,建议使用DataFrame和Dataset这两个高层次的API,它们建立在RDD的基础上,并提供了更丰富的功能和优化。在大多数情况下,推荐使用DataFrame和Dataset而不是直接操作RDD。

RDD和DataFrame的区别

RDD(Resilient Distributed Dataset)和DataFrame是Apache Spark中两个不同的数据抽象,它们在概念上有一些重要的区别。以下是它们的主要区别:

  1. 「抽象层次不同:」
  • 「RDD:」 RDD是Spark最早引入的数据抽象,它是一个分布式的、不可变的数据集合,提供了一种粗粒度的操作接口。RDD需要用户显式地管理数据的分布和序列化,操作过程中需要手动处理分区和依赖关系。
  • 「DataFrame:」 DataFrame是在Spark 1.3版本引入的,它建立在RDD的基础上,提供了更高层次的抽象。DataFrame是一种分布式的数据表,类似于关系型数据库中的表,可以进行更丰富的高层次的数据操作,而无需显式地管理分区和依赖关系。
  1. 「类型信息不同:」
  • 「RDD:」 RDD是弱类型(untyped)的,即在编译时不检查数据类型,用户需要在运行时处理数据的类型。
  • 「DataFrame:」 DataFrame是强类型(typed)的,它使用Spark的Catalyst引擎,在编译时进行类型检查,提供了更好的类型安全性。
  1. 「优化和性能:」
  • 「RDD:」 RDD的操作是通过用户定义的函数进行的,Spark在运行时难以对这些函数进行优化,因此性能相对较低。
  • 「DataFrame:」 DataFrame通过Spark的Catalyst引擎进行逻辑和物理优化,可以更好地利用Spark的执行计划优化,从而提高性能。
  1. 「API和语法:」
  • 「RDD:」 RDD的API相对较低级,需要用户自行处理分布式计算的细节。
  • 「DataFrame:」 DataFrame提供了更高级、更直观的API,类似于SQL查询,更易于使用和学习。
  1. 「支持的语言:」
  • 「RDD:」 支持Scala、Java、Python等多种编程语言。
  • 「DataFrame:」 支持Scala、Java、Python、R等多种编程语言,同时提供了更多的语言绑定。

在实际使用中,推荐使用DataFrame和Dataset API,因为它们提供了更高层次的抽象,更好的性能优化,以及更丰富的功能。RDD仍然存在于Spark中,但主要用于一些特殊场景或与旧版本兼容。

SparkSession中的sql方法

什么是sql方法

在Spark中,sql方法是SparkSession提供的一种用于执行SQL查询的便捷方法。SparkSession是Spark 2.0版本引入的,用于替代之前版本中的SQLContextHiveContext,它提供了统一的入口点,使得在Spark应用程序中使用SQL更为方便。

sql方法允许用户在Spark中以类似于关系型数据库的方式执行SQL查询,无论数据源是什么,可以是DataFrame、表、Parquet文件、JSON数据等。该方法接受一个SQL查询字符串作为参数,并返回一个DataFrame作为查询结果。

有哪些调用方式

在Scala中,SparkSessionsql方法有多个重载形式,允许用户以不同的方式执行SQL查询。以下是其中一些重载方法及对应的用法:

sql(sqlText: String): DataFrame

这是最基本的重载形式,接受一个SQL查询字符串,并返回一个DataFrame。

val resultDataFrame = sparkSession.sql("SELECT * FROM myTable")

sql(sqlText: String, params: Map[String, String]): DataFrame

允许通过参数化的方式传递查询参数,用于防止SQL注入攻击。


val params = Map("tableName" -> "myTable", "columnName" -> "age")
val resultDataFrame = sparkSession.sql("SELECT * FROM ${tableName} WHERE ${columnName} > 21", params)

sql(sqlText: String, params: Seq[NamedExpression]): DataFrame

类似于上一个例子,但是使用了Spark的表达式(Expression)作为参数。

import org.apache.spark.sql.catalyst.expressions.NamedExpression

val params: Seq[NamedExpression] = Seq($"tableName", $"columnName")
val resultDataFrame = sparkSession.sql("SELECT * FROM ${tableName} WHERE ${columnName} > 21", params: _*)

sql(sqlText: String, args: Any*): DataFrame

允许通过可变参数列表传递查询参数,参数的顺序与SQL查询中的?占位符对应。


val resultDataFrame = sparkSession.sql("SELECT * FROM myTable WHERE age > ?", 21)

这些重载方法提供了不同的方式来执行SQL查询,以适应不同的使用场景和需求。选择合适的方法取决于查询的复杂性、参数化需求以及结果DataFrame的期望结构。

SparkSession是如何执行SQL语句的

当调用sql方法时,SparkSession会将SQL查询字符串解析为逻辑查询计划(Logical Plan)。这个逻辑计划描述了查询的逻辑结构,但尚未指定具体的物理执行计划。

接下来,SparkSession会使用Catalyst优化器对逻辑查询计划进行优化,包括谓词下推、列剪枝等优化。经过优化后的逻辑计划将被转换为物理执行计划(Physical Plan),这个计划描述了如何在Spark集群上执行查询。

最后,SparkSession会将物理执行计划转换为一系列的RDD转换操作,这些操作会被发送到Spark集群上的各个节点进行执行。具体的执行计划可以通过explain方法查看,以便更好地理解Spark是如何执行查询的。

执行SQL语句的流程

执行SQL语句的流程可以总结为以下几个步骤:

  1. 「解析:」 将SQL查询字符串解析为逻辑查询计划。
  2. 「优化:」 使用Catalyst优化器对逻辑查询计划进行优化,生成物理执行计划。
  3. 「执行计划转换:」 将物理执行计划转换为一系列的RDD转换操作。
  4. 「分布式执行:」 将转换后的执行计划分发到Spark集群上的各个节点进行执行。
  5. 「结果返回:」 将执行的结果汇总返回给用户。

执行SQL语句的原理

执行SQL语句的原理基于Spark的分布式计算引擎和优化器。Spark使用Catalyst作为查询优化器,它能够根据查询的逻辑结构和数据分布情况生成高效的物理执行计划。

物理执行计划会被转换为一系列的Spark RDD(Resilient Distributed Datasets)操作,这些操作通过Spark集群上的任务并行执行,从而实现了分布式计算。RDD操作的弹性特性确保了在节点故障时的恢复,保证了计算的鲁棒性。

总体而言,Spark的执行SQL语句的原理是将SQL查询经过解析、优化和执行计划生成的过程,最终转化为分布式的、容错的计算任务,以高效地处理大规模的数据。

目录
相关文章
|
7月前
|
SQL 存储 缓存
Paimon与Spark
Paimon与Spark
275 1
|
SQL 机器学习/深度学习 分布式计算
Spark5:SparkSQL
Spark5:SparkSQL
112 0
|
4月前
|
SQL 机器学习/深度学习 分布式计算
|
SQL 分布式计算 HIVE
spark2.2 SparkSession思考与总结1
spark2.2 SparkSession思考与总结1
113 0
spark2.2 SparkSession思考与总结1
|
SQL 存储 缓存
让你真正理解什么是SparkContext, SQLContext 和HiveContext
让你真正理解什么是SparkContext, SQLContext 和HiveContext
315 0
让你真正理解什么是SparkContext, SQLContext 和HiveContext
|
SQL 缓存 分布式计算
spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么
spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么
297 0
|
分布式计算 Spark Java
Spark2.4.0 SparkSession 源码分析
创建SparkContext new SparkSession
3285 0
|
SQL JSON 分布式计算
SparkSQL 是什么_适用场景 | 学习笔记
快速学习 SparkSQL 是什么_适用场景
327 0
|
SQL 缓存 分布式计算
SparkSQL 初识_2
快速学习 SparkSQL 初识_2
135 0
SparkSQL 初识_2
|
缓存 分布式计算 算法
SparkSQL 初识_1
快速学习 SparkSQL 初识_1
145 0
下一篇
DataWorks