【spark系列4】spark 3.0.1集成delta 0.7.0原理解析--delta自定义sql

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 【spark系列4】spark 3.0.1集成delta 0.7.0原理解析--delta自定义sql

前提


本文基于 spark 3.0.1

delta 0.7.0

我们都知道delta.io是一个给数据湖提供可靠性的开源存储层的软件,关于他的用处,可以参考Delta Lake,让你从复杂的Lambda架构中解放出来,于此类似的产品有hudi,Iceberg,因为delta无缝集成spark,所以我们来分析一下delta集成spark的内部原理以及框架,对于spark 3.x 与delta的集成是分两部分的,一部分是delta自定义的sql语法,另一部分是基于Catalog plugin API的DDL DML sql操作(spark 3.x以前是不支持的)

我们今天先分析第一部分 delta自定义的sql语法


自定义的DeltaDataSource


我们在用delta的时候,得指定delta特定的格式,如下:

val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df.show()

那这个delta datasource是怎么集成到spark呢?我们来分析一下:

直接到DataStreamWriter,如下:

 val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
      val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
        // file source v2 does not support streaming yet.
        classOf[FileDataSourceV2].isAssignableFrom(cls)

DataSource.lookupDataSource 方法是关键点。如下:

def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcDataSourceV2].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource"
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

这里用到了ServiceLoader.load的方法,该是java的SPI,具体的细节可以网上查阅,我们说重点

直接找到ServiceLoader.LazyIterator部分

private class LazyIterator
        implements Iterator<S>
    {
        Class<S> service;
        ClassLoader loader;
        Enumeration<URL> configs = null;
        Iterator<String> pending = null;
        String nextName = null;
        private LazyIterator(Class<S> service, ClassLoader loader) {
            this.service = service;
            this.loader = loader;
        }
        private boolean hasNextService() {
            if (nextName != null) {
                return true;
            }
            if (configs == null) {
                try {
                    String fullName = PREFIX + service.getName();
                    if (loader == null)
                        configs = ClassLoader.getSystemResources(fullName);
                    else
                        configs = loader.getResources(fullName);
                } catch (IOException x) {
                    fail(service, "Error locating configuration files", x);
                }
            }

其中的loader.getResources方法,就是查找classpath下的特定文件,如果有多个就会返回多个,

对于spark来说,查找的是class DataSourceRegister,也就是META-INF/services/org.apache.spark.sql.sources.DataSourceRegister文件,实际上spark内部的datasource的实现,通过通过这种方式加载进来的


我们查看一下delta的META-INF/services/org.apache.spark.sql.sources.DataSourceRegister文件为org.apache.spark.sql.delta.sources.DeltaDataSource,注意DeltaDatasource是基于Datasource v1进行开发的,

至此我们就知道了delta datasource和spark结合的大前提的实现


分析

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

我们可以看到 config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")

spark configuration,我们可以看到对该spark.sql.extensions的解释是

A comma-separated list of classes that implement Function1[SparkSessionExtensions, Unit] used to configure Spark Session extensions. The classes must have a no-args constructor. If multiple extensions are specified, they are applied in the specified order. For the case of rules and planner strategies, they are applied in the specified order. For the case of parsers, the last parser is used and each parser can delegate to its predecessor. For the case of function name conflicts, the last registered function name is used.

一句话就是用来对sparksession的扩展,可以对spark sql的逻辑计划进行扩展,且这个功能从spark 2.2.0就有了

看一下io.delta.sql.DeltaSparkSessionExtension类

class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser { (session, parser) =>
      new DeltaSqlParser(parser)
    }
    extensions.injectResolutionRule { session =>
      new DeltaAnalysis(session, session.sessionState.conf)
    }
    extensions.injectCheckRule { session =>
      new DeltaUnsupportedOperationsCheck(session)
    }
    extensions.injectPostHocResolutionRule { session =>
      new PreprocessTableUpdate(session.sessionState.conf)
    }
    extensions.injectPostHocResolutionRule { session =>
      new PreprocessTableMerge(session.sessionState.conf)
    }
    extensions.injectPostHocResolutionRule { session =>
      new PreprocessTableDelete(session.sessionState.conf)
    }
  }
}

DeltaSqlParser class就是delta对于自身语法的支持,那到底怎么支持以及支持什么呢?

我们看一下extensions.injectParser代码

 private[this] val parserBuilders = mutable.Buffer.empty[ParserBuilder]
  private[sql] def buildParser(
      session: SparkSession,
      initial: ParserInterface): ParserInterface = {
    parserBuilders.foldLeft(initial) { (parser, builder) =>
      builder(session, parser)
    }
  }
  /**
   * Inject a custom parser into the [[SparkSession]]. Note that the builder is passed a session
   * and an initial parser. The latter allows for a user to create a partial parser and to delegate
   * to the underlying parser for completeness. If a user injects more parsers, then the parsers
   * are stacked on top of each other.
   */
  def injectParser(builder: ParserBuilder): Unit = {
    parserBuilders += builder
  }

我们看到buildParser方法对我们传入的DeltaSqlParser进行了方法的初始化,也就是说DeltaSqlParser 的delegate变量被赋值为initial,

而该buildParser方法 被BaseSessionStateBuilder调用:、

 /**
   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
   *
   * Note: this depends on the `conf` field.
   */
  protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
  }

所以说initial的实参是SparkSqlParser,也就是SparkSqlParser成了DeltaSqlParser代理,我们再看看DeltaSqlParser的方法:

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    builder.visit(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ => delegate.parsePlan(sqlText)
    }
  }

这里涉及到了antlr4的语法,也就是说对于逻辑计划的解析,如自身DeltaSqlParser能够解析,就进行解析,不能的话就委托给SparkSqlParser进行解析,而解析是该类DeltaSqlAstBuilder的功能:

