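This article is based on Spark 3.3.0.
It records what the V1 and V2 DataSource and FileSource implementations do and how they differ, along with the stronger DataSource V2 JDBC pushdown introduced in Spark 3.3.0.
Spark 3.3.0 adds the DS V2 pushdown feature, which pushes more work down to the source, for example more complex aggregate pushdown and filter pushdown.
To see what changed, start with the two V1 planner strategies, DataSourceStrategy and FileSourceStrategy.
FileSourceStrategy converts the matching LogicalRelation into a FileSourceScanExec: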
    FileSourceScanExec(
      fsRelation,
      outputAttributes,
      outputSchema,
      partitionKeyFilters.toSeq,
      bucketSet,
      None,
      dataFilters,
      table.map(_.identifier))
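Filtering shows up in two arguments, partitionKeyFilters and dataFilters. partitionKeyFilters work at the partition level, for example selecting only certain partitions or the partitions kept by dynamic partition pruning. dataFilters apply to non-partition columns, so the corresponding filtering happens while the files are read, as follows: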
    @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
    ...
    @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
    ...
    lazy val inputRDD: RDD[InternalRow] = {
      val readFile: (PartitionedFile) => Iterator[InternalRow] =
        relation.fileFormat.buildReaderWithPartitionValues(
          sparkSession = relation.sparkSession,
          dataSchema = relation.dataSchema,
          partitionSchema = relation.partitionSchema,
          requiredSchema = requiredSchema,
          filters = pushedDownFilters,
          options = relation.options,
          hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

      val readRDD = if (bucketedScan) {
        createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
          relation)
      } else {
        createReadRDD(readFile, dynamicallySelectedPartitions, relation)
      }
      sendDriverMetrics()
      readRDD
    }
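The split between partitionKeyFilters and dataFilters can be sketched in plain Scala. This is a toy model under stated assumptions, not Spark's implementation: `Pred`, `split`, and the column names are hypothetical, and a predicate is reduced to its SQL text plus the columns it references. Spark's real logic additionally tries to extract the data-column part of mixed predicates, which this sketch skips.

```scala
// Toy model, NOT Spark's Catalyst classes: every name here is hypothetical.
object FilterSplitSketch {
  // A predicate reduced to its SQL text and the columns it references.
  final case class Pred(sql: String, references: Set[String])

  /** Returns (partitionKeyFilters, dataFilters) for the given partition columns. */
  def split(preds: Seq[Pred], partitionCols: Set[String]): (Seq[Pred], Seq[Pred]) = {
    // References partition columns only: can be used to prune whole partitions.
    val partitionKeyFilters =
      preds.filter(p => p.references.nonEmpty && p.references.subsetOf(partitionCols))
    // References no partition column: candidate for pushdown into the file reader.
    val dataFilters = preds.filter(p => p.references.intersect(partitionCols).isEmpty)
    // Predicates mixing both kinds of columns end up in neither list in this sketch.
    (partitionKeyFilters, dataFilters)
  }

  def main(args: Array[String]): Unit = {
    val preds = Seq(
      Pred("dt = '2022-06-01'", Set("dt")),
      Pred("amount > 100", Set("amount")),
      Pred("amount > 100 OR dt = '2022-06-02'", Set("amount", "dt")))
    val (partitionKeyFilters, dataFilters) = split(preds, Set("dt"))
    println(s"partition filters: ${partitionKeyFilters.map(_.sql)}")
    println(s"data filters: ${dataFilters.map(_.sql)}")
  }
}
```

In this sketch the mixed predicate is left to a regular filter above the scan, which is the conservative choice when a predicate cannot be assigned cleanly to either side.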
dataFilters, in turn, are used to build pushedDownFilters:
    private lazy val pushedDownFilters = {
      val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
      // `dataFilters` should not include any metadata col filters
      // because the metadata struct has been flatted in FileSourceStrategy
      // and thus metadata col filters are invalid to be pushed down
      dataFilters.filterNot(_.references.exists {
        case FileSourceMetadataAttribute(_) => true
        case _ => false
      }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
    }
pushedDownFilters are applied when the data is actually read, and each FileFormat handles them in its own way.
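The `flatMap` over `translateFilter` above works because translation returns an `Option`: a predicate the source cannot express simply disappears from the pushed list. A minimal sketch of that idea follows, assuming a toy expression tree. `Expr`, `SourceFilter`, and their subclasses are hypothetical stand-ins loosely modelled on Catalyst expressions and `org.apache.spark.sql.sources.Filter`, not the real classes.

```scala
// Toy model of Catalyst-to-source filter translation; all types are hypothetical.
object TranslateFilterSketch {
  // Simplified Catalyst-like expressions.
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  final case class Eq(left: Expr, right: Expr) extends Expr
  final case class Gt(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Upper(child: Expr) extends Expr // a function the source cannot evaluate

  // Simplified source-side filters.
  sealed trait SourceFilter
  final case class EqualTo(attribute: String, value: Any) extends SourceFilter
  final case class GreaterThan(attribute: String, value: Any) extends SourceFilter
  final case class AndFilter(left: SourceFilter, right: SourceFilter) extends SourceFilter

  /** Some(filter) when the expression has a source-side equivalent, None otherwise. */
  def translate(e: Expr): Option[SourceFilter] = e match {
    case Eq(Col(n), Lit(v)) => Some(EqualTo(n, v))
    case Gt(Col(n), Lit(v)) => Some(GreaterThan(n, v))
    // A conjunction is translated only when both sides are translatable.
    case And(l, r) => for { lf <- translate(l); rf <- translate(r) } yield AndFilter(lf, rf)
    case _ => None
  }

  /** Mirrors the flatMap step: untranslatable predicates are dropped from the pushed list. */
  def pushed(dataFilters: Seq[Expr]): Seq[SourceFilter] = dataFilters.flatMap(translate)
}
```

For example, `pushed(Seq(Eq(Col("id"), Lit(1)), Eq(Upper(Col("name")), Lit("A"))))` keeps only the `id` filter; the second predicate has no translation here and would have to be evaluated by Spark after the scan.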
DataSourceStrategy handles data sources defined through the sources API and converts the corresponding LogicalRelation into a RowDataSourceScanExec:
    def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
      case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
        pruneFilterProjectRaw(
          l,
          projects,
          filters,
          (requestedColumns, allPredicates, _) =>
            toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
      ...
    val candidatePredicates = filterPredicates.map { _ transform {
      case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
    }}

    val (unhandledPredicates, pushedFilters, handledFilters) =
      selectFilters(relation.relation, candidatePredicates)
selectFilters translates the Catalyst predicates into data source Filters, which are then passed to the scan:
    val scan = RowDataSourceScanExec(
      requestedColumns,
      requestedColumns.toStructType,
      pushedFilters.toSet,
      handledFilters,
      PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
      scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
      relation.relation,
      relation.catalogTable.map(_.identifier))
    filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
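The three values returned by selectFilters can be illustrated with a small sketch. It is a simplification under stated assumptions: predicates are plain strings, and `translatable` and `sourceUnhandled` are hypothetical callbacks standing in for filter translation and for the source reporting which filters it cannot fully evaluate. It is not Spark's code.

```scala
// Toy model of the (unhandled, pushed, handled) split; all names are hypothetical.
object SelectFiltersSketch {
  final case class Selected(unhandled: Seq[String], pushed: Seq[String], handled: Set[String])

  def selectFilters(
      predicates: Seq[String],
      translatable: String => Boolean,
      sourceUnhandled: Seq[String] => Seq[String]): Selected = {
    // Only translatable predicates can be handed to the source at all.
    val (translated, untranslated) = predicates.partition(translatable)
    // The source reports which pushed filters it may not fully evaluate.
    val unhandledBySource = sourceUnhandled(translated).toSet
    val handled = translated.filterNot(unhandledBySource).toSet
    // Spark must re-check everything the source did not fully handle.
    val unhandled = untranslated ++ translated.filter(unhandledBySource)
    Selected(unhandled, translated, handled)
  }

  def main(args: Array[String]): Unit = {
    val result = selectFilters(
      Seq("id = 1", "name LIKE '%a%'", "upper(name) = 'A'"),
      translatable = p => !p.startsWith("upper"),
      sourceUnhandled = filters => filters.filter(_.contains("LIKE")))
    println(result)
  }
}
```

The unhandled predicates correspond to the `filterCondition` that wraps the scan in a `FilterExec`, so a filter can be pushed to the source and still be re-applied by Spark.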
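- Data source pushdown in V2
- Judging from the current implementation, pushdown is supported for JDBC, Parquet, and ORC (for the file formats this requires adjusting SQLConf.USE_V1_SOURCE_LIST, i.e. spark.sql.sources.useV1SourceList). The implementation differs from V1: DataSourceV2Strategy only converts the plan into a physical plan, while the pushdown itself is done in the optimizer rule V2ScanRelationPushDown: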
    object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper with AliasHelper {
      import DataSourceV2Implicits._

      def apply(plan: LogicalPlan): LogicalPlan = {
        val pushdownRules = Seq[LogicalPlan => LogicalPlan] (
          createScanBuilder,
          pushDownSample,
          pushDownFilters,
          pushDownAggregates,
          pushDownLimits,
          pruneColumns)

        pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
          pushDownRule(newPlan)
        }
      }
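The `foldLeft` in the rule above applies the pushdown steps strictly in list order, each step receiving the plan produced by the previous one. A minimal sketch of that pattern, assuming a toy `Plan` that only records which operators have been moved into the scan (`Plan`, `push`, and `run` are hypothetical names, not Spark APIs):

```scala
// Toy model of an ordered pushdown pipeline; not Spark's LogicalPlan.
object PushdownPipelineSketch {
  // `pushed` records operators already moved into the scan, in order;
  // `remaining` holds operators still sitting above the scan.
  final case class Plan(pushed: Vector[String], remaining: Set[String])

  type Rule = Plan => Plan

  // Hypothetical helper: pushes `op` into the scan when the plan contains it.
  def push(op: String): Rule = plan =>
    if (plan.remaining.contains(op)) Plan(plan.pushed :+ op, plan.remaining - op) else plan

  // Same order as the rule list in the listing above (scan builder creation omitted).
  val rules: Seq[Rule] =
    Seq(push("sample"), push("filter"), push("aggregate"), push("limit"), push("columns"))

  def run(plan: Plan): Plan = rules.foldLeft(plan) { (newPlan, rule) => rule(newPlan) }

  def main(args: Array[String]): Unit = {
    println(run(Plan(Vector.empty, Set("limit", "filter", "columns"))))
  }
}
```

The order matters in the sketch as in the real rule: filters are considered before aggregates and limits, since pushing a limit below an unpushed filter would change the result.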
Taking JDBC as the example, JDBCScanBuilder.build() assembles everything that was pushed down into a JDBCScan:
    override def build(): Scan = {
      val resolver = session.sessionState.conf.resolver
      val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
      val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)

      // the `finalSchema` is either pruned in pushAggregation (if aggregates are
      // pushed down), or pruned in pruneColumns (in regular column pruning). These
      // two are mutual exclusive.
      // For aggregate push down case, we want to pass down the quoted column lists such as
      // "DEPT","NAME",MAX("SALARY"),MIN("BONUS"), instead of getting column names from
      // prunedSchema and quote them (will become "MAX(SALARY)", "MIN(BONUS)" and can't
      // be used in sql string.
      JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedPredicate,
        pushedAggregateList, pushedGroupBys, tableSample, pushedLimit, sortOrders)
    }
The scan is built inside the pruneColumns method, so that DataSourceV2Strategy can later use that scan to build the corresponding RDD:
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
        _, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _)) =>
        val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
        if (v1Relation.schema != scan.readSchema()) {
          throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
            scan.readSchema(), v1Relation.schema)
        }
        val rdd = v1Relation.buildScan()
        val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
        val dsScan = RowDataSourceScanExec(
          output,
          output.toStructType,
          Set.empty,
          pushed.toSet,
          pushedDownOperators,
          unsafeRowRDD,
          v1Relation,
          tableIdentifier = None)
        withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil
    ....
    case class JDBCScan(
        relation: JDBCRelation,
        prunedSchema: StructType,
        pushedPredicates: Array[Predicate],
        pushedAggregateColumn: Array[String] = Array(),
        groupByColumns: Option[Array[String]],
        tableSample: Option[TableSampleInfo],
        pushedLimit: Int,
        sortOrders: Array[SortOrder]) extends V1Scan {

      override def readSchema(): StructType = prunedSchema

      override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
        new BaseRelation with TableScan {
          override def sqlContext: SQLContext = context
          override def schema: StructType = prunedSchema
          override def needConversion: Boolean = relation.needConversion
          override def buildScan(): RDD[Row] = {
            val columnList = if (groupByColumns.isEmpty) {
              prunedSchema.map(_.name).toArray
            } else {
              pushedAggregateColumn
            }
            relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
              pushedLimit, sortOrders)
          }
        }.asInstanceOf[T]
      }
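As shown, for JDBC predicate pushdown, toV1TableScan is called to build the TableScan, buildScan is then called to build the RDD, and finally the RowDataSourceScanExec physical plan is constructed, which completes the V2 data source pushdown.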
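To make concrete what it means for the pushed pieces to reach the database, the sketch below assembles a SQL string from a column list, predicates, group-by columns, and a limit. It is a hypothetical illustration only: `buildQuery` is not Spark's JDBC code, the `LIMIT` syntax is dialect-specific, and treating a limit of 0 as "no limit pushed" is an assumption of this sketch.

```scala
// Toy SQL assembly; NOT Spark's JDBC implementation. All names are hypothetical.
object JdbcQuerySketch {
  def buildQuery(
      table: String,
      columns: Seq[String],
      predicates: Seq[String],
      groupBy: Seq[String],
      limit: Int): String = {
    val select = s"SELECT ${columns.mkString(",")} FROM $table"
    // Each pushed predicate is parenthesized and the predicates are AND-ed together.
    val where =
      if (predicates.isEmpty) "" else predicates.map(p => s"($p)").mkString(" WHERE ", " AND ", "")
    val group = if (groupBy.isEmpty) "" else groupBy.mkString(" GROUP BY ", ",", "")
    // Assumption of this sketch: 0 means that no limit was pushed down.
    val lim = if (limit > 0) s" LIMIT $limit" else ""
    select + where + group + lim
  }

  def main(args: Array[String]): Unit = {
    println(buildQuery(
      "emp",
      Seq("\"DEPT\"", "MAX(\"SALARY\")"),
      Seq("\"SALARY\" > 1000"),
      Seq("\"DEPT\""),
      limit = 0))
  }
}
```

Note how the aggregate case passes an already quoted column list such as `"DEPT",MAX("SALARY")`, which is the reason given in the `build()` comment for not deriving the column names from the pruned schema.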
Other V2 scans are planned by another case of the same strategy, which creates a BatchScanExec:
      case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
        // projection and filters were already pushed down in the optimizer.
        // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
        // not support columnar, a projection is added to convert the rows to UnsafeRow.
        val (runtimeFilters, postScanFilters) = filters.partition {
          case _: DynamicPruning => true
          case _ => false
        }
        val batchExec = BatchScanExec(relation.output, relation.scan, runtimeFilters,
          relation.keyGroupedPartitioning)
        withProjectAndFilter(project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: Nil
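Differences between V1 and V2 data source pushdown
In V1, the pushdown is produced together with the scan physical plan while that plan is being built. In V2, it is built separately in the V2ScanRelationPushDown rule, and the physical planning phase only calls the method that generates the RDD.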
File-based V2 sources can also fall back to their V1 implementation, as declared in FileDataSourceV2:
    trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
      /**
       * Returns a V1 [[FileFormat]] class of the same file data source.
       * This is a solution for the following cases:
       * 1. File datasource V2 implementations cause regression. Users can disable the problematic data
       *    source via SQL configuration and fall back to FileFormat.
       * 2. Catalog support is required, which is still under development for data source V2.
       */
      def fallbackFileFormat: Class[_ <: FileFormat]

      lazy val sparkSession = SparkSession.active
The comment on fallbackFileFormat explains this fairly clearly, and the discussion in SPARK-28396 is also worth reading. The concrete fallback logic lives in the FallBackFileSourceV2 rule:
    /*
     * Replace the File source V2 table in InsertIntoStatement to V1 FileFormat.
     * E.g, with temporary view t using org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2,
     * inserting into view t fails since there is no corresponding physical plan.
     * This is a temporary hack for making current data source V2 work. It should be
     * removed when Catalog support of file data source v2 is finished.
     */
    class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
        case i @ InsertIntoStatement(
            d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _) =>
          val v1FileFormat = table.fallbackFileFormat.newInstance()
          val relation = HadoopFsRelation(
            table.fileIndex,
            table.fileIndex.partitionSchema,
            table.schema,
            None,
            v1FileFormat,
            d.options.asScala.toMap)(sparkSession)
          i.copy(table = LogicalRelation(relation))
      }
    }
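The shape of this rule, rewriting only inserts whose target is a V2 file table and leaving everything else alone, can be sketched with a toy plan model. All types below (`V2FileTable`, `V1FileRelation`, `InsertInto`, `Read`) are hypothetical simplifications, not Spark classes.

```scala
// Toy model of the fallback rewrite; not Spark's plan classes.
object FallbackSketch {
  sealed trait Relation
  final case class V2FileTable(format: String, path: String) extends Relation
  final case class V1FileRelation(format: String, path: String) extends Relation
  final case class OtherV2Table(name: String) extends Relation

  sealed trait Plan
  final case class InsertInto(table: Relation, query: String) extends Plan
  final case class Read(table: Relation) extends Plan

  /** Swaps the V2 file table for its V1 counterpart, but only when it is an insert target. */
  def fallBack(plan: Plan): Plan = plan match {
    case i @ InsertInto(V2FileTable(format, path), _) =>
      i.copy(table = V1FileRelation(format, path))
    case other => other // reads and non-file V2 tables are left untouched
  }

  def main(args: Array[String]): Unit = {
    println(fallBack(InsertInto(V2FileTable("parquet", "/tmp/t"), "SELECT 1")))
    println(fallBack(Read(V2FileTable("parquet", "/tmp/t"))))
  }
}
```

In the sketch, as in the rule above, the read path is not rewritten: only the write side falls back, which matches the comment that inserting into such a table has no corresponding V2 physical plan.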