【spark系列4】spark 3.0.1集成delta 0.7.0原理解析--delta自定义sql

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 【spark系列4】spark 3.0.1集成delta 0.7.0原理解析--delta自定义sql

前提


本文基于 spark 3.0.1

delta 0.7.0

我们都知道delta.io是一个给数据湖提供可靠性的开源存储层的软件,关于他的用处,可以参考Delta Lake,让你从复杂的Lambda架构中解放出来,于此类似的产品有hudi,Iceberg,因为delta无缝集成spark,所以我们来分析一下delta集成spark的内部原理以及框架,对于spark 3.x 与delta的集成是分两部分的,一部分是delta自定义的sql语法,另一部分是基于Catalog plugin API的DDL DML sql操作(spark 3.x以前是不支持的)

我们今天先分析第一部分 delta自定义的sql语法


自定义的DeltaDataSource


我们在用delta的时候,得指定delta特定的格式,如下:

val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df.show()

那这个delta datasource是怎么集成到spark呢?我们来分析一下:

直接到DataStreamWriter,如下:

 val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
      val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
        // file source v2 does not support streaming yet.
        classOf[FileDataSourceV2].isAssignableFrom(cls)

DataSource.lookupDataSource 方法是关键点。如下:

def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcDataSourceV2].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource"
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

这里用到了ServiceLoader.load的方法,该是java的SPI,具体的细节可以网上查阅,我们说重点

直接找到ServiceLoader.LazyIterator部分

private class LazyIterator
        implements Iterator<S>
    {
        Class<S> service;
        ClassLoader loader;
        Enumeration<URL> configs = null;
        Iterator<String> pending = null;
        String nextName = null;
        private LazyIterator(Class<S> service, ClassLoader loader) {
            this.service = service;
            this.loader = loader;
        }
        private boolean hasNextService() {
            if (nextName != null) {
                return true;
            }
            if (configs == null) {
                try {
                    String fullName = PREFIX + service.getName();
                    if (loader == null)
                        configs = ClassLoader.getSystemResources(fullName);
                    else
                        configs = loader.getResources(fullName);
                } catch (IOException x) {
                    fail(service, "Error locating configuration files", x);
                }
            }

其中的loader.getResources方法,就是查找classpath下的特定文件,如果有多个就会返回多个,

对于spark来说,查找的是class DataSourceRegister,也就是META-INF/services/org.apache.spark.sql.sources.DataSourceRegister文件,实际上spark内部的datasource的实现,通过通过这种方式加载进来的


我们查看一下delta的META-INF/services/org.apache.spark.sql.sources.DataSourceRegister文件为org.apache.spark.sql.delta.sources.DeltaDataSource,注意DeltaDatasource是基于Datasource v1进行开发的,

至此我们就知道了delta datasource和spark结合的大前提的实现


分析

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

我们可以看到 config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")

spark configuration,我们可以看到对该spark.sql.extensions的解释是

A comma-separated list of classes that implement Function1[SparkSessionExtensions, Unit] used to configure Spark Session extensions. The classes must have a no-args constructor. If multiple extensions are specified, they are applied in the specified order. For the case of rules and planner strategies, they are applied in the specified order. For the case of parsers, the last parser is used and each parser can delegate to its predecessor. For the case of function name conflicts, the last registered function name is used.

一句话就是用来对sparksession的扩展,可以对spark sql的逻辑计划进行扩展,且这个功能从spark 2.2.0就有了

看一下io.delta.sql.DeltaSparkSessionExtension类

class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser { (session, parser) =>
      new DeltaSqlParser(parser)
    }
    extensions.injectResolutionRule { session =>
      new DeltaAnalysis(session, session.sessionState.conf)
    }
    extensions.injectCheckRule { session =>
      new DeltaUnsupportedOperationsCheck(session)
    }
    extensions.injectPostHocResolutionRule { session =>
      new PreprocessTableUpdate(session.sessionState.conf)
    }
    extensions.injectPostHocResolutionRule { session =>
      new PreprocessTableMerge(session.sessionState.conf)
    }
    extensions.injectPostHocResolutionRule { session =>
      new PreprocessTableDelete(session.sessionState.conf)
    }
  }
}

DeltaSqlParser class就是delta对于自身语法的支持,那到底怎么支持以及支持什么呢?

我们看一下extensions.injectParser代码

 private[this] val parserBuilders = mutable.Buffer.empty[ParserBuilder]
  private[sql] def buildParser(
      session: SparkSession,
      initial: ParserInterface): ParserInterface = {
    parserBuilders.foldLeft(initial) { (parser, builder) =>
      builder(session, parser)
    }
  }
  /**
   * Inject a custom parser into the [[SparkSession]]. Note that the builder is passed a session
   * and an initial parser. The latter allows for a user to create a partial parser and to delegate
   * to the underlying parser for completeness. If a user injects more parsers, then the parsers
   * are stacked on top of each other.
   */
  def injectParser(builder: ParserBuilder): Unit = {
    parserBuilders += builder
  }

