【spark系列5】spark 3.0.1集成delta 0.7.0原理解析--delta如何进行DDL DML操作以及Catalog plugin API

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 【spark系列5】spark 3.0.1集成delta 0.7.0原理解析--delta如何进行DDL DML操作以及Catalog plugin API

前提


本文基于 spark 3.0.1

delta 0.7.0

我们都知道delta.io是一个给数据湖提供可靠性的开源存储层的软件,关于他的用处,可以参考Delta Lake,让你从复杂的Lambda架构中解放出来,上篇文章我们分析了delta是如何自定义自己的sql,这篇文章我们分析一下delta数据是如何基于Catalog plugin API进行DDL DML sql操作的(spark 3.x以前是不支持的)


分析


delta在0.7.0以前是不能够进行save表操作的,只能存储到文件中,也就是说他的元数据是和spark的其他元数据是分开的,delta是独立存在的,也是不能和其他表进行关联操作的,只有到了delta 0.7.0版本以后,才真正意义上和spark进行了集成,这也得益于spark 3.x的Catalog plugin API 特性。

还是先从delta的configurate sparksession入手,如下:

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

对于第二个配置 config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

从spark configuration,我们可以看到对该spark.sql.catalog.spark_catalog的解释是

A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.

也就是说,通过该配置可以实现元数据的统一性,其实这也是spark社区和delta社区进行交互的一种结果


spark 3.x的Catalog plugin API


为了能搞懂delta为什么能够进行DDL和DML操作,就得先知道spark 3.x的Catalog plugin机制SPARK-31121.


首先是interface CatalogPlugin,该接口是catalog plugin的顶级接口,正如注释所说:

 * A marker interface to provide a catalog implementation for Spark.
 * <p>
 * Implementations can provide catalog functions by implementing additional interfaces for tables,
 * views, and functions.
 * <p>
 * Catalog implementations must implement this marker interface to be loaded by
 * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
 * required public no-arg constructor. After creating an instance, it will be configured by calling
 * {@link #initialize(String, CaseInsensitiveStringMap)}.
 * <p>
 * Catalog implementations are registered to a name by adding a configuration option to Spark:
 * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
 * in the Spark configuration that share the catalog name prefix,
 * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
 * string map of options in initialization with the prefix removed.
 * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".

可以通过spark.sql.catalog.catalog-name=com.example.YourCatalogClass集成到spark中

该类的实现还可以集成其他额外的tables views functions的接口,这里就得提到接口TableCatalog,该类提供了与tables相关的方法:

/**
   * List the tables in a namespace from the catalog.
   * <p>
   * If the catalog supports views, this must return identifiers for only tables and not views.
   *
   * @param namespace a multi-part namespace
   * @return an array of Identifiers for tables
   * @throws NoSuchNamespaceException If the namespace does not exist (optional).
   */
  Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;
  /**
   * Load table metadata by {@link Identifier identifier} from the catalog.
   * <p>
   * If the catalog supports views and contains a view for the identifier and not a table, this
   * must throw {@link NoSuchTableException}.
   *
   * @param ident a table identifier
   * @return the table's metadata
   * @throws NoSuchTableException If the table doesn't exist or is a view
   */
  Table loadTable(Identifier ident) throws NoSuchTableException;

这样就可以基于TableCatalog开发自己的catalog,从而实现multi-catalog support


还得有个接口DelegatingCatalogExtension,这是个实现了CatalogExtension接口的抽象类,而CatalogExtension继承了TableCatalog, SupportsNamespaces。DeltaCatalog实现了DelegatingCatalogExtension ,这部分后续进行分析。

最后还有一个class CatalogManager,这个类是用来管理CatalogPlugins的,且是线程安全的:

/**
 * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
 * the caller to look up a catalog by name.
 *
 * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
 * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
 * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
 * current database of `SessionCatalog` when the current catalog is the session catalog.
 */
// TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't
//       need to track current database at all.
private[sql]
class CatalogManager(
    conf: SQLConf,
    defaultSessionCatalog: CatalogPlugin,
    val v1SessionCatalog: SessionCatalog) extends Logging {

我们看到CatalogManager管理了v2版本的 CatalogPlugin和v1版本的sessionCatalog,这个是因为历史的原因导致必须得兼容v1版本


那CatalogManager在哪里被调用呢。

我们看一下BaseSessionStateBuilder ,可以看到该类中才是正宗使用CatalogManager的地方:

/**
   * Catalog for managing table and database states. If there is a pre-existing catalog, the state
   * of that catalog (temp tables & current database) will be copied into the new catalog.
   *
   * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
   */
  protected lazy val catalog: SessionCatalog = {
    val catalog = new SessionCatalog(
      () => session.sharedState.externalCatalog,
      () => session.sharedState.globalTempViewManager,
      functionRegistry,
      conf,
      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
      sqlParser,
      resourceLoader)
    parentState.foreach(_.catalog.copyStateTo(catalog))
    catalog
  }
  protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)
  protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)

