Spark SQL实战(04)-API编程之DataFrame

简介: 1 SparkSessionSpark Core: SparkContextSpark SQL: 难道就没有SparkContext?

1 SparkSession

Spark Core: SparkContext


Spark SQL: 难道就没有SparkContext?


2.x之后统一的


package com.javaedge.bigdata.chapter04

import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSessionApp {

 def main(args: Array[String]): Unit = {

   // DF/DS编程的入口点

   val spark: SparkSession = SparkSession.builder()

     .master("local").getOrCreate()

   // 读取文件的API

   val df: DataFrame = spark.read.text("/Users/javaedge/Downloads/sparksql-train/data/input.txt")

   // TODO 业务逻辑处理,通过DF/DS提供的API完成业务

   df.printSchema()

   df.show() // 展示出来  只有一个字段,string类型的value

   spark.stop()

 }

}


1.x的Spark SQL编程入口点


SQLContext

HiveContext

Spark SQL中,SQLContext、HiveContext都是用来创建DataFrame和Dataset主要入口点,二者区别如下:


数据源支持:SQLContext支持的数据源包括JSON、Parquet、JDBC等等,而HiveContext除了支持SQLContext的数据源外,还支持Hive的数据源。因此,如果需要访问Hive中的数据,需要使用HiveContext。

元数据管理:SQLContext不支持元数据管理,因此无法在内存中创建表和视图,只能直接读取数据源中的数据。而HiveContext可以在内存中创建表和视图,并将其存储在Hive Metastore中。

SQL语言支持:SQLContext和HiveContext都支持Spark SQL中的基本语法,例如SELECT、FROM、WHERE等等。但HiveContext还支持Hive中的所有SQL语法,例如INSERT、CREATE TABLE AS等等。

数据格式支持:HiveContext支持更多的数据格式,包括ORC、Avro、SequenceFile等等。而SQLContext只支持JSON、Parquet、JDBC等几种常用的数据格式。

如若访问Hive中数据或在内存中创建表和视图,推荐HiveContext;若只需访问常见数据源,使用SQLContext。


Spark 2.x后,HiveContext已被SparkSession替代,因此推荐SparkSession创建DataFrame、Dataset。


package com.javaedge.bigdata.chapter04

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.{DataFrame, SQLContext}

/**

* 了解即可,已过时

*/

object SQLContextApp {

 def main(args: Array[String]): Unit = {

   val sparkConf: SparkConf = new SparkConf().setAppName("SQLContextApp").setMaster("local")

   // 此处一定要把SparkConf传进来

   val sc: SparkContext = new SparkContext(sparkConf)

   val sqlContext: SQLContext = new SQLContext(sc)

   val df: DataFrame = sqlContext.read.text("/Users/javaedge/Downloads/sparksql-train/data/input.txt")

   df.show()

   sc.stop()

 }

}



output:

+-------------+

|        value|

+-------------+

|     pk,pk,pk|

|jepson,jepson|

|     xingxing|

+-------------+


2 DataFrame

最早在R语言数据分析包中提出,表示一种类似表格的数据结构,其中行和列都可以有命名。


Spark的DataFrame是基于RDD(弹性分布式数据集)的一种高级抽象,类似关系型数据库的表格。Spark DataFrame可看作带有模式(Schema)的RDD,而Schema则是由结构化数据类型(如字符串、整型、浮点型等)和字段名组成。


2.1 命名变迁

Spark 1.0的Spark SQL的数据结构称为SchemaRDD,具有结构化模式(schema)的分布式数据集合。


Spark 1.3版本开始,SchemaRDD重命名为DataFrame,以更好反映其API和功能实质。因此,DataFrame曾被称为SchemaRDD,但现已不再使用这名称。


2.2 Spark SQL的DataFrame优点

可通过SQL语句、API等多种方式进行查询和操作,还支持内置函数、用户自定义函数等功能

支持优化器和执行引擎,可自动对查询计划进行优化,提高查询效率

