【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)

一、Spark SQL简介

park SQL是spark的一个模块,主要用于进行结构化数据的SQL查询引擎,开发人员能够通过使用SQL语句,实现对结构化数据的处理,开发人员可以不了解Scala语言和Spark常用API,通过spark SQL,可以使用Spark框架提供的强大的数据分析能力。spark SQL前身为Shark。Shark是Spark上的数据仓库,最初设计成与Hive兼容,但是该项目于2014年项目因设计问题中止研发,转向Spark SQL。Spark SQL全面继承了Shark,并进行了优化。 Spark SQL主要提供了以下三个功能:

1)Spark SQL可从各种结构化数据源中读取数据,进行数据分析。

2)Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于在Spark程序内使用SQL语句进行查询。

3)Spark SQL可以无缝地将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询。

二、Spark架构

Spark SQL架构与Hive架构相比,把底层的MapReduce执行引擎更改为Spark,还修改了Catalyst优化器,Spark SQL快速的计算效率得益于Catalyst优化器。从HQL被解析成语法抽象树起,执行计划生成和优化的工作全部交给Spark SQL的Catalyst优化器进行负责和管理。

三、Dataframe

1:Dataframe简介

Spark SQL使用的数据抽象并非是RDD,而是DataFrame。

在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。

DataFrame使Spark具备处理大规模结构化数据的能力。 在Spark中,DataFrame是一种以RDD为基础的分布式数据集。 DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。

DataFrame可以看作是分布式的Row对象的集合,在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,这使得Spark框架可获取更多数据结构信息,从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,最终达到提升计算效率。

2:Dataframe的创建

创建DataFrame的两种基本方式:

已存在的RDD调用toDF()方法转换得到DataFrame。

通过Spark读取数据源直接创建DataFrame。

若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,如下所示。

具体操作示例如下: 首先在linux操作系统hadoop文件夹下存了两个文件,一个是people.json,存的是 json格式数据,内容如下: {"name":"张三"} {"name":"李四", "age":45} {"name":"王五", "age":28} 另一个是people.txt,存的是普通文本格式,内容如下: 张三, 29 李四, 30 王五, 19。 打开spark-shell,如图所示

读取json文件创建DataFrame,命令如下

读取txt文件创建DataFrame,命令如下

已存在的RDD调用toDF()方法转换得到DataFrame,具体操作如下:

(1)从文件系统加载数据创建RDD

val lineRDD = sc.textFile("file:///hadoop/people").Map(_.split(","))

(2)定义类

case class Person(name:String,age:Int)

(3)实例化类并赋值

val personRDD = lineRDD.Map(x => Person(x(0), x(1).toInt))

(4)调用toDF()方法转换得到DataFrame

val personDF = personRDD.toDF()

3:Dataframe常用操作

DataFrame提供了两种语法风格,即DSL风格语法和SQL风格语法。二者在功能上并无区别,仅仅是根据用户习惯,自定义选择操作方式。

DataFrame提供了一个领域特定语言(DSL)以方便操作结构化数据。

在程序中直接使用spark.sql()方式执行SQL查询,结果将作为一个DataFrame返回,使用SQL风格操作的前提是将DataFrame注册成一个临时表。

DSL风格操作DataFrame的常用方法,具体如下表如示。

DSL风格操作Dataframe实例如下

SQL风格操作Dataframe

三、Dataset

Dataset是从Spark1.6 Alpha版本中引入的一个新的数据抽象结构,最终在Spark2.0版本被定义成Spark新特性。Dataset提供了特定域对象中的强类型集合,也就是在RDD的每行数据中添加了类型约束条件,只有约束条件的数据类型才能正常运行。Dataset结合了RDD和DataFrame的优点,并且可以调用封装的方法以并行方式进行转换等操作。

RDD、DataFrame及Dataset的区别

RDD数据的表现形式,即序号(1),此时RDD数据没有数据类型和元数据信息。

DataFrame数据的表现形式,即序号(2),此时DataFrame数据中添加Schema元数据信息(列名和数据类型,如ID:String),DataFrame每行类型固定为Row类型,每列的值无法直接访问,只有通过解析才能获取各个字段的值。

Dataset数据的表现形式,序号(3)和(4),其中序号(3)是在RDD每行数据的基础之上,添加一个数据类型(value:String)作为Schema元数据信息。而序号(4)每行数据添加People强数据类型,在Dataset[Person]中里存放了3个字段和属性,Dataset每行数据类型可自定义,一旦定义后,就具有错误检查机制。

1、通过SparkSession中的createDataset来创建Dataset

scala > val personDs=spark.createDataset(sc.textFile("/spark/person.txt"))
personDs: org.apache.spark.sql.Dataset[String] = [value: string]
scala > personDs.show()
+---------------+
|          value     |
+---------------+
|1  zhangsan 20|
|2  lisi           29|
|3  wangwu  25|
|4  zhaoliu   30 |     
|5  tianqi      35|
|6  jerry        40|
+---------------+

2、DataFrame通过“as[ElementType]”方法转换得到Dataset

scala> spark.read.text("/spark/person.txt").as[String]
res14: org.apache.spark.sql.Dataset[String] = [value: string]
scala> spark.read.text("/spark/person.txt").as[String].toDF()
res15: org.apache.spark.sql.DataFrame = [value: string]

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
2月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
66 0
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
70 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
88 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
58 0
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
134 13
|
5月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
5月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
69 6
下一篇
DataWorks