SessionCatalog 是v1版本的,主要是跟底层的元数据存储通信,以及管理临时视图,udf的,这一部分暂时不分析,重点放到v2版本的sessionCatalog,

我们看一下V2SessionCatalog:

/**
 * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
 */
class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
  extends TableCatalog with SupportsNamespaces {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
  import V2SessionCatalog._
  override val defaultNamespace: Array[String] = Array("default")
  override def name: String = CatalogManager.SESSION_CATALOG_NAME
  // This class is instantiated by Spark, so `initialize` method will not be called.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
  override def listTables(namespace: Array[String]): Array[Identifier] = {
    namespace match {
      case Array(db) =>
        catalog
          .listTables(db)
          .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table))
          .toArray
      case _ =>
        throw new NoSuchNamespaceException(namespace)
    }
  }

我们分析一下listTables方法可知,v2的sessionCatalog操作 都是委托给了v1版本的sessionCatalog去操作的,其他的方法也是一样,

而且name默认为CatalogManager.SESSION_CATALOG_NAME,也就是spark_catalog,这里后面也会提到,注意一下。

而且,catalogmanager在逻辑计划中的分析器和优化器中也会用到,因为会用到其中的元数据:

protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) {
...
protected def optimizer: Optimizer = {
    new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
      override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
        super.earlyScanPushDownRules ++ customEarlyScanPushDownRules
      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
    }
  }

而analyzer和optimizer正是spark sql进行解析的核心中的核心,当然还有物理计划的生成。

那这些analyzer和optimizer是在哪里被调用呢?

我们举一个例子,DataSet中的filter方法就调用了:

 */
  def filter(conditionExpr: String): Dataset[T] = {
    filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
  }

sessionState.sqlParser就是刚才所说的sqlParser:

protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
  }

只有整个逻辑 从sql解析到使用元数据的数据链路,我们就能大致知道怎么一回事了。

delta的DeltaCatalog


我们回过头来看看,delta的DeltaCatalog是怎么和spark 3.x进行结合的 ,上源码DeltaCatalog

