SPARK中 DS V2 push down(下推)的一些说明

简介: SPARK中 DS V2 push down(下推)的一些说明

背景


本文基于 SPARK 3.3.0

在之前的文章 SPARK中的FileSourceStrategy,DataSourceStrategy以及DataSourceV2Strategy规则 我们有提到 DS V2 push down的功能,如JDBC 复杂下推,以及Parquet的聚合下推等等。其实这里面有个比较大的背景–就是TableCatalog类。


结论


先说结论,这些聚合下推的大前提是,在spark中已经配置了对应的catalog,如下:

spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog

分析


在Rule V2ScanRelationPushDown一系列的规则中,第一个规则createScanBuilder:

private def createScanBuilder(plan: LogicalPlan) = plan.transform {
    case r: DataSourceV2Relation =>
      ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
  }

只有是DataSourceV2Relation类型,也就是 DS v2,才会转换为 ScanBuilderHolder, 而后续的pushDownFilters,pushDownAggregates规则则是基于ScanBuilderHolder来做转换的(如果有遇到ScanBuilderHolder类型才会进行DS v2特有的规则转换),所以DataSourceV2Relation是从哪里来的是关键。

直接说重点:

在RULE ResolveRelations中会进行 UnresolvedRelation到DataSourceV2Relation或是UnresolvedCatalogRelation的转换:

object ResolveRelations extends Rule[LogicalPlan] {
  ...
def apply(plan: LogicalPlan)
        : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
      case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
        val relation = table match {
          case u: UnresolvedRelation if !u.isStreaming =>
            lookupRelation(u).getOrElse(u)
          case other => other
        }

这里的lookupRelation会根据是否有对应的Catalog的注册来判断是DS V1还是DS V2:

private def lookupRelation(
        u: UnresolvedRelation,
        timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
      lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
        expandIdentifier(u.multipartIdentifier) match {
          case CatalogAndIdentifier(catalog, ident) =>
            val key = catalog.name +: ident.namespace :+ ident.name
            AnalysisContext.get.relationCache.get(key).map(_.transform {
              case multi: MultiInstanceRelation =>
                val newRelation = multi.newInstance()
                newRelation.copyTagsFrom(multi)
                newRelation
            }).orElse {
              val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
              val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
              loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
              loaded
            }
          case _ => None
        }
      }
    }
    ...
private def expandIdentifier(nameParts: Seq[String]): Seq[String] = {
    if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts
    if (nameParts.length == 1) {
      AnalysisContext.get.catalogAndNamespace :+ nameParts.head
    } else if (catalogManager.isCatalogRegistered(nameParts.head)) {
      nameParts
    } else {
      AnalysisContext.get.catalogAndNamespace.head +: nameParts
    }
  }
object CatalogAndIdentifier {
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
    private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
      assert(nameParts.nonEmpty)
      if (nameParts.length == 1) {
        Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
      } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
        // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
        // API does not support view yet, and we have to use v1 commands to deal with global temp
        // views. To simplify the implementation, we put global temp views in a special namespace
        // in the session catalog. The special namespace has higher priority during name resolution.
        // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
        // this custom catalog can't be accessed.
        Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
      } else {
        try {
          Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
        } catch {
          case _: CatalogNotFoundException =>
            Some((currentCatalog, nameParts.asIdentifier))
        }
      }
    }
  }


expandIdentifier方法结合CatalogAndIdentifier.unapply方法,判断:


1.如果没有指定catalog,则 默认catalog 为v2SessionCatalog,catalog的名称为"spark_catalog",这也是spark默认的sessionCatalog 名称,跳到步骤3

如以下SQL: select a from table

2.如果指定了catalog,且catalog已经注册了(如以spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog),则catalog为指定的(如为JDBCTableCatalog,catalog的名称为"h2",跳到步骤3

如以下SQL:select a from h2.table

3.调用CatalogV2Util.loadTable方法也就是对应的Catalog的loadTable方法来获取对应的Table:

V2SessionCatalog catalog返回是的V1Table

JDBCTableCatalog catalog 返回的是JDBCTable

这样在下一步的createRelation 方法中就会根据不同的case转换为不同的relation:


private def createRelation(
        catalog: CatalogPlugin,
        ident: Identifier,
        table: Option[Table],
        options: CaseInsensitiveStringMap,
        isStreaming: Boolean): Option[LogicalPlan] = {
      ...
      case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) =>
          if (isStreaming) {
            if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
              throw QueryCompilationErrors.permanentViewNotSupportedByStreamingReadingAPIError(
                ident.quoted)
            }
            SubqueryAlias(
              catalog.name +: ident.asMultipartIdentifier,
              UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
          } else {
            v1SessionCatalog.getRelation(v1Table.v1Table, options)
          }
      ...
      case table =>
        ...
         } else {
            SubqueryAlias(
              catalog.name +: ident.asMultipartIdentifier,
              DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))

如果是V1Table,则会转换为UnresolvedCatalogRelation,继而在 Rule FindDataSourceTable中转为LogicalRelation,这里就会涉及lookupDataSource,也就是注册的datasource(如:“org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider” 或者 "org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2”(目前没有进行cast匹配))发生作用了(在providingInstance()方法中实现)

如果是其他的,则会转换为DataSourceV2Relation,继而在Rule V2ScanRelationPushDown中做一系列的下推优化

所以说 对于JDBC的catalog来说,想要进行DS V2的优化,就得配置:


spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog

如果想要对于其他DS v2的优化,如Parquet,就得实现对应的TableCatalog,再进行配置:

spark.sql.catalog.parquet=org.apache.spark.sql.execution.datasources.v2.jdbc.xxxx

关于TableCatalog


目前 jdbc的datasource和TableCatalog 在spark都是已经实现了:

## datasource
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
## TableCatalog
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog

如果想实现其他的datasource以及catalog,可以参考JDBC的实现(目前的JDBC的source实现还是基于 DS V1,最好是基于DS V2实现,如参考:ParquetDataSourceV2)。


在SPARK-28396也有这方面的讨论。

更进一步DS V2 Push Down的特性,参考技术前沿|Spark 3.3.0 中 DS V2 Push-down 的重构与新特性

相关文章
|
SQL 存储 分布式计算
spark outer join push down filter rule(spark 外连接中的下推规则)
spark outer join push down filter rule(spark 外连接中的下推规则)
289 0
spark outer join push down filter rule(spark 外连接中的下推规则)
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
180 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
82 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
56 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
115 0
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
118 6
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
145 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
117 1
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
79 1
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
77 1