大数据Spark Dataset

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark Dataset

1 Dataset 是什么

Dataset是在Spark1.6中添加的新的接口,是DataFrame API的一个扩展,是Spark最新的数据抽象,结合了RDD和DataFrame的优点。


1.与RDD相比:保存了更多的描述信息,概念上等同于关系型数据库中的二维表;

2.与DataFrame相比:保存了类型信息,是强类型的,提供了编译时类型检查,调用Dataset的方法先会生成逻辑计划,然后被Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行;

Dataset是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换。

b8ddb036ea9843f4b3136678e8645102.png

从Spark 2.0开始,DataFrame与Dataset合并,每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]。

bbcaf1ab9ba343f7ae692ac432821f62.png


Dataset API是DataFrames的扩展,它提供了一种类型安全的,面向对象的编程接口。它是一个强类型,不可变的对象集合,映射到关系模式。在数据集的核心 API是一个称为编码器的新概念,它负责在JVM对象和表格表示之间进行转换。表格表示使用Spark内部Tungsten二进制格式存储,允许对序列化数据进行操作并提高内存利用率。Spark 1.6支持自动生成各种类型的编码器,包括基本类型(例如String,Integer,Long),Scala案例类和Java Bean。针对Dataset数据结构来说,可以简单的从如下四个要点记忆与理解:

9322ebcfacca405990a5d5d2f3b06193.png

Spark 框架从最初的数据结构RDD、到SparkSQL中针对结构化数据封装的数据结构DataFrame,最终使用Dataset数据集进行封装,发展流程如下。

cc24222799734fc49d5289a0eb4fe686.png

所以在实际项目中建议使用Dataset进行数据封装,数据分析性能和数据存储更加好。


2 对比DataFrame

Spark在Spark 1.3版本中引入了Dataframe,DataFrame是组织到命名列中的分布式数据集合,

但是有如下几点限制:


1.编译时类型安全:

Dataframe API不支持编译时安全性,这限制了在结构不知道时操纵数据。

以下示例在编译期间有效。但是,执行此代码时将出现运行时异常。

c45397e280de4cbf8d8fecd90b35a43b.png

2.无法对域对象(丢失域对象)进行操作:

将域对象转换为DataFrame后,无法从中重新生成它;

下面的示例中,一旦我们从personRDD创建personDF,将不会恢复Person类的原始RDD

(RDD [Person]);

78c9586a4cc94730a3c3199afde8abf7.png

基于上述的两点,从Spark 1.6开始出现Dataset,至Spark 2.0中将DataFrame与Dataset合并,其中DataFrame为Dataset特殊类型,类型为Row。

fdade644c44b4d8a951ec0a3f12b3e15.png

针对RDD、DataFrame与Dataset三者编程比较来说,Dataset API无论语法错误和分析错误在编译时都能发现,然而RDD和DataFrame有的需要在运行时才能发现。a4fd198c3d2a464894cef817d3272d13.png

此外RDD与Dataset相比较而言,由于Dataset数据使用特殊编码,所以在存储数据时更加节省内存。

71ffd1b8d0d3488e82f520044bee6a4d.png

3 RDD、DF与DS转换

实际项目开发中,常常需要对RDD、DataFrame及Dataset之间相互转换,其中要点就是Schema约束结构信息。


1)、RDD转换DataFrame或者Dataset

转换DataFrame时,定义Schema信息,两种方式

转换为Dataset时,不仅需要Schema信息,还需要RDD数据类型为CaseClass类型

2)、Dataset或DataFrame转换RDD

由于Dataset或DataFrame底层就是RDD,所以直接调用rdd函数即可转换

dataframe.rdd 或者dataset.rdd

3)、DataFrame与Dataset之间转换

由于DataFrame为Dataset特例,所以Dataset直接调用toDF函数转换为DataFrame

当将DataFrame转换为Dataset时,使用函数as[Type],指定CaseClass类型即可。

59b271689770407680cb9c9accee2bac.png

范例演示:分别读取people.txt文件数据封装到RDD、DataFrame及Dataset,查看区别及相互转换。

1.第一步、加载文件数据,封装不同数据结构6062e7b634c14373b751539450199dd0.png

Dataset转换为RDD和DataFrame:c80a3c45ff0e4e028c57db0c3a17bc56.png

