Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运行流程解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 1.整体运行流程使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implic

1.整体运行流程

使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

(1)查看teenagers的Schema信息

scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

(2)查看运行流程

scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

QueryExecution中表示的是整体Spark SQL运行流程,从上面的输出结果可以看到,一个SQL语句要执行需要经过下列步骤:

== (1)Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== (2)Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (3)Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (4)Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

//启动动态字节码生成技术(bytecode generation,CG),提升查询效率
Code Generation: true

2.全表查询运行流程

执行语句:

val all= sqlContext.sql("SELECT * FROM people")

运行流程:

scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
//注意*号被解析为unresolvedalias(*)
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
//unresolvedalias(*)被analyzed为Schema中所有的字段
//UnresolvedRelation [people]被analyzed为Subquery people
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true

3. filter查询运行流程

执行语句:

scala> val filterQuery= sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

执行流程:

scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 //多出了Filter,后同
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

4. join查询运行流程

执行语句:

val joinQuery= sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")

查看整体执行流程

scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
//注意Filter
//Join Inner
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age = 'b.age)
  'Join Inner, None
   'UnresolvedRelation [people], Some(a)
   'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...

//查看其Physical Plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]

前面的例子与下面的例子等同,只不过其运行方式略有不同,执行语句:

scala> val innerQuery= sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]

查看整体执行流程:

scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
//注意Join Inner
//另外这里面没有Filter
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Join Inner, Some(('a.age = 'b.age))
  'UnresolvedRelation [people], Some(a)
  'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

//注意Optimized Logical Plan与Analyzed Logical Plan
//并没有进行特别的优化,突出这一点是为了比较后面的子查询
//其Analyzed和Optimized间的区别
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...

//查看其Physical Plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]

5. 子查询运行流程

执行语句:

scala> val subQuery=sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

查看整体执行流程:


scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

//这里需要注意Optimized与Analyzed间的区别
//Filter被进行了优化
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

6. 聚合SQL运行流程

执行语句:

scala> val aggregateQuery=sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]

运行流程查看:


scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
//注意'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
//即group by a.name被 parsed为unresolvedalias('a.name)
== Parsed Logical Plan ==
'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

//查看其Physical Plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]

其它SQL语句,大家可以使用同样的方法查看其执行流程,以掌握Spark SQL背后实现的基本思想。

目录
相关文章
|
7天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
7天前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
19 0
|
29天前
|
SQL 监控 数据库
SQL语句是否都需要解析及其相关技巧和方法
在数据库管理中,SQL(结构化查询语言)语句的使用无处不在,它们负责数据的查询、插入、更新和删除等操作
|
1月前
|
SQL 存储 数据库
SQL语句是否都需要解析及其相关技巧与方法
在数据库管理系统中,SQL(Structured Query Language)语句作为与数据库交互的桥梁,其执行过程往往涉及到一个或多个解析阶段
|
29天前
|
SQL 数据可视化 BI
SQL语句及查询结果解析:技巧与方法
在数据库管理和数据分析中,SQL语句扮演着至关重要的角色
|
1月前
|
SQL 监控 关系型数据库
SQL错误代码1303解析与处理方法
在SQL编程和数据库管理中,遇到错误代码是常有的事,其中错误代码1303在不同数据库系统中可能代表不同的含义
|
1月前
|
SQL 存储 关系型数据库
SQL默认索引是什么:深入解析与技巧
在SQL数据库中,索引是一种用于提高查询性能的重要数据结构
|
1月前
|
SQL 开发框架 .NET
ASP.NET连接SQL数据库:实现过程与关键细节解析an3.021-6232.com
随着互联网技术的快速发展,ASP.NET作为一种广泛使用的服务器端开发技术,其与数据库的交互操作成为了应用开发中的重要环节。本文将详细介绍在ASP.NET中如何连接SQL数据库,包括连接的基本概念、实现步骤、关键代码示例以及常见问题的解决方案。由于篇幅限制,本文不能保证达到完整的2000字,但会确保
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
42 0
|
6月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
245 0