因此,DataFrame已成Spark SQL核心组件,广泛应用于数据分析、数据挖掘。


3 数据分析选型:PySpark V.S R 语言

数据规模:如果需要处理大型数据集,则使用PySpark更为合适,因为它可以在分布式计算集群上运行,并且能够处理较大规模的数据。而R语言则可能会受限于单机内存和计算能力。

熟练程度:如果你或你的团队已经很熟悉Python,那么使用PySpark也许更好一些,因为你们不需要再去学习新的编程语言。相反,如果已经对R语言很熟悉,那么继续使用R语言也许更为方便。

生态系统:Spark生态系统提供了许多额外的库和工具,例如Spark Streaming和GraphX等,这些库和工具可以与PySpark无缝集成。而R语言的生态系统也有一些类似的库和工具,但相对来说可选择性就更少一些。

总之,选择使用哪种工具进行数据分析应该基于具体情况进行考虑。如果需要处理大规模数据集,并需要与Spark生态系统集成,那么PySpark可能更适合;如果更加熟悉R语言,或者数据量较小,那么使用R语言也可以做到高效的数据分析。


4 深入理解

Dataset是一个分布式数据集,提供RDD强类型和使用强大的lambda函数的能力,并结合了Spark SQL优化的执行引擎。Dataset可以从JVM对象构建而成,并通过函数式转换(如map、flatMap、filter等)进行操作。Scala和Java都支持Dataset API,但Python没有对Dataset API提供支持。由于Python是一种动态语言,许多Dataset API的优点已经自然地可用,例如可以通过名称访问行的字段。R语言也有类似的特点。


DataFrame,具有命名列的Dataset,类似:


关系数据库中的表

Python中的数据框

但内部有更多优化功能。DataFrame可从各种数据源构建,如:


结构化数据文件

Hive表

外部数据库

现有RDD

DataFrame API 在 Scala、Java、Python 和 R 都可用。在Scala和Java中,DataFrame由一组Rows组成的Dataset表示:


Scala API中,DataFrame只是Dataset[Row]的类型别名

Java API中,用户需要使用Dataset表示DataFrame

通常将Scala/Java中的Dataset of Rows称为DataFrame。


5 实战

People.json


{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

package com.javaedge.bigdata.chapter04

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameAPIApp {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder()

     .master("local").appName("DataFrameAPIApp")

     .getOrCreate()

   import spark.implicits._

   val people: DataFrame = spark.read.json(

     "/Users/javaedge/Downloads/sparksql-train/data/people.json")

   // 查看DF的内部结构:列名、列的数据类型、是否可以为空

   people.printSchema()

   // 展示出DF内部的数据

   people.show()

 }

}

output:

root

 |-- age: long (nullable = true)

 |-- name: string (nullable = true)

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+



// DF里面有两列,只要name列 ==> select name from people

// 两个 API 一样的,只是参数不同,使用稍有不同

people.select("name").show()

people.select($"name").show()


output:


+-------+

|   name|

+-------+

|Michael|

|   Andy|

| Justin|

+-------+


// select * from people where age > 21

people.filter($"age" > 21).show()

people.filter("age > 21").show()



output:

+---+----+

|age|name|

+---+----+

| 30|Andy|

+---+----+


// select age, count(1) from people group by age

people.groupBy("age").count().show()


output:

+----+-----+

| age|count|

+----+-----+

|  19|    1|

|null|    1|

|  30|    1|

+----+-----+


createOrReplaceTempView

若现在,我就想完全使用 SQL 查询了,怎么实现 DF 到表的转换呢?


Spark SQL用来将一个 DataFrame 注册成一个临时表(Temporary Table)的方法。之后可使用 Spark SQL 语法及已注册的表名对 DataFrame 进行查询和操作。


允许为 DataFrame 指定一个名称,并将其保存为一个临时表。该表只存在于当前 SparkSession 的上下文,不会在元数据存储中注册表,也不会在磁盘创建任何文件。因此,临时表在SparkSession终止后就会被删。


