【赵渝强老师】Spark SQL的数据模型:DataFrame

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。

b076.png

通过SQL语句处理数据的前提是需要创建一张表,在Spark SQL中表被定义DataFrame,它由两部分组成:表结构的Schema和数据集合RDD,下图说明了DataFrame的组成。


   

从图中可以看出RDD是一个Java对象的数据集合,而DataFrame增加了Schema的结构信息。因此可以把DataFrame看成是一张表,而DataFrame的表现形式也可以看成是RDD。DataFrame除了具有RDD的特性以外,还提供了更加丰富的算子,并且还提升执行效率、减少数据读取以及执行计划的优化。

   

视频讲解如下:

   

创建DataFrame主要可以通过三种不同的方式来进行创建,这里还是以的员工数据的csv文件为例。文件内容如下:

7369,SMITH,CLERK,7902,1980/12/17,800,0,20 
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30 
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30 
7566,JONES,MANAGER,7839,1981/4/2,2975,0,20 
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30 
7698,BLAKE,MANAGER,7839,1981/5/1,2850,0,30 
7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10 
7788,SCOTT,ANALYST,7566,1987/4/19,3000,0,20 
7839,KING,PRESIDENT,-1,1981/11/17,5000,0,10 
7844,TURNER,SALESMAN,7698,1981/c9/8,1500,0,30 
7876,ADAMS,CLERK,7788,1987/5/23,1100,0,20 
7900,JAMES,CLERK,7698,1981/12/3,950,0,30 
7902,FORD,ANALYST,7566,1981/12/3,3000,0,20 
7934,MILLER,CLERK,7782,1982/1/23,1300,0,10


下面分别举例进行说明如何使用spark-shell在Spark SQL中创建DataFrame。

一、使用case class定义DataFrame表结构

   

Scala中提供了一种特殊的类,用case class进行声明,中文也可以称作“样本类”。样本类是一种特殊的类,经过优化以用于模式匹配。样本类类似于常规类,带有一个case 修饰符的类,在构建不可变类时,样本类非常有用,特别是在并发性和数据传输对象的上下文中。在Spark SQL中也可以使用样本类来创建DataFrame的表结构。


(1)定义员工表的结构Schema。

scala> case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)


(2)将员工数据读入RDD。

scala> val rdd1 = sc.textFile("/scott/emp.csv").map(_.split(","))


(3)关联RDD和Schema。

scala> val emp = rdd1.map(x=>Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))


(4)生成DataFrame。

scala> val df = emp.toDF


(5)查询员工表中的数据,结果如下图所示。

scala> df.showopen2023scala> df.show



二、使用StructType定义DataFrame表结构

   

Spark 提供了StructType用于定义结构化的数据类型,类似于关系型数据库中的表结构。通过定义StructType,可以指定数据中每个字段的名称和数据类型,从而更好地组织和处理数据。


(1)导入需要的类型

scala> import org.apache.spark.sql.types._ 
scala> import org.apache.spark.sql.Row


(2)定义表结构。

scala> val myschema = StructType(
       List(StructField("empno",DataTypes.IntegerType),
            StructField("ename",DataTypes.StringType),
            StructField("job",DataTypes.StringType),
            StructField("mgr", DataTypes.IntegerType),
            StructField("hiredate", DataTypes.StringType),
            StructField("sal", DataTypes.IntegerType),
            StructField("comm",DataTypes.IntegerType),
            StructField("deptno", DataTypes.IntegerType)))


(3)将数据读入RDD。

scala> val rdd2 = sc.textFile("/scott/emp.csv").map(_.split(","))


(4)将RDD中的数据映射成Row对象。

scala> val rowRDD = rdd2.map(x=>Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))


(5)创建DataFrame。

scala> val df = spark.createDataFrame(rowRDD,myschema)


三、直接加载带格式的数据文件

   

Spark提供了结构化的示例数据文件,利用这些结构化的数据文件可以直接创建DataFrame,这些文件位于Spark安装目录下的/examples/src/main/resources中。下面是提供的people.json文件中的数据内容。


{"name":"Michael"} 
{"name":"Andy", "age":30} 
{"name":"Justin", "age":19}

   

由于数据源文件本身就具有格式,因此可以直接创建DataFrame。下面是具体的步骤。

(1)为了便于操作,将people.json文件复制到用户的HOME目录下

cp people.json /root


(2)直接创建DataFrame。这里加载的文件在本地目录,也可以是HDFS。

scala> val people = spark.read.json("file:///root/people.json")


(3)执行一个简单的查询,如下图所示。

scala> people.show




相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
7天前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
7天前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
42 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
76 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
34 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
49 0
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
4月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
110 13
|
4月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
4月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
60 6