Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运行流程解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 1.整体运行流程使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implic

1.整体运行流程

使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
AI 代码解读

(1)查看teenagers的Schema信息

scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
AI 代码解读

(2)查看运行流程

scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
AI 代码解读

QueryExecution中表示的是整体Spark SQL运行流程,从上面的输出结果可以看到,一个SQL语句要执行需要经过下列步骤:

== (1)Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== (2)Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (3)Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (4)Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

//启动动态字节码生成技术(bytecode generation,CG),提升查询效率
Code Generation: true
AI 代码解读

2.全表查询运行流程

执行语句:

val all= sqlContext.sql("SELECT * FROM people")
AI 代码解读

运行流程:

scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
//注意*号被解析为unresolvedalias(*)
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
//unresolvedalias(*)被analyzed为Schema中所有的字段
//UnresolvedRelation [people]被analyzed为Subquery people
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true
AI 代码解读

3. filter查询运行流程

执行语句:

scala> val filterQuery= sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
AI 代码解读

执行流程:

scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 //多出了Filter,后同
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
AI 代码解读

4. join查询运行流程

执行语句:

val joinQuery= sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")
AI 代码解读

查看整体执行流程

scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
//注意Filter
//Join Inner
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age = 'b.age)
  'Join Inner, None
   'UnresolvedRelation [people], Some(a)
   'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...

//查看其Physical Plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]
AI 代码解读

前面的例子与下面的例子等同,只不过其运行方式略有不同,执行语句:

scala> val innerQuery= sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]
AI 代码解读

查看整体执行流程:

scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
//注意Join Inner
//另外这里面没有Filter
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Join Inner, Some(('a.age = 'b.age))
  'UnresolvedRelation [people], Some(a)
  'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

//注意Optimized Logical Plan与Analyzed Logical Plan
//并没有进行特别的优化,突出这一点是为了比较后面的子查询
//其Analyzed和Optimized间的区别
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...

//查看其Physical Plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]
AI 代码解读

5. 子查询运行流程

执行语句:

scala> val subQuery=sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
AI 代码解读

查看整体执行流程:


scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

//这里需要注意Optimized与Analyzed间的区别
//Filter被进行了优化
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

AI 代码解读

6. 聚合SQL运行流程

执行语句:

scala> val aggregateQuery=sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]
AI 代码解读

运行流程查看:


scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
//注意'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
//即group by a.name被 parsed为unresolvedalias('a.name)
== Parsed Logical Plan ==
'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

//查看其Physical Plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
AI 代码解读

其它SQL语句,大家可以使用同样的方法查看其执行流程,以掌握Spark SQL背后实现的基本思想。

相关文章
鸿蒙HarmonyOS应用开发 | HarmonyOS Next-从应用开发到上架全流程解析
HarmonyOS Next是华为推出的最新版本鸿蒙操作系统,强调多设备协同和分布式技术,提供丰富的开发工具和API接口。本文详细解析了从应用开发到上架的全流程,包括环境搭建、应用设计与开发、多设备适配、测试调试、应用上架及推广等环节,并介绍了鸿蒙原生应用开发者激励计划,帮助开发者更好地融入鸿蒙生态。通过DevEco Studio集成开发环境和华为提供的多种支持工具,开发者可以轻松创建并发布高质量的鸿蒙应用,享受技术和市场推广的双重支持。
630 11
MySQL原理简介—1.SQL的执行流程
本文介绍了MySQL驱动、数据库连接池及SQL执行流程的关键组件和作用。主要内容包括:MySQL驱动用于建立Java系统与数据库的网络连接;数据库连接池提高多线程并发访问效率;MySQL中的连接池维护多个数据库连接并进行权限验证;网络连接由线程处理,监听请求并读取数据;SQL接口负责执行SQL语句;查询解析器将SQL语句解析为可执行逻辑;查询优化器选择最优查询路径;存储引擎接口负责实际的数据操作;执行器根据优化后的执行计划调用存储引擎接口完成SQL语句的执行。整个流程确保了高效、安全地处理SQL请求。
285 77
Android调试终极指南:ADB安装+多设备连接+ANR日志抓取全流程解析,覆盖环境变量配置/多设备调试/ANR日志分析全流程,附Win/Mac/Linux三平台解决方案
ADB(Android Debug Bridge)是安卓开发中的重要工具,用于连接电脑与安卓设备,实现文件传输、应用管理、日志抓取等功能。本文介绍了 ADB 的基本概念、安装配置及常用命令。包括:1) 基本命令如 `adb version` 和 `adb devices`;2) 权限操作如 `adb root` 和 `adb shell`;3) APK 操作如安装、卸载应用;4) 文件传输如 `adb push` 和 `adb pull`;5) 日志记录如 `adb logcat`;6) 系统信息获取如屏幕截图和录屏。通过这些功能,用户可高效调试和管理安卓设备。
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
212 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
137 12
新手上云教程参考:阿里云服务器租用、域名注册、备案及域名解析流程图文教程
对于想要在阿里云上搭建网站或应用的用户来说,购买阿里云服务器和注册域名,绑定以及备案的流程至关重要。本文将以图文形式为您介绍阿里云服务器购买、域名注册、备案及绑定的全流程,以供参考,帮助用户轻松上手。
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
336 0
阿里云服务器租用、注册域名、备案及域名解析完整流程参考(图文教程)
对于很多初次建站的用户来说,选购云服务器和注册应及备案和域名解析步骤必须了解的,目前轻量云服务器2核2G68元一年,2核4G4M服务器298元一年,域名注册方面,阿里云推出域名1元购买活动,新用户注册com和cn域名2年首年仅需0元,xyz和top等域名首年仅需1元。对于建站的用户来说,购买完云服务器并注册好域名之后,下一步还需要操作备案和域名绑定。本文为大家展示阿里云服务器的购买流程,域名注册、绑定以及备案的完整流程,全文以图文教程形式为大家展示具体细节及注意事项,以供新手用户参考。
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
119 12

热门文章

最新文章

推荐镜像

更多