一旦临时表被注册,就可使用 SQL 或 DSL 对其查询。如:


people.createOrReplaceTempView("people")

spark.sql("select name from people where age > 21").show()


大文件处理


val zips: DataFrame = spark.read.json("/Users/javaedge/Downloads/sparksql-train/data/zips.json")

zips.printSchema()

zips.show(5)

loc信息没用展示全,超过一定长度就使用…来展示,默认只显示前20条:show() ==> show(20) ==> show(numRows, truncate = true)


4.png


不想被截断就这样:

3.png



zips.head(3).foreach(println)

zips.first()

zips.take(5)


Output:


2.png


head(n: Int)

Spark的DataFrame API中的一个方法,可以返回一个包含前n行数据的数组。这个方法通常用于快速检查一个DataFrame的前几行数据,以了解数据集的大致结构和内容。


先对DataFrame使用.limit(n)方法,限制返回行数前n行

然后使用queryExecution方法生成一个Spark SQL查询计划

最后使用collectFromPlan方法收集数据并返回一个包含前n行数据的数组

该 API 可能导致数据集的全部数据被加载到内存,因此在处理大型数据集时应该谨慎使用。若只想查看数据集结构和内容,使用:


.show()

或.take()

获取前几行数据,而非.head()。


// 过滤出大于40000,字段重新命名

zips.filter(zips.col("pop") > 40000)

 .withColumnRenamed("_id", "new_id")

 .show(5, truncate = false)

output:

+------+----------+-----------------------+-----+-----+

|new_id|city      |loc                    |pop  |state|

+------+----------+-----------------------+-----+-----+

|01040 |HOLYOKE   |[-72.626193, 42.202007]|43704|MA   |

|01085 |MONTGOMERY|[-72.754318, 42.129484]|40117|MA   |

|01201 |PITTSFIELD|[-73.247088, 42.453086]|50655|MA   |

|01420 |FITCHBURG |[-71.803133, 42.579563]|41194|MA   |

|01701 |FRAMINGHAM|[-71.425486, 42.300665]|65046|MA   |

+------+----------+-----------------------+-----+-----+



import org.apache.spark.sql.functions._

// 统计加州pop最多的10个城市名称和ID  desc是一个内置函数

zips.select("_id", "city", "pop", "state")

 .filter(zips.col("state") === "CA")

 .orderBy(desc("pop"))

 .show(5, truncate = false)

output:

+-----+------------+-----+-----+

|_id  |city        |pop  |state|

+-----+------------+-----+-----+

|90201|BELL GARDENS|99568|CA   |

|90011|LOS ANGELES |96074|CA   |

|90650|NORWALK     |94188|CA   |

|91331|ARLETA      |88114|CA   |

|90280|SOUTH GATE  |87026|CA   |

+-----+------------+-----+-----+



可惜啊,我不会写代码,可以使用 MySQL 语法吗?


zips.createOrReplaceTempView("zips")

spark.sql("select _id,city,pop,state" +

 "from zips where state='CA'" +

 "order by pop desc" +

 "limit 10").show()


import spark.implicits._ 作用

在Scala中使用Apache Spark进行数据分析时经常用到的,它的作用是将隐式转换函数导入当前作用域中。这些隐式转换函数包含了许多DataFrame和Dataset的转换方法,例如将RDD转换为DataFrame或将元组转换为Dataset等。


具体来说,这行代码使用了SparkSession对象中的implicits属性,该属性返回了一个类型为org.apache.spark.sql.SQLImplicits的实例。通过调用该实例的方法,可以将各种Scala数据类型(如case class、元组等)与Spark SQL中的数据类型(如Row、DataFrame、Dataset等)之间进行转换,从而方便地进行数据操作和查询。


在使用许多Spark SQL API的时候,往往需要使用这行代码将隐式转换函数导入当前上下文,以获得更加简洁和易于理解的代码编写方式。


