Spark修炼之道(高级篇)——Spark源码阅读:第十三节 Spark SQL之SQLContext(一)

简介: 作者:周志湖1. SQLContext的创建SQLContext是Spark SQL进行结构化数据处理的入口,可以通过它进行DataFrame的创建及SQL的执行,其创建方式如下://sc为SparkContextval sqlContext = new org.apache.spark.sql.SQLContext(sc)其对应的源码为:def

作者:周志湖

1. SQLContext的创建

SQLContext是Spark SQL进行结构化数据处理的入口,可以通过它进行DataFrame的创建及SQL的执行,其创建方式如下:

//sc为SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

其对应的源码为:

def this(sparkContext: SparkContext) = {
    this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
  }

其调用的是私有的主构造函数:

//1.主构造器中的参数CacheManager用于缓存查询结果
//在进行后续查询时会自动读取缓存中的数据
//2.SQLListener用于监听Spark scheduler事件,它继承自SparkListener
//3.isRootContext表示是否是根SQLContext
class SQLContext private[sql](
    @transient val sparkContext: SparkContext,
    @transient protected[sql] val cacheManager: CacheManager,
    @transient private[sql] val listener: SQLListener,
    val isRootContext: Boolean)
  extends org.apache.spark.Logging with Serializable {

当spark.sql.allowMultipleContexts设置为true时,则允许创建多个SQLContexts/HiveContexts,创建方法为newSession

def newSession(): SQLContext = {
    new SQLContext(
      sparkContext = sparkContext,
      cacheManager = cacheManager,
      listener = listener,
      isRootContext = false)
  }

其isRootContext 被设置为false,否则会抛出异常,因为root SQLContext只能有一个,其它SQLContext与root SQLContext共享SparkContext, CacheManager, SQLListener。如果spark.sql.allowMultipleContexts为false,则只允许一个SQLContext存在

2. 核心成员变量 ——catalog

 protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)

catalog用于注销表、注销表、判断表是否存在等,例如当DataFrame调用registerTempTable 方法时

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

会sqlContext的registerDataFrameAsTable方法

def registerTempTable(tableName: String): Unit = {
    sqlContext.registerDataFrameAsTable(this, tableName)
  }

sqlContext.registerDataFrameAsTable实质上调用的就是catalog的registerTable 方法:

private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
    catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
  }

SimpleCatalog整体源码如下:

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]

  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
    tables.put(getTableName(tableIdent), plan)
  }

  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
    tables.remove(getTableName(tableIdent))
  }

  override def unregisterAllTables(): Unit = {
    tables.clear()
  }

  override def tableExists(tableIdent: TableIdentifier): Boolean = {
    tables.containsKey(getTableName(tableIdent))
  }

  override def lookupRelation(
      tableIdent: TableIdentifier,
      alias: Option[String] = None): LogicalPlan = {
    val tableName = getTableName(tableIdent)
    val table = tables.get(tableName)
    if (table == null) {
      throw new NoSuchTableException
    }
    val tableWithQualifiers = Subquery(tableName, table)

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
  }

  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
    tables.keySet().asScala.map(_ -> true).toSeq
  }

  override def refreshTable(tableIdent: TableIdentifier): Unit = {
    throw new UnsupportedOperationException
  }
}

3. 核心成员变量 ——sqlParser

sqlParser在SQLContext的定义:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))

SparkSQLParser为顶级的Spark SQL解析器,对Spark SQL支持的SQL语法进行解析,其定义如下:

private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser

fallback函数用于解析其它非Spark SQL Dialect的语法。
Spark SQL Dialect支持的关键字包括:

protected val AS = Keyword("AS")
  protected val CACHE = Keyword("CACHE")
  protected val CLEAR = Keyword("CLEAR")
  protected val DESCRIBE = Keyword("DESCRIBE")
  protected val EXTENDED = Keyword("EXTENDED")
  protected val FUNCTION = Keyword("FUNCTION")
  protected val FUNCTIONS = Keyword("FUNCTIONS")
  protected val IN = Keyword("IN")
  protected val LAZY = Keyword("LAZY")
  protected val SET = Keyword("SET")
  protected val SHOW = Keyword("SHOW")
  protected val TABLE = Keyword("TABLE")
  protected val TABLES = Keyword("TABLES")
  protected val UNCACHE = Keyword("UNCACHE")

4. 核心成员变量 ——ddlParser

用于解析DDL(Data Definition Language 数据定义语言)

 protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))

其支持的关键字有:

  protected val CREATE = Keyword("CREATE")
  protected val TEMPORARY = Keyword("TEMPORARY")
  protected val TABLE = Keyword("TABLE")
  protected val IF = Keyword("IF")
  protected val NOT = Keyword("NOT")
  protected val EXISTS = Keyword("EXISTS")
  protected val USING = Keyword("USING")
  protected val OPTIONS = Keyword("OPTIONS")
  protected val DESCRIBE = Keyword("DESCRIBE")
  protected val EXTENDED = Keyword("EXTENDED")
  protected val AS = Keyword("AS")
  protected val COMMENT = Keyword("COMMENT")
  protected val REFRESH = Keyword("REFRESH")

主要做三件事,分别是创建表、描述表和更新表

protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable

createTable方法具有如下(具体功能参考注释说明):

/**
   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
   * USING org.apache.spark.sql.avro
   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
   * or
   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
   * USING org.apache.spark.sql.avro
   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
   * or
   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
   * USING org.apache.spark.sql.avro
   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
   * AS SELECT ...
   */
  protected lazy val createTable: Parser[LogicalPlan] = {
    // TODO: Support database.table.
    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ tableIdentifier ~
      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
      case temp ~ allowExisting ~ tableIdent ~ columns ~ provider ~ opts ~ query =>
        if (temp.isDefined && allowExisting.isDefined) {
          throw new DDLException(
            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
        }

        val options = opts.getOrElse(Map.empty[String, String])
        if (query.isDefined) {
          if (columns.isDefined) {
            throw new DDLException(
              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
          }
          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
          val mode = if (allowExisting.isDefined) {
            SaveMode.Ignore
          } else if (temp.isDefined) {
            SaveMode.Overwrite
          } else {
            SaveMode.ErrorIfExists
          }

          val queryPlan = parseQuery(query.get)
          CreateTableUsingAsSelect(tableIdent,
            provider,
            temp.isDefined,
            Array.empty[String],
            mode,
            options,
            queryPlan)
        } else {
          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
          CreateTableUsing(
            tableIdent,
            userSpecifiedSchema,
            provider,
            temp.isDefined,
            options,
            allowExisting.isDefined,
            managedIfNoPath = false)
        }
    }
  }

describeTable及refreshTable代码如下:

 /*
   * describe [extended] table avroTable
   * This will display all columns of table `avroTable` includes column_name,column_type,comment
   */
  protected lazy val describeTable: Parser[LogicalPlan] =
    (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
      case e ~ tableIdent =>
        DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
    }

  protected lazy val refreshTable: Parser[LogicalPlan] =
    REFRESH ~> TABLE ~> tableIdentifier ^^ {
      case tableIndet =>
        RefreshTable(tableIndet)
    }
目录
相关文章
|
1月前
|
SQL 存储 分布式计算
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之怎么编写和执行Spark SQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
SQL JSON 分布式计算
|
3月前
|
SQL 分布式计算 Java
|
SQL 分布式计算 数据安全/隐私保护
Spark 官网阅读笔记
1.spark读取本地文件系统: 则该文件也必须可以在工作节点上的相同路径上访问。所以需要将文件复制到所有work 节点或使用网络安装的共享文件系统。
1602 0
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
110 1
Spark快速大数据分析PDF下载读书分享推荐
|
1月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
123 3
|
13天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
27 3
|
17天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
35 3
|
22天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。