class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
  /**
   * Create a [[VacuumTableCommand]] logical plan. Example SQL:
   * {{{
   *   VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN];
   * }}}
   */
  override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) {
    VacuumTableCommand(
      Option(ctx.path).map(string),
      Option(ctx.table).map(visitTableIdentifier),
      Option(ctx.number).map(_.getText.toDouble),
      ctx.RUN != null)
  }
  override def visitDescribeDeltaDetail(
      ctx: DescribeDeltaDetailContext): LogicalPlan = withOrigin(ctx) {
    DescribeDeltaDetailCommand(
      Option(ctx.path).map(string),
      Option(ctx.table).map(visitTableIdentifier))
  }
  override def visitDescribeDeltaHistory(
      ctx: DescribeDeltaHistoryContext): LogicalPlan = withOrigin(ctx) {
    DescribeDeltaHistoryCommand(
      Option(ctx.path).map(string),
      Option(ctx.table).map(visitTableIdentifier),
      Option(ctx.limit).map(_.getText.toInt))
  }
  override def visitGenerate(ctx: GenerateContext): LogicalPlan = withOrigin(ctx) {
    DeltaGenerateCommand(
      modeName = ctx.modeName.getText,
      tableId = visitTableIdentifier(ctx.table))
  }
  override def visitConvert(ctx: ConvertContext): LogicalPlan = withOrigin(ctx) {
    ConvertToDeltaCommand(
      visitTableIdentifier(ctx.table),
      Option(ctx.colTypeList).map(colTypeList => StructType(visitColTypeList(colTypeList))),
      None)
  }
  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
    visit(ctx.statement).asInstanceOf[LogicalPlan]
  }
  protected def visitTableIdentifier(ctx: QualifiedNameContext): TableIdentifier = withOrigin(ctx) {
    ctx.identifier.asScala match {
      case Seq(tbl) => TableIdentifier(tbl.getText)
      case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
      case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
    }
  }
  override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
}

那这些方法比如visitVacuumTable,visitDescribeDeltaDetail是从哪里来的呢?

咱们看看DeltaSqlBase.g4:

singleStatement
    : statement EOF
    ;
// If you add keywords here that should not be reserved, add them to 'nonReserved' list.
statement
    : VACUUM (path=STRING | table=qualifiedName)
        (RETAIN number HOURS)? (DRY RUN)?                               #vacuumTable
    | (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName)      #describeDeltaDetail
    | GENERATE modeName=identifier FOR TABLE table=qualifiedName        #generate
    | (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName)
        (LIMIT limit=INTEGER_VALUE)?                                    #describeDeltaHistory
    | CONVERT TO DELTA table=qualifiedName
        (PARTITIONED BY '(' colTypeList ')')?                           #convert
    | .*?                                                               #passThrough
    ;

这里涉及到的antlr4语法,不会的可以自行网上查阅。注意一下spark 和delta用到的都是visit的模式。

再来对于一下delta官网提供的操作 :

Vacuum
Describe History
Describe Detail
Generate
Convert to Delta
Convert Delta table to a Parquet table

这样就能对应上了,如Vacuum操作对应vacuumTable,Convert to Delta对应 convert.

其实delta支持拓展了spark,我们也可按照delta的方式,对spark进行扩展,从而实现自己的sql语法

相关文章
|
2月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
15天前
|
SQL 人工智能 自然语言处理
Text2SQL圣经:从0到1精通Text2Sql(Chat2Sql)的原理,以及Text2Sql开源项目的使用
Text2SQL圣经:从0到1精通Text2Sql(Chat2Sql)的原理,以及Text2Sql开源项目的使用
Text2SQL圣经:从0到1精通Text2Sql(Chat2Sql)的原理,以及Text2Sql开源项目的使用
|
1月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
241 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
1月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
144 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
23天前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
86 2
|
2月前
|
编解码 缓存 Prometheus
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
172 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
|
1月前
|
Web App开发 移动开发 前端开发
React音频播放器样式自定义全解析:从入门到避坑指南
在React中使用HTML5原生&lt;audio&gt;标签时,开发者常面临视觉一致性缺失、样式定制局限和交互体验割裂等问题。通过隐藏原生控件并构建自定义UI层,可以实现完全可控的播放器视觉风格,避免状态不同步等典型问题。结合事件监听、进度条拖拽、浏览器兼容性处理及性能优化技巧,可构建高性能、可维护的音频组件,满足跨平台需求。建议优先使用成熟音频库(如react-player),仅在深度定制需求时采用原生方案。
67 12
|
2月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
123 12
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
企业级API集成方案:基于阿里云函数计算调用DeepSeek全解析
DeepSeek R1 是一款先进的大规模深度学习模型,专为自然语言处理等复杂任务设计。它具备高效的架构、强大的泛化能力和优化的参数管理,适用于文本生成、智能问答、代码生成和数据分析等领域。阿里云平台提供了高性能计算资源、合规与数据安全、低延迟覆盖和成本效益等优势,支持用户便捷部署和调用 DeepSeek R1 模型,确保快速响应和稳定服务。通过阿里云百炼模型服务,用户可以轻松体验满血版 DeepSeek R1,并享受免费试用和灵活的API调用方式。
257 12
|
2月前
|
开发框架 监控 JavaScript
解锁鸿蒙装饰器:应用、原理与优势全解析
ArkTS提供了多维度的状态管理机制。在UI开发框架中,与UI相关联的数据可以在组件内使用,也可以在不同组件层级间传递,比如父子组件之间、爷孙组件之间,还可以在应用全局范围内传递或跨设备传递。
73 2

热门文章

最新文章

推荐镜像

更多