2.第二步、加载JSON数据,将DataFrame转换为Dataset49285d966703471086799a9103236218.png

完整演示代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * 官方案例演示Dataset是什么:
 * http://spark.apache.org/docs/2.4.5/sql-getting-started.html#creating-datasets
 */
object SparkDatasetExample {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession
      .builder() // 使用建造者模式构建对象
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // 演示案例一:加载文本数据,分别封装到RDD、DataFrame和Dataset中
    // 其一、SparkContext加载,封装RDD
    val peoplesRDD: RDD[String] = spark.sparkContext
      .textFile("datas/resources/people.txt")
    // 其二、调用text函数,封装DataFrame
    val peoplesDF: DataFrame = spark.read.text("datas/resources/people.txt")
    // 其三、调用textFile函数,封装Dataset
    val peoplesDS: Dataset[String] = spark.read.textFile("datas/resources/people.txt")
    // DataFrame转换为RDD
    peoplesDF.rdd
    // Dataset转换为RDD或者DataFrame
    peoplesDS.toDF()
    peoplesDS.rdd
    // 演示案例二:加载Json格式数据,DataFrame转换为Dataset
    val jsonDF: DataFrame = spark.read.json("datas/resources/employees.json")
    jsonDF.printSchema()
    val jsonDS: Dataset[Employee] = jsonDF.as[Employee]
    jsonDS.show(10)
    // 应用结束,关闭资源
    spark.stop()
  }
}

4 面试题:如何理解RDD、DataFrame和Dataset

SparkSQL中常见面试题:如何理解Spark中三种数据结构RDD、DataFrame和Dataset关系?


1.第一、数据结构RDD:

RDD(Resilient Distributed Datasets)叫做弹性分布式数据集,是Spark中最基本的数据

抽象,源码中是一个抽象类,代表一个不可变、可分区、里面的元素可并行计算的集合。

编译时类型安全,但是无论是集群间的通信,还是IO操作都需要对对象的结构和数据进行

序列化和反序列化,还存在较大的GC的性能开销,会频繁的创建和销毁对象。

2.第二、数据结构DataFrame:

与RDD类似,DataFrame是一个分布式数据容器,不过它更像数据库中的二维表格,除了

数据之外,还记录这数据的结构信息(即schema)。

DataFrame也是懒执行的,性能上要比RDD高(主要因为执行计划得到了优化)。

由于DataFrame每一行的数据结构一样,且存在schema中,Spark通过schema就能读懂

数据,因此在通信和IO时只需要序列化和反序列化数据,而结构部分不用。

Spark能够以二进制的形式序列化数据到JVM堆以外(off-heap:非堆)的内存,这些内

存直接受操作系统管理,也就不再受JVM的限制和GC的困扰了。但是DataFrame不是类

型安全的。

3.第三、数据结构Dataset:

Dataset是DataFrame API的一个扩展,是Spark最新的数据抽象,结合了RDD和DataFrame

的优点。

DataFrame=Dataset[Row](Row表示表结构信息的类型),DataFrame只知道字段,但

是不知道字段类型,而Dataset是强类型的,不仅仅知道字段,而且知道字段类型。

样例类CaseClass被用来在Dataset中定义数据的结构信息,样例类中的每个属性名称直接

对应到Dataset中的字段名称。

Dataset具有类型安全检查,也具有DataFrame的查询优化特性,还支持编解码器,当需

要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。

RDD、DataFrame和DataSet之间的转换如下,假设有个样例类: case class Emp(name: String), 相互转换

ef495b24983546a19820a1d9f617e735.png

RDD转换到DataFrame:rdd.toDF(“name”)
RDD转换到Dataset:rdd.map(x => Emp(x)).toDS
DataFrame转换到Dataset:df.as[Emp]
DataFrame转换到RDD:df.rdd
Dataset转换到DataFrame:ds.toDF
Dataset转换到RDD:ds.rdd


RDD与DataFrame或者DataSet进行操作,都需要引入隐式转换import spark.implicits._,其中的spark是SparkSession对象的名称!


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
17天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
52 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
18天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
47 6
|
16天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
62 2
|
17天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
57 1
|
17天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
18天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
4天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
50 7
|
4天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
15 2
|
17天前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
58 1