我们看到buildParser方法对我们传入的DeltaSqlParser进行了方法的初始化,也就是说DeltaSqlParser 的delegate变量被赋值为initial,

而该buildParser方法 被BaseSessionStateBuilder调用:、

 /**
   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
   *
   * Note: this depends on the `conf` field.
   */
  protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
  }

所以说initial的实参是SparkSqlParser,也就是SparkSqlParser成了DeltaSqlParser代理,我们再看看DeltaSqlParser的方法:

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    builder.visit(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ => delegate.parsePlan(sqlText)
    }
  }

这里涉及到了antlr4的语法,也就是说对于逻辑计划的解析,如自身DeltaSqlParser能够解析,就进行解析,不能的话就委托给SparkSqlParser进行解析,而解析是该类DeltaSqlAstBuilder的功能:

class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
  /**
   * Create a [[VacuumTableCommand]] logical plan. Example SQL:
   * {{{
   *   VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN];
   * }}}
   */
  override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) {
    VacuumTableCommand(
      Option(ctx.path).map(string),
      Option(ctx.table).map(visitTableIdentifier),
      Option(ctx.number).map(_.getText.toDouble),
      ctx.RUN != null)
  }
  override def visitDescribeDeltaDetail(
      ctx: DescribeDeltaDetailContext): LogicalPlan = withOrigin(ctx) {
    DescribeDeltaDetailCommand(
      Option(ctx.path).map(string),
      Option(ctx.table).map(visitTableIdentifier))
  }
  override def visitDescribeDeltaHistory(
      ctx: DescribeDeltaHistoryContext): LogicalPlan = withOrigin(ctx) {
    DescribeDeltaHistoryCommand(
      Option(ctx.path).map(string),
      Option(ctx.table).map(visitTableIdentifier),
      Option(ctx.limit).map(_.getText.toInt))
  }
  override def visitGenerate(ctx: GenerateContext): LogicalPlan = withOrigin(ctx) {
    DeltaGenerateCommand(
      modeName = ctx.modeName.getText,
      tableId = visitTableIdentifier(ctx.table))
  }
  override def visitConvert(ctx: ConvertContext): LogicalPlan = withOrigin(ctx) {
    ConvertToDeltaCommand(
      visitTableIdentifier(ctx.table),
      Option(ctx.colTypeList).map(colTypeList => StructType(visitColTypeList(colTypeList))),
      None)
  }
  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
    visit(ctx.statement).asInstanceOf[LogicalPlan]
  }
  protected def visitTableIdentifier(ctx: QualifiedNameContext): TableIdentifier = withOrigin(ctx) {
    ctx.identifier.asScala match {
      case Seq(tbl) => TableIdentifier(tbl.getText)
      case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
      case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
    }
  }
  override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
}

那这些方法比如visitVacuumTable,visitDescribeDeltaDetail是从哪里来的呢?

咱们看看DeltaSqlBase.g4:

singleStatement
    : statement EOF
    ;
// If you add keywords here that should not be reserved, add them to 'nonReserved' list.
statement
    : VACUUM (path=STRING | table=qualifiedName)
        (RETAIN number HOURS)? (DRY RUN)?                               #vacuumTable
    | (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName)      #describeDeltaDetail
    | GENERATE modeName=identifier FOR TABLE table=qualifiedName        #generate
    | (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName)
        (LIMIT limit=INTEGER_VALUE)?                                    #describeDeltaHistory
    | CONVERT TO DELTA table=qualifiedName
        (PARTITIONED BY '(' colTypeList ')')?                           #convert
    | .*?                                                               #passThrough
    ;

这里涉及到的antlr4语法,不会的可以自行网上查阅。注意一下spark 和delta用到的都是visit的模式。

再来对于一下delta官网提供的操作 :

Vacuum
Describe History
Describe Detail
Generate
Convert to Delta
Convert Delta table to a Parquet table

这样就能对应上了,如Vacuum操作对应vacuumTable,Convert to Delta对应 convert.

其实delta支持拓展了spark,我们也可按照delta的方式,对spark进行扩展,从而实现自己的sql语法

相关文章
|
17天前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
59 13
|
2月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
70 1
|
2月前
|
机器学习/深度学习 Python
堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能
本文深入探讨了堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能。文章详细介绍了堆叠的实现步骤,包括数据准备、基础模型训练、新训练集构建及元学习器训练,并讨论了其优缺点。
65 3
|
10天前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
2天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
29 14
|
11天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
59 1
|
2月前
|
运维 持续交付 虚拟化
深入解析Docker容器化技术的核心原理
深入解析Docker容器化技术的核心原理
52 1
|
2月前
|
存储 供应链 算法
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
60 0
|
2月前
|
JavaScript 前端开发 API
Vue.js响应式原理深度解析:从Vue 2到Vue 3的演进
Vue.js响应式原理深度解析:从Vue 2到Vue 3的演进
65 0
|
2月前
|
API 持续交付 网络架构
深入解析微服务架构:原理、优势与实践
深入解析微服务架构:原理、优势与实践
43 0

推荐镜像

更多