class DeltaCatalog(val spark: SparkSession) extends DelegatingCatalogExtension
  with StagingTableCatalog
  with SupportsPathIdentifier {
  def this() = {
    this(SparkSession.active)
  }
  ...

就如之前所说的DeltaCatalog继承了DelegatingCatalogExtension,从名字可以看出这是一个委托类,那到底是怎么委托的呢以及委托给谁呢?

public abstract class DelegatingCatalogExtension implements CatalogExtension {
  private CatalogPlugin delegate;
  public final void setDelegateCatalog(CatalogPlugin delegate) {
    this.delegate = delegate;
  }

该类中有个setDelegateCatalog方法,该方法在CatalogManager中的loadV2SessionCatalog方法中被调用:

private def loadV2SessionCatalog(): CatalogPlugin = {
    Catalogs.load(SESSION_CATALOG_NAME, conf) match {
      case extension: CatalogExtension =>
        extension.setDelegateCatalog(defaultSessionCatalog)
        extension
      case other => other
    }
  }

而该方法被v2SessionCatalog调用:

private[sql] def v2SessionCatalog: CatalogPlugin = {
    conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog =>
      try {
        catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
      } catch {
        case NonFatal(_) =>
          logError(
            "Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
          defaultSessionCatalog
      }
    }.getOrElse(defaultSessionCatalog)
  }

这个就是返回默认的v2版本的SessionCatalog实例,分析一下这个方法:

   首先得到配置项SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION,也就是spark.sql.catalog.spark_catalog配置,
   如果spark配置了的话,就调用loadV2SessionCatalog加载该类,,否则就加载默认的v2SessionCatalog,也就是V2SessionCatalog实例

这里我们就发现了:

delta配置的spark.sql.catalog.spark_catalog为"org.apache.spark.sql.delta.catalog.DeltaCatalog",也就是说,spark中的V2SessionCatalog是DeltaCatalog的实例,而DeltaCatalog的委托给了BaseSessionStateBuilder中的V2SessionCatalog实例。


具体看看DeltaCatalog 的createTable方法,其他的方法类似:

override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
      createDeltaTable(
        ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
    } else {
      super.createTable(ident, schema, partitions, properties)
    }
  }
...
private def createDeltaTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String],
      sourceQuery: Option[LogicalPlan],
      operation: TableCreationModes.CreationMode): Table = {
     ...
    val tableDesc = new CatalogTable(
      identifier = TableIdentifier(ident.name(), ident.namespace().lastOption),
      tableType = tableType,
      storage = storage,
      schema = schema,
      provider = Some("delta"),
      partitionColumnNames = partitionColumns,
      bucketSpec = maybeBucketSpec,
      properties = tableProperties.toMap,
      comment = Option(properties.get("comment")))
    // END: copy-paste from the super method finished.
    val withDb = verifyTableAndSolidify(tableDesc, None)
    ParquetSchemaConverter.checkFieldNames(tableDesc.schema.fieldNames)
    CreateDeltaTableCommand(
      withDb,
      getExistingTableIfExists(tableDesc),
      operation.mode,
      sourceQuery,
      operation,
      tableByPath = isByPath).run(spark)
    loadTable(ident)
      }
 override def loadTable(ident: Identifier): Table = {
    try {
      super.loadTable(ident) match {
        case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
          DeltaTableV2(
            spark,
            new Path(v1.catalogTable.location),
            catalogTable = Some(v1.catalogTable),
            tableIdentifier = Some(ident.toString))
        case o => o
      }
  }

判断是否是delta数据源,如果是的话,跳到createDeltaTable方法,否则直接调用super.createTable方法,

createDeltaTable先会进行delta特有的CreateDeltaTableCommand.run()命令写入delta数据,之后载loadTable

loadTable则会调用super的loadTable,而方法会调用V2SessionCatalog的loadTable,而V2SessionCatalog最终会调用v1版本sessionCatalog的getTableMetadata方法,从而组成V1Table(catalogTable)返回,这样就把delta的元数据信息持久化到了v1 SessionCatalog管理的元数据库中

如果不是delta数据源,则调用super.createTable方法,该方法调用V2SessionCatalog的createTable,而最终还是调用v1版本sessionCatalog的createTable方法

我们这里重点分析了delta数据源到元数据的存储,非delta数据源的代码就没有粘贴过来,有兴趣的自己可以编译源码跟踪一下


我们还得提一下spark.sql.defaultCatalog的默认配置为spark_catalog,也就是sql的默认catalog为spark_catalog,对应到delta的话就是DeltaCatalog。


至此,我们就把delta为什么能够进行DDL和DML的原理结合spark的Catalog plugin API分析了一遍.其实搞懂了这些以后,自己也可以按照DeltaCatalog的方式扩展catalog,只不过是catalog的名字不要为spark_catalog,否则会出现异常信息。如果非要为spark_catalog的话,就得继承DelegatingCatalogExtension类,把所有的元数据信息委托给V2SessionCatalog


目录
打赏
0
0
0
0
9
分享
相关文章
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
194 7
深入解析图神经网络注意力机制:数学原理与可视化实现
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
74 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
36 1
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
137 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
100 12
企业级API集成方案:基于阿里云函数计算调用DeepSeek全解析
DeepSeek R1 是一款先进的大规模深度学习模型,专为自然语言处理等复杂任务设计。它具备高效的架构、强大的泛化能力和优化的参数管理,适用于文本生成、智能问答、代码生成和数据分析等领域。阿里云平台提供了高性能计算资源、合规与数据安全、低延迟覆盖和成本效益等优势,支持用户便捷部署和调用 DeepSeek R1 模型,确保快速响应和稳定服务。通过阿里云百炼模型服务,用户可以轻松体验满血版 DeepSeek R1,并享受免费试用和灵活的API调用方式。
223 12
解锁鸿蒙装饰器:应用、原理与优势全解析
ArkTS提供了多维度的状态管理机制。在UI开发框架中,与UI相关联的数据可以在组件内使用,也可以在不同组件层级间传递,比如父子组件之间、爷孙组件之间,还可以在应用全局范围内传递或跨设备传递。
54 2
【实战解析】smallredbook.item_get_video API:小红书视频数据获取与电商应用指南
本文介绍小红书官方API——`smallredbook.item_get_video`的功能与使用方法。该接口可获取笔记视频详情,包括无水印直链、封面图、时长、文本描述、标签及互动数据等,并支持电商场景分析。调用需提供`key`、`secret`和`num_iid`参数,返回字段涵盖视频链接、标题、标签及用户信息等。同时,文章提供了电商实战技巧,如竞品监控与个性化推荐,并列出合规注意事项及替代方案对比。最后解答了常见问题,如笔记ID获取与视频链接时效性等。
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~

热门文章

最新文章

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等