浅析Hive/Spark SQL读文件时的输入任务划分

简介: 本文最后留个思考题给读者们:如何设置参数彻底关闭Spark SQL data source表的文件合并?积极回答问题即可获得社区礼物。

作者:
王道远,花名健身,阿里云EMR技术专家,Apache Spark活跃贡献者,主要关注大数据计算优化相关工作。


Hive以及Spark SQL等大数据计算引擎为我们操作存储在HDFS上结构化数据提供了易于上手的SQL接口,大大降低了ETL等操作的门槛,也因此在实际生产中有着广泛的应用。SQL是非过程化语言,我们写SQL的时候并不能控制具体的执行过程,它们依赖执行引擎决定。而Hive和Spark SQL作为Map-Reduce模型的分布式执行引擎,其执行过程首先就涉及到如何将输入数据切分成一个个任务,分配给不同的Map任务。在本文中,我们就来讲解Hive和Spark SQL是如何切分输入路径的。

Hive

Hive是起步较早的SQL on Hadoop项目,最早也是诞生于Hadoop中,所以输入划分这部分的代码与Hadoop相关度非常高。现在Hive普遍使用的输入格式是CombineHiveInputFormat,它继承于HiveInputFormat,而HiveInputFormat实现了Hadoop的InputFormat接口,其中的getSplits方法用来获取具体的划分结果,划分出的一份输入数据被称为一个“Split”。在执行时,每个Split对应到一个map任务。在划分Split时,首先挑出不能合并到一起的目录——比如开启了事务功能的路径。这些不能合并的目录必须单独处理,剩下的路径交给私有方法getCombineSplits,这样Hive的一个map task最多可以处理多个目录下的文件。在实际操作中,我们一般只要通过set mapred.max.split.size=xx;即可控制文件合并的大小。当一个文件过大时,父类的getSplits也会帮我们完成相应的切分工作。

Spark SQL

Spark的表有两种:DataSource表和Hive表。另外Spark后续版本中DataSource V2也将逐渐流行,目前还在不断发展中,暂时就不在这里讨论。我们知道Spark SQL其实底层是Spark RDD,而RDD执行时,每个map task会处理RDD的一个Partition中的数据(注意这里的Partition是RDD的概念,要和表的Partition进行区分)。因此,Spark SQL作业的任务切分关键在于底层RDD的partition如何切分。

Data Source表

Spark SQL的DataSource表在最终执行的RDD类为FileScanRDD,由FileSourceScanExec创建出来。在创建这种RDD的时候,具体的Partition直接作为参数传给了构造函数,因此划分输入的方法也在DataSourceScanExec.scala文件中。具体分两步:首先把文件划分为PartitionFile,再将较小的PartitionFile进行合并。

第一步部分代码如下:

  if (fsRelation.fileFormat.isSplitable(
    fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    (0L until file.getLen by maxSplitBytes).map { offset =>
    val remaining = file.getLen - offset
    val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
    val hosts = getBlockHosts(blockLocations, offset, size)
    PartitionedFile(
      partition.values, file.getPath.toUri.toString,
      offset, size, partitionDeleteDeltas, hosts)
    }
  } else {
    val hosts = getBlockHosts(blockLocations, 0, file.getLen)
    Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,
    0, file.getLen, partitionDeleteDeltas, hosts))
  }

我们可以看出,Spark SQL首先根据文件类型判断单个文件是否能够切割,如果可以则按maxSplitBytes进行切割。如果一个文件剩余部分无法填满maxSplitBytes,也单独作为一个Partition。

第二部分代码如下所示:

  splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }

这样我们就可以依次遍历第一步切好的块,再按照maxSplitBytes进行合并。注意合并文件时还需加上打开文件的预估代价openCostInBytes。那么maxSplitBytesopenCostInBytes这两个关键参数怎么来的呢?

  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

不难看出,主要是spark.sql.files.maxPartitionBytesspark.sql.files.openCostInBytes、调度器默认并发度以及所有输入文件实际大小所控制。

Hive表

Spark SQL中的Hive表底层的RDD类为HadoopRDD,由HadoopTableReader类实现。不过这次,具体的Partition划分还是依赖HadoopRDDgetPartitions方法,具体实现如下:

  override def getPartitions: Array[Partition] = {
    ...
    try {
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      ...
    }
  }

不难看出,在处理Hive表的时候,Spark SQL把任务划分又交给了Hadoop的InputFormat那一套。不过需要注意的是,并不是所有Hive表都归为这一类,Spark SQL会默认对ORC和Parquet的表进行转化,用自己的Data Source实现OrcFileFormatParquetFileFormat来把这两种表作为Data Source表来处理。

总结

切分输入路径只是大数据处理的第一步,虽然不起眼,但是也绝对不可或缺。低效的文件划分可能会给端到端的执行速度带来巨大的负面影响,更有可能影响到输出作业的文件布局,从而影响到整个数据流水线上所有作业的执行效率。万事开头难,为程序输入选择合适的配置参数,可以有效改善程序执行效率。

留个思考题给读者们:如何设置参数彻底关闭Spark SQL data source表的文件合并?

加入钉群或微信群,微信联系小编回复你的方案,即可收到社区礼物。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。image.png

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
4月前
|
分布式计算 监控 Spark
Spark 任务运行时日志分析
Spark 任务运行时日志分析
50 0
|
4月前
|
SQL HIVE
Hive sql 执行原理
Hive sql 执行原理
43 0
|
2天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
|
1月前
|
SQL 数据可视化 Apache
阿里云数据库内核 Apache Doris 兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移
阿里云数据库 SelectDB 内核 Doris 的 SQL 方言转换工具, Doris SQL Convertor 致力于提供高效、稳定的 SQL 迁移解决方案,满足用户多样化的业务需求。兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移。
阿里云数据库内核 Apache Doris 兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移
|
1月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
40 2
|
1月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
40 2
|
1月前
|
SQL 分布式计算 DataWorks
dataworks常见问题之通过sql查询查看任务依赖关系如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
29 1
|
2月前
|
SQL 消息中间件 关系型数据库
Flink报错问题之提交flink sql任务报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
SQL 存储 分布式计算
Spark与Hive的集成与互操作
Spark与Hive的集成与互操作