SPARK中 会对Scan的小文件做合并到一个Task去处理么?

简介: SPARK中 会对Scan的小文件做合并到一个Task去处理么?

背景


本文基于SPARK 3.1.2

在之前查看SQL物理计划的时候,发现一个很奇怪的现象,文件的个数很多,但是启动的Task却很少。


结论


SPARK在scan文件的时候,会把小文件合并到一个Task上去处理。


分析


这里的SQL很简单:就是select col from table语句我们直接查看对应的计划:

image.png

image.png

可以看到对于有50000多个文件的source,最终却只有6000多个任务运行。

我们直接看对应的代码FileSourceScanExec实现:

    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        // getPath() is very expensive so we only want to call it once in this block:
        val filePath = file.getPath
        val isSplitable = relation.fileFormat.isSplitable(
          relation.sparkSession, relation.options, filePath)
        PartitionedFileUtil.splitFiles(
          sparkSession = relation.sparkSession,
          file = file,
          filePath = filePath,
          isSplitable = isSplitable,
          maxSplitBytes = maxSplitBytes,
          partitionValues = partition.values
        )
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
    val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
   new FileScanRDD(fsRelation.sparkSession, readFile, partitions)

PartitionedFileUtil.splitFiles就是对每个文件进行遍历,如果一个文件超过了maxSplitBytes,这个可以参考Spark-读取Parquet-为什么task数量会多于Row Group的数量,就进行切分,否则就直接返回整个文件,

关键的在FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes):

...
 partitionedFiles.foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      // Add the given file to the current partition.
      currentSize += file.length + openCostInBytes
      currentFiles += file
 }
...

她会根据maxSplitBytes来判断,如果文件小于该阈值,就会放到同一个FilePartition中,从而让一个Task去处理,这样就会出现了图上所展示的小文件很多但是Task缺比较小的现象。

相关文章
|
JavaScript CDN
js:spark-md5分片计算文件的md5值
js:spark-md5分片计算文件的md5值
903 0
|
16天前
|
分布式计算 监控 大数据
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
|
1月前
|
SQL 分布式计算 Java
IDEA 打包 Spark 项目 POM 文件依赖
这是一个 Maven POM 示例,用于构建一个使用 Spark 与 Hive 的项目,目标是将数据从 Hive 导入 ClickHouse。POM 文件设置了 Scala 和 Spark 的依赖,包括 `spark-core_2.12`, `spark-sql_2.12`, 和 `spark-hive_2.12`。`maven-assembly-plugin` 插件用于打包,生成包含依赖的和不含依赖的两种 JAR 包。`scope` 说明了依赖的使用范围,如 `compile`(默认),`provided`,`runtime`,`test` 和 `system`。
|
1月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之spark3.1.1通过resource目录下的conf文件配置,报错如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
1月前
|
SQL 分布式计算 HIVE
[已解决]Job failed with org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in st
[已解决]Job failed with org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in st
154 0
|
SQL 缓存 分布式计算
SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题
SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题
197 0
|
存储 分布式计算 Java
Spark文件的读取和保存
Spark文件的读取和保存
236 0
|
JSON 分布式计算 监控
Spark案例读取不同格式文件以及读取输入数据存入Mysql
Spark案例读取不同格式文件以及读取输入数据存入Mysql
|
SQL 分布式计算 HIVE
spark sql编程之实现合并Parquet格式的DataFrame的schema
spark sql编程之实现合并Parquet格式的DataFrame的schema
310 0
spark sql编程之实现合并Parquet格式的DataFrame的schema
|
SQL JSON 分布式计算
spark2 sql读取json文件的格式要求
spark2 sql读取json文件的格式要求
139 0
spark2 sql读取json文件的格式要求