Spark修炼之道(高级篇)——Spark源码阅读:第十二节 Spark SQL 处理流程分析

简介: 作者:周志湖下面的代码演示了通过Case Class进行表Schema定义的例子:// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a Da

作者:周志湖

下面的代码演示了通过Case Class进行表Schema定义的例子:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

(1)sql方法返回DataFrame

  def sql(sqlText: String): DataFrame = {
    DataFrame(this, parseSql(sqlText))
  }

其中parseSql(sqlText)方法生成相应的LogicalPlan得到,该方法源码如下:

//根据传入的sql语句,生成LogicalPlan
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

ddlParser对象定义如下:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))

(2)然后调用DataFrame的apply方法

private[sql] object DataFrame {
  def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
    new DataFrame(sqlContext, logicalPlan)
  }
}

可以看到,apply方法参数有两个,分别是SQLContext和LogicalPlan,调用的是DataFrame的构造方法,具体源码如下:

//DataFrame构造方法,该构造方法会自动对LogicalPlan进行分析,然后返回QueryExecution对象
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
    this(sqlContext, {
      val qe = sqlContext.executePlan(logicalPlan)
      //判断是否已经创建,如果是则抛异常
      if (sqlContext.conf.dataFrameEagerAnalysis) {
        qe.assertAnalyzed()  // This should force analysis and throw errors if there are any
      }
      qe
    })
  }

(3)val qe = sqlContext.executePlan(logicalPlan) 返回QueryExecution, sqlContext.executePlan方法源码如下:

protected[sql] def executePlan(plan: LogicalPlan) =
    new sparkexecution.QueryExecution(this, plan)

QueryExecution类中表达了Spark执行SQL的主要工作流程,具体如下

class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

  @VisibleForTesting
  def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)

  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)

  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    sqlContext.cacheManager.useCachedData(analyzed)
  }

  lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)

  // TODO: Don't just pick the first one...
  lazy val sparkPlan: SparkPlan = {
    SparkPlan.currentContext.set(sqlContext)
    sqlContext.planner.plan(optimizedPlan).next()
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  //调用toRDD方法执行任务将结果转换为RDD
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

  protected def stringOrError[A](f: => A): String =
    try f.toString catch { case e: Throwable => e.toString }

  def simpleString: String = {
    s"""== Physical Plan ==
       |${stringOrError(executedPlan)}
      """.stripMargin.trim
  }

  override def toString: String = {
    def output =
      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

    s"""== Parsed Logical Plan ==
       |${stringOrError(logical)}
       |== Analyzed Logical Plan ==
       |${stringOrError(output)}
       |${stringOrError(analyzed)}
       |== Optimized Logical Plan ==
       |${stringOrError(optimizedPlan)}
       |== Physical Plan ==
       |${stringOrError(executedPlan)}
       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
    """.stripMargin.trim
  }
}

可以看到,SQL的执行流程为
1.Parsed Logical Plan:LogicalPlan
2.Analyzed Logical Plan:
lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
3.Optimized Logical Plan:lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
4. Physical Plan:lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

可以调用results.queryExecution方法查看,代码如下:

scala> results.queryExecution
res1: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name)]
 'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string
Project [name#0]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at createDataFrame at <console>:47

== Optimized Logical Plan ==
Project [name#0]
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at createDataFrame at <console>:47

== Physical Plan ==
TungstenProject [name#0]
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

(4) 然后调用DataFrame的主构造器完成DataFrame的构造

class DataFrame private[sql](
    @transient val sqlContext: SQLContext,
    @DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable 

(5)
当调用DataFrame的collect等方法时,便会触发执行executedPlan

  def collect(): Array[Row] = withNewExecutionId {
    queryExecution.executedPlan.executeCollect()
  }

例如:

scala> results.collect
res6: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])

整体流程图如下:
这里写图片描述

目录
相关文章
|
12天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
40 0
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
45 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
79 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
54 0
|
21天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
56 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
61 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
88 0