如果不导入会咋样

如果不导入spark.implicits._会导致编译错误或者运行时异常。因为在进行DataFrame和Dataset的操作时,需要使用到一些隐式转换函数。如果没有导入spark.implicits._,则这些隐式转换函数无法被自动引入当前上下文,就需要手动地导入这些函数,这样会使编码变得比较麻烦。


例如,在进行RDD和DataFrame之间的转换时,如果不导入spark.implicits._,则需要手动导入org.apache.spark.sql.Row以及org.apache.spark.sql.functions._等包,并通过调用toDF()方法将RDD转换为DataFrame。而有了导入spark.implicits._后,只需要直接调用RDD对象的toDF()方法即可完成转换。


因此,为了简化编码,通常会在Scala中使用Spark SQL时导入spark.implicits._,从而获得更加简洁易读的代码。


案例

people.select($"name").show()


如果不导入 spark.implicits._,则可以手动创建一个 Column 对象来进行筛选操作。例如,可以使用 col 函数来创建一个 Column 对象,然后在 select 方法中使用该列:


import org.apache.spark.sql.functions.col

val selected = people.select(col("name"))

selected.show()


这样就可以实现与 people.select($"name").show() 相同的效果,但需要手动创建 Column 对象。显然,在编写复杂的数据操作时,手动创建 Column 对象可能会变得非常繁琐和困难,因此通常情况下我们会选择使用隐式转换函数,从而更加方便地使用DataFrame的API。

目录
相关文章
|
1月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
1月前
|
缓存 自然语言处理 API
阿里巴巴国际站关键字搜索 API 实战:3 步搞定多语言适配 + 限流破局,询盘量提升 40%
跨境电商API开发常陷合规、多语言、限流等坑。本文详解从国际合规(GDPR/CCPA)到参数优化、数据结构化及区域化搜索的全链路方案,附Python代码模板与缓存重试架构,助力提升调用成功率至99%+,精准询盘增长42%。
|
2月前
|
数据采集 缓存 API
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
本文详解小红书笔记详情API的开发对接、实战场景与收益模式,涵盖注册避坑、签名生成、数据解析全流程,并分享品牌营销、内容创作、SAAS工具等落地应用,助力开发者高效掘金“种草经济”。
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
|
1月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
1月前
|
开发者 API 机器学习/深度学习
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
本文详解淘宝、1688、义乌购三大平台图片搜索接口的核心特点、调用流程与实战代码。涵盖跨平台对比、参数配置、响应解析及避坑指南,支持URL/Base64上传,返回商品ID、价格、销量等关键信息,助力开发者快速实现商品识别与比价功能。
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
|
2月前
|
人工智能 运维 监控
阿里云 API 聚合实战:破解接口碎片化难题,3 类场景方案让业务响应提速 60%
API聚合破解接口碎片化困局,助力开发者降本增效。通过统一中间层整合微服务、第三方接口与AI模型,实现调用次数减少60%、响应提速70%。阿里云实测:APISIX+函数计算+ARMS监控组合,支撑百万级并发,故障定位效率提升90%。
250 0
|
2月前
|
JSON API 调度
Midjourney 技术拆解与阿里云开发者实战指南:从扩散模型到 API 批量生成
Midjourney深度解析:基于优化Stable Diffusion,实现文本到图像高效生成。涵盖技术架构、扩散模型原理、API调用、批量生成系统及阿里云生态协同,助力开发者快速落地AIGC图像创作。
463 0
|
分布式计算 Java Spark
Spark学习之编程进阶——累加器与广播(5)
Spark学习之编程进阶——累加器与广播(5) 1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器对信息进行聚合,而广播变量用来高效分发较大的对象。 2. 共享变量是一种可以在Spark任务中使用的特殊类型的变量。 3. 累加器的用法: 通过在驱动器中调用SparkContex
1942 0
|
5月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
298 0
|
8月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
390 79