The FileSourceStrategy, DataSourceStrategy, and DataSourceV2Strategy Rules in Spark

Summary: the FileSourceStrategy, DataSourceStrategy, and DataSourceV2Strategy rules in Spark

Background

This article is based on Spark 3.3.0.

It records the roles and differences of the V1 and V2 DataSource and FileSource in Spark, as well as the stronger DataSource V2 JDBC pushdown introduced in Spark 3.3.0.


Analysis

Spark 3.3.0 introduces the DS V2 pushdown feature, which enables better pushdown, for example more complex aggregate pushdown and filter pushdown.
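As a quick way to see this from the user side, the sketch below registers a DataSource V2 JDBC catalog, which is the path the JDBC pushdown discussed later in this article goes through. The H2 URL, driver, and the pushDownAggregate/pushDownLimit option names are assumptions for illustration and may need adjusting for your environment.

import org.apache.spark.sql.SparkSession

// Minimal sketch (not the only way to use the feature): register a DS V2 JDBC
// catalog so that JDBC tables are planned as DataSourceV2Relation, letting
// V2ScanRelationPushDown push filters, aggregates and limits into the database.
// The URL, driver, and pushDown* option names below are assumptions.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .config("spark.sql.catalog.h2.pushDownAggregate", "true")
  .config("spark.sql.catalog.h2.pushDownLimit", "true")
  .getOrCreate()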


DataSource and FileSource pushdown in V1

This brings us to the two V1 rules, DataSourceStrategy and FileSourceStrategy.

FileSourceStrategy mainly targets HDFS files, for example converted Hive tables (for details see here).

It rewrites the corresponding LogicalRelation into a FileSourceScanExec:

FileSourceScanExec(
          fsRelation,
          outputAttributes,
          outputSchema,
          partitionKeyFilters.toSeq,
          bucketSet,
          None,
          dataFilters,
          table.map(_.identifier))

The filter-related parts are partitionKeyFilters and dataFilters. partitionKeyFilters applies at the partition level, for example selecting only certain partitions or the partitions involved in dynamic partition pruning, while dataFilters applies to non-partition columns. Both are applied when the files are read, as shown below:


  @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
   ...
  @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
   ...
lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    val readRDD = if (bucketedScan) {
      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
        relation)
    } else {
      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
    }
    sendDriverMetrics()
    readRDD
  }


This way only the matching partitions are read when scanning the files, which reduces IO.
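To see both kinds of filters at work, here is a minimal sketch against a hypothetical partitioned Parquet dataset under /tmp/events; the predicate on the partition column dt shows up as a partition filter, while the predicate on id becomes a pushed data filter.

import org.apache.spark.sql.SparkSession

// Minimal sketch: write a small dataset partitioned by dt, then read it back with
// one predicate on the partition column and one on a data column.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq((1, "a", "2023-01-01"), (2, "b", "2023-01-02"))
  .toDF("id", "name", "dt")
  .write.partitionBy("dt").mode("overwrite").parquet("/tmp/events")  // hypothetical path

// dt = '2023-01-01' ends up in partitionKeyFilters (only that directory is listed),
// while id > 1 ends up in dataFilters and is translated into pushedDownFilters.
spark.read.parquet("/tmp/events")
  .where($"dt" === "2023-01-01" && $"id" > 1)
  .explain()  // the FileSourceScanExec shows PartitionFilters and PushedFilters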

dataFilters, in turn, is used to build pushedDownFilters:


private lazy val pushedDownFilters = {
    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
    // `dataFilters` should not include any metadata col filters
    // because the metadata struct has been flatted in FileSourceStrategy
    // and thus metadata col filters are invalid to be pushed down
    dataFilters.filterNot(_.references.exists {
      case FileSourceMetadataAttribute(_) => true
      case _ => false
    }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
  }

pushedDownFilters is applied when the data is actually read, and each FileFormat handles it in its own way.
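For example, whether the Parquet reader actually evaluates these filters against its row groups is controlled by spark.sql.parquet.filterPushdown (ORC has the analogous spark.sql.orc.filterPushdown). A small sketch, reusing the session and hypothetical /tmp/events data from above:

import org.apache.spark.sql.functions.col

// Small sketch: pushedDownFilters are handed to buildReaderWithPartitionValues, but the
// concrete FileFormat decides what to do with them. For Parquet this is gated by
// spark.sql.parquet.filterPushdown (true by default).
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.read.parquet("/tmp/events")   // hypothetical path from the earlier sketch
  .where(col("id") > 1)
  .explain()                        // expect pushed filters such as IsNotNull(id), GreaterThan(id,1)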


DataSourceStrategy handles data sources defined through the source API and rewrites the corresponding LogicalRelation into a RowDataSourceScanExec:


def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
    case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
      pruneFilterProjectRaw(
        l,
        projects,
        filters,
        (requestedColumns, allPredicates, _) =>
          toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
...

Inside pruneFilterProjectRaw we have:

 val candidatePredicates = filterPredicates.map { _ transform {
      case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
    }}
 val (unhandledPredicates, pushedFilters, handledFilters) =
      selectFilters(relation.relation, candidatePredicates)

selectFilters translates the matching Catalyst filters into data source Filters:

 val scan = RowDataSourceScanExec(
        requestedColumns,
        requestedColumns.toStructType,
        pushedFilters.toSet,
        handledFilters,
        PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
        relation.relation,
        relation.catalogTable.map(_.identifier))
      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)

This way pushedFilters is passed into scanBuilder as one of its arguments, so the filtering can be performed at the data source level; a sketch of such a source follows.
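Below is a minimal sketch of a V1 source (all class names hypothetical) implementing PrunedFilteredScan, one of the V1 scan traits planned by DataSourceStrategy; the filters argument of buildScan receives exactly the pushed data source Filters.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan, RelationProvider}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical V1 source: DataSourceStrategy plans it as a RowDataSourceScanExec
// and passes the translated filters into buildScan.
class DemoRelationProvider extends RelationProvider {
  override def createRelation(ctx: SQLContext, parameters: Map[String, String]): BaseRelation =
    new DemoRelation(ctx)
}

class DemoRelation(ctx: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def sqlContext: SQLContext = ctx
  override def schema: StructType =
    StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))

  // `filters` holds the pushed data source Filters, e.g. GreaterThan("id", 1); anything the
  // source does not fully handle is re-evaluated by Spark in a FilterExec on top of the scan.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val data = Seq(Row(1, "a"), Row(2, "b"))
    val kept = data.filter { row =>
      filters.forall {
        case GreaterThan("id", v: Int) => row.getInt(0) > v
        case _ => true // conservatively keep rows for filters this demo cannot evaluate
      }
    }
    val idx = requiredColumns.map(schema.fieldIndex) // project to the requested columns
    ctx.sparkContext.parallelize(kept.map(r => Row.fromSeq(idx.map(r.get))))
  }
}

Reading such a source with spark.read.format(classOf[DemoRelationProvider].getName).load() and a filter on id should then list the pushed GreaterThan under PushedFilters in the RowDataSourceScanExec.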

DataSource pushdown in V2

Judging from the current implementation, pushdown is supported for JDBC, Parquet, and ORC sources (SQLConf.USE_V1_SOURCE_LIST needs to be adjusted accordingly). The implementation differs from V1: DataSourceV2Strategy only performs the conversion to a physical plan, while the pushdown itself is carried out in the optimizer rule V2ScanRelationPushDown:
object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper with AliasHelper {
  import DataSourceV2Implicits._
  def apply(plan: LogicalPlan): LogicalPlan = {
    val pushdownRules = Seq[LogicalPlan => LogicalPlan] (
      createScanBuilder,
      pushDownSample,
      pushDownFilters,
      pushDownAggregates,
      pushDownLimits,
      pruneColumns)
    pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
      pushDownRule(newPlan)
    }
  }

This is where the decisions about filter, aggregate, and other pushdowns are made.

createScanBuilder creates a ScanBuilderHolder; pushDownFilters then calls the pushFilters method, which calls the scanBuilder's pushPredicates method so that the predicates to be pushed down are recorded, as in JDBCScanBuilder:

override def build(): Scan = {
    val resolver = session.sessionState.conf.resolver
    val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
    val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
    // the `finalSchema` is either pruned in pushAggregation (if aggregates are
    // pushed down), or pruned in pruneColumns (in regular column pruning). These
    // two are mutual exclusive.
    // For aggregate push down case, we want to pass down the quoted column lists such as
    // "DEPT","NAME",MAX("SALARY"),MIN("BONUS"), instead of getting column names from
    // prunedSchema and quote them (will become "MAX(SALARY)", "MIN(BONUS)" and can't
    // be used in sql string.
    JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedPredicate,
      pushedAggregateList, pushedGroupBys, tableSample, pushedLimit, sortOrders)
  }
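For a custom connector, the hook that this step talks to is the SupportsPushDownV2Filters mix-in on the ScanBuilder. Below is a skeletal sketch (class names hypothetical, and the Scan construction stubbed out) of how a builder records pushed predicates and reports the remainder back to Spark.

import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}
import org.apache.spark.sql.types.StructType

// Skeletal sketch: pushDownFilters hands the translated predicates to pushPredicates;
// whatever the builder returns is treated as not handled and re-evaluated by Spark.
class DemoScanBuilder extends ScanBuilder with SupportsPushDownV2Filters {
  private var pushed: Array[Predicate] = Array.empty

  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    // accept only simple comparisons in this demo, hand everything else back to Spark
    val (supported, unsupported) =
      predicates.partition(p => Set("=", ">", ">=", "<", "<=").contains(p.name()))
    pushed = supported
    unsupported
  }

  override def pushedPredicates(): Array[Predicate] = pushed

  // A real builder would build a Scan that applies `pushed` at read time; this sketch
  // only returns a schema-less stub to stay self-contained.
  override def build(): Scan = new Scan {
    override def readSchema(): StructType = StructType(Nil)
  }
}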

The pruneColumns method builds the Scan, and the DataSourceV2Strategy rule later uses that Scan to build the corresponding RDD:


override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
      _, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _)) =>
      val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
      if (v1Relation.schema != scan.readSchema()) {
        throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
          scan.readSchema(), v1Relation.schema)
      }
      val rdd = v1Relation.buildScan()
      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
      val dsScan = RowDataSourceScanExec(
        output,
        output.toStructType,
        Set.empty,
        pushed.toSet,
        pushedDownOperators,
        unsafeRowRDD,
        v1Relation,
        tableIdentifier = None)
      withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil
....
case class JDBCScan(
    relation: JDBCRelation,
    prunedSchema: StructType,
    pushedPredicates: Array[Predicate],
    pushedAggregateColumn: Array[String] = Array(),
    groupByColumns: Option[Array[String]],
    tableSample: Option[TableSampleInfo],
    pushedLimit: Int,
    sortOrders: Array[SortOrder]) extends V1Scan {
  override def readSchema(): StructType = prunedSchema
  override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = context
      override def schema: StructType = prunedSchema
      override def needConversion: Boolean = relation.needConversion
      override def buildScan(): RDD[Row] = {
        val columnList = if (groupByColumns.isEmpty) {
          prunedSchema.map(_.name).toArray
        } else {
          pushedAggregateColumn
        }
        relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
          pushedLimit, sortOrders)
      }
    }.asInstanceOf[T]
  }

We can see that for JDBC predicate pushdown, toV1TableScan is called to build the table scan, buildScan is then called to build the RDD, and finally the RowDataSourceScanExec physical plan is constructed. That completes the V2 DataSource pushdown.
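Reusing the hypothetical h2 catalog from the earlier sketch, the effect can be observed in the physical plan of an aggregate query (the exact metadata key names shown by explain may vary between versions):

// Sketch: the table h2.test.employee and its columns are assumptions for illustration.
val df = spark.sql("SELECT dept, MAX(salary) FROM h2.test.employee GROUP BY dept")
df.explain()
// Expect a RowDataSourceScanExec whose metadata lists the pushed-down operators coming
// from PushedDownOperators above, e.g. PushedAggregates and PushedGroupByColumns.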

For the Parquet and ORC pushdown, refer to:


case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
      // projection and filters were already pushed down in the optimizer.
      // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
      // not support columnar, a projection is added to convert the rows to UnsafeRow.
      val (runtimeFilters, postScanFilters) = filters.partition {
        case _: DynamicPruning => true
        case _ => false
      }
      val batchExec = BatchScanExec(relation.output, relation.scan, runtimeFilters,
        relation.keyGroupedPartitioning)
      withProjectAndFilter(project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: Nil


Differences between V1 and V2 DataSource pushdown

In V1 the pushdown is produced together with the corresponding scan physical plan, whereas in V2 it is built separately in the V2ScanRelationPushDown rule; the physical-planning phase then merely calls the method that produces the RDD.


Explanation of SQLConf.USE_V1_SOURCE_LIST

By default SQLConf.USE_V1_SOURCE_LIST is set to avro,csv,json,kafka,orc,parquet,text, and these file data sources all extend FileDataSourceV2:

trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
  /**
   * Returns a V1 [[FileFormat]] class of the same file data source.
   * This is a solution for the following cases:
   * 1. File datasource V2 implementations cause regression. Users can disable the problematic data
   *    source via SQL configuration and fall back to FileFormat.
   * 2. Catalog support is required, which is still under development for data source V2.
   */
  def fallbackFileFormat: Class[_ <: FileFormat]
  lazy val sparkSession = SparkSession.active
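This list is matched against a source's short name when planning a read, so it decides whether, for example, Parquet goes through the V1 FileSourceStrategy path or the V2 path. A quick sketch, reusing the session and hypothetical /tmp/events data from the earlier partition-pruning example:

import org.apache.spark.sql.functions.col

// Quick sketch: removing "parquet" from the list makes parquet reads go through the
// DataSource V2 path (BatchScanExec) instead of the V1 FileSourceScanExec.
spark.conf.set("spark.sql.sources.useV1SourceList", "avro,csv,json,kafka,orc,text")
spark.read.parquet("/tmp/events").where(col("id") > 1).explain()  // expect BatchScanExec

spark.conf.set("spark.sql.sources.useV1SourceList", "avro,csv,json,kafka,orc,parquet,text")
spark.read.parquet("/tmp/events").where(col("id") > 1).explain()  // back to FileSourceScanExec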


The comment on fallbackFileFormat explains this fairly clearly (see also the comments in SPARK-28396). The concrete fallback is done in the FallBackFileSourceV2 rule:

/**
 * Replace the File source V2 table in InsertIntoStatement to V1 FileFormat. E.g., with a temporary
 * view t using org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2, inserting into
 * view t fails since there is no corresponding physical plan.
 * This is a temporary hack for making current data source V2 work. It should be removed when
 * Catalog support of file data source v2 is finished.
 */
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case i @ InsertIntoStatement(
        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _) =>
      val v1FileFormat = table.fallbackFileFormat.newInstance()
      val relation = HadoopFsRelation(
        table.fileIndex,
        table.fileIndex.partitionSchema,
        table.schema,
        None,
        v1FileFormat,
        d.options.asScala.toMap)(sparkSession)
      i.copy(table = LogicalRelation(relation))
  }
}

