Spark-SparkSQL深入学习系列一(转自OopsOutOfMemory)

简介:  /** Spark SQL源码分析系列文章*/     自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点:     1、整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里。

 /** Spark SQL源码分析系列文章*/

    自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点:

    1、整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里。这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql。
    2、效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里。

    前一段时间测试过Shark,并且对Spark SQL也进行了一些测试,但是还是忍不住对Spark SQL一探究竟,就从源代码的角度来看一下Spark SQL的核心执行流程吧。

一、引子

先来看一段简单的Spark SQL程序:
[java]  view plain  copy
  1. 1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
  2. 2import sqlContext._  
  3. 3.case class Person(name: String, age: Int)  
  4. 4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))  
  5. 5.people.registerAsTable("people")  
  6. 6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")  
  7. 7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println)  

程序前两句1和2生成SQLContext,导入sqlContext下面的all,也就是运行SparkSQL的上下文环境。
程序3,4两句是加载数据源注册table
第6句是真正的入口,是sql函数,传入一句sql,先会返回一个SchemaRDD。这一步是lazy的,直到第七句的collect这个action执行时,sql才会执行。


 二、SQLCOntext

SQLContext是执行SQL的上下文对象,首先来看一下它Hold的有哪些成员:

Catalog  

 一个存储<tableName,logicalPlan>的map结构,查找关系的目录,注册表,注销表,查询表和逻辑计划关系的类。


SqlParser 

 Parse 传入的sql来对语法分词,构建语法树,返回一个logical plan


Analyzer 

  logical plan的语法分析器


Optimizer 

 logical Plan的优化器


LogicalPlan 

逻辑计划,由catalyst的TreeNode组成,可以看到有3种语法树


SparkPlanner 

包含不同策略的优化策略来优化物理执行计划


QueryExecution 

sql执行的环境上下文


就是这些对象组成了Spark SQL的运行时,看起来很酷,有静态的metadata存储,有分析器、优化器、逻辑计划、物理计划、执行运行时。
那这些对象是怎么相互协作来执行sql语句的呢?

三、Spark SQL执行流程

话不多说,先上图,这个图我用一个在线作图工具process on话的,画的不好,图能达意就行:

      

核心组件都是绿色的方框,每一步流程的结果都是蓝色的框框,调用的方法是橙色的框框。

先概括一下,大致的执行流程是:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD

更具体的执行流程:

     sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD


3.1、Parse SQL

 回到开始的程序,我们调用sql函数,其实是SQLContext里的sql函数它的实现是new一个SchemaRDD,在生成的时候就调用parseSql方法了。
[java]  view plain  copy
  1. /** 
  2. * Executes a SQL query using Spark, returning the result as a SchemaRDD. 
  3. * 
  4. * @group userf 
  5. */  
  6. def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))  
   结果是会生成一个逻辑计划
[java]  view plain  copy
  1.  @transient  
  2. protected[sql] val parser = new catalyst.SqlParser    
  3.   
  4.   
  5. protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)  

 3.2、Analyze to Execution

当我们调用SchemaRDD里面的collect方法时,则会初始化QueryExecution,开始启动执行。
[java]  view plain  copy
  1. override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()  
我们可以很清晰的看到执行步骤:

[java]  view plain  copy
  1. protected abstract class QueryExecution {  
  2.     def logical: LogicalPlan  
  3.   
  4.     lazy val analyzed = analyzer(logical)  //首先分析器会分析逻辑计划  
  5.     lazy val optimizedPlan = optimizer(analyzed) //随后优化器去优化分析后的逻辑计划  
  6.     // TODO: Don't just pick the first one...  
  7.     lazy val sparkPlan = planner(optimizedPlan).next() //根据策略生成plan物理计划  
  8.     // executedPlan should not be used to initialize any SparkPlan. It should be  
  9.     // only used for execution.  
  10.     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最后生成已经准备好的Spark Plan  
  11.   
  12.     /** Internal version of the RDD. Avoids copies and has no schema */  
  13.     lazy val toRdd: RDD[Row] = executedPlan.execute() //最后调用toRDD方法执行任务将结果转换为RDD  
  14.   
  15.     protected def stringOrError[A](f: => A): String =  
  16.       try f.toString catch { case e: Throwable => e.toString }  
  17.   
  18.     def simpleString: String = stringOrError(executedPlan)  
  19.   
  20.     override def toString: String =  
  21.       s"""== Logical Plan ==  
  22.          |${stringOrError(analyzed)}  
  23.          |== Optimized Logical Plan ==  
  24.          |${stringOrError(optimizedPlan)}  
  25.          |== Physical Plan ==  
  26.          |${stringOrError(executedPlan)}  
  27.       """.stripMargin.trim  
  28.   }  

至此整个流程结束。

  四、总结:

  通过分析SQLContext我们知道了Spark SQL都包含了哪些组件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包含Physical Plan),QueryExecution.
  通过调试代码,知道了Spark SQL的执行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD
  
  随后还会对里面的每个组件对象进行研究,看看catalyst究竟做了哪些优化。
目录
相关文章
|
5月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
101 5
|
5月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
74 3
|
5月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
103 0
|
2月前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
105 0
|
4月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
5月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
138 0
|
5月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
39 0
|
5月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
146 0
|
5月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
133 0
|
5月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
121 0