【Parquet】Spark读取Parquet问题详解……

简介: 【Parquet】Spark读取Parquet问题详解……


「困惑」

  1. spark sql 读取 parquet 文件,stage 生成任务 4 个 task,只有一个 task 处理数据,其它无
  2. spark 任务执行 apache iceberg rewriteDataFiles 合并小文件(parquet 文件),发现偶然无变化

「Parquet 文件详解」

一个 Parquet 文件是由一个 header 以及一个或多个 block 块组成,以一个 footer 结尾。

header 中只包含一个 4 个字节的数字 PAR1 用来识别整个 Parquet 文件格式。

文件中所有的 metadata 都存在于 footer 中。

footer 中的 metadata 包含了格式的版本信息,schema 信息、key-value paris 以及所有 block 中的 metadata 信息。

footer 中最后两个字段为一个以 4 个字节长度的 footer 的 metadata,以及同 header 中包含的一样的 PAR1。

640.png

Parquet 文件格式

640.png

上图展示了一个 Parquet 文件的结构

  • 一个文件中可以存储多个行组,文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件。
  • Footer length 存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的 Schema 信息。
  • 每一页的开始都会存储该页的元数据,在 Parquet 中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引。
  • 存储格式:Parquet 的存储模型主要由行组(Row Group 默认 128M)、列块(Column Chuck)、页(Page)组成。
  • 支持数据嵌套模型:Parquet 支持嵌套的数据模型,类似于 Protocol Buffers。

640.png

可以看出在 Schema 中所有的基本类型字段都是叶子节点,在这个 Schema 中一共存在 6 个叶子节点,如果把这样的 Schema 转换成扁平式的关系模型,就可以理解为该表包含六个列。

Parquet 中没有 Map、Array 这样的复杂数据结构每一个数据模型的 schema 包含多个字段,每一个字段又可以包含多个字段,每一个字段有三个属性:重复数、数据类型和字段名, 重复数可以是以下三种:required(出现 1 次),repeated(出现 0 次或多次),optional(出现 0 次或 1 次)。每一个字段的数据类型可以分成两种:group(复杂类型)和 primitive(基本类型)。以上实现列式存储,但是无法将其恢复到原来的数据行的结构形式,Parquet 采用了 Dremel 中(R, D, V)模型 R,即 Repetition Level,用于表达一个列有重复,即有多个值的情况,其值为重复是在第几层上发生。D,即 Definition Level,用于表达某个列是否为空、在哪里为空,其值为当前列在第几层上有值 V,表示数据值

  1. 行组,Row Group:Parquet 在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。
  2. 列块,Column Chunk:行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。
  3. 页,Page:Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。

小结

  • Parquet 是一种支持嵌套结构的列式存储格式,非常适用于 OLAP 场景,按列存储和扫描。
  • 列存使得更容易对每个列使用高效的压缩和编码(一个页是最小的编码的单位),降低磁盘空间。
  • 映射下推,这是列式存储最突出的优势,是指在获取数据时只需要扫描需要的列,不用全部扫描。
  • 谓词下推,是指通过将一些过滤条件尽可能的在最底层执行以减少结果集。谓词就是指这些过滤条件,即返回。

实战

spark 2.4.0 读取 parquet 文件

spark.read.parquet("")

org.apache.spark.sql.DataFrameReader.java
    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
    val json = classOf[JsonFileFormat].getCanonicalName
    val parquet = classOf[ParquetFileFormat].getCanonicalName
    val csv = classOf[CSVFileFormat].getCanonicalName
    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
    val nativeOrc = classOf[OrcFileFormat].getCanonicalName
    val socket = classOf[TextSocketSourceProvider].getCanonicalName   --->DataSourceV2
    val rate = classOf[RateStreamProvider].getCanonicalName     --->DataSourceV2
private def loadV1Source(paths: String*) = {
    // Code path for data source v1.
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation
->getOrInferFileFormatSchema()
**Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list.**
  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
    new InMemoryFileIndex(
      sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
  }
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()
val parallelPartitionDiscoveryParallelism =
private[sql] def bulkListLeafFiles(
...
spark.sql.sources.parallelPartitionDiscovery.parallelism 默认10000
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
    **设置并行度来防止下面的文件列表生成许多任务**
    **in case of large defaultParallelism.**
    **val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)**
    val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
    val statusMap = try {
      val description = paths.size match {
        case 0 =>
          s"Listing leaf files and directories 0 paths"
        case 1 =>
          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
        case s =>
          s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
      }
      sparkContext.setJobDescription(description)
      sparkContext
        .parallelize(serializedPaths, numParallelism)
        .mapPartitions { pathStrings =>
          val hadoopConf = serializableConfiguration.value
          pathStrings.map(new Path(_)).toSeq.map { path =>
            (path, listLeafFiles(path, hadoopConf, filter, None))
          }.iterator
        }.map { case (path, statuses) =>
        val serializableStatuses = statuses.map { status =>
          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
          val blockLocations = status match {
            case f: LocatedFileStatus =>
              f.getBlockLocations.map { loc =>
                SerializableBlockLocation(
                  loc.getNames,
                  loc.getHosts,
                  loc.getOffset,
                  loc.getLength)
              }
            case _ =>
              Array.empty[SerializableBlockLocation]
          }
          SerializableFileStatus(
            status.getPath.toString,
            status.getLen,
            status.isDirectory,
            status.getReplication,
            status.getBlockSize,
            status.getModificationTime,
            status.getAccessTime,
            blockLocations)
        }
        (path.toString, serializableStatuses)
      }.collect()
...
)

真正读取数据是 DataSourceScanExec

注意:这里有 DataSourceV2ScanExec v2 版本,经上面代码分析,parquet,orc 使用的是 v1 版 org.apache.spark.sql.execution.DataSourceScanExec.scala

Physical plan node for scanning data from HadoopFsRelations.
FileSourceScanExec
 private lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    relation.bucketSpec match {
      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
      case _ =>
        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    }
  }
private def createNonBucketedReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Seq[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
128M
    val defaultMaxSplitBytes =
      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
4M
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
上面代码sparkcontent设置的
    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")
切文件
    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        val blockLocations = getBlockLocations(file)
        if (fsRelation.fileFormat.isSplitable(
            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
          (0L until file.getLen by maxSplitBytes).map { offset =>
            val remaining = file.getLen - offset
            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
            val hosts = getBlockHosts(blockLocations, offset, size)
            PartitionedFile(
              partition.values, file.getPath.toUri.toString, offset, size, hosts)
          }
        } else {
          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
          Seq(PartitionedFile(
            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
        }
      }
    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
    val partitions = new ArrayBuffer[FilePartition]
    val currentFiles = new ArrayBuffer[PartitionedFile]
    var currentSize = 0L
    /** Close the current partition and move to the next. */
合并小文件,大文件就直接变为partition了。一路下来会以为会切大文件,然而并不会。
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) {
        val newPartition =
          FilePartition(
            partitions.size,
            currentFiles.toArray.toSeq) // Copy to a new Array.
        partitions += newPartition
      }
      currentFiles.clear()
      currentSize = 0
    }
    // Assign files to partitions using "Next Fit Decreasing"
    splitFiles.foreach { file =>
这里遇到大文件直接放入partitions分区,小文件是几个大小达到maxSplitBytes,放入一个分区提高
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      // Add the given file to the current partition.
      currentSize += file.length + openCostInBytes
      currentFiles += file
    }
    closePartition()
    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
  }

小结

  1. spark 2.4.0 读取 parquet,使用的是 loadV1Source
  2. spark 读取文件默认 task 任务数(分区数)最大 10000,最小是 path 的个数(注意并行度和任务数分区数区别)
  3. createNonBucketedReadRDD 中 Bucketed 理解,是指 hive 表中的分区下面的分桶
  4. rdd 分区数确认:合并小文件,大文件就直接变为 partition 了,注意大文件没有切,目的提高 cpu 利用率

FileScanRDD 和 parquetjar 本身提供的读写 api

org.apache.spark.sql.execution.datasources.FileScanRDD
 private def readCurrentFile(): Iterator[InternalRow] = {
        try {
          readFunction(currentFile)
        } catch {
          case e: FileNotFoundException =>
            throw new FileNotFoundException(
              e.getMessage + "\n" +
                "It is possible the underlying files have been updated. " +
                "You can explicitly invalidate the cache in Spark by " +
                "running 'REFRESH TABLE tableName' command in SQL or " +
                "by recreating the Dataset/DataFrame involved.")
        }
      }
ParquetFileFormat.buildReaderWithPartitionValues(该方法上面有提)构造reader,
override def buildReaderWithPartitionValues(
...
if (enableVectorizedReader) {
        val vectorizedReader = new VectorizedParquetRecordReader(
          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
        val iter = new RecordReaderIterator(vectorizedReader)
        // SPARK-23457 Register a task completion lister before `initialization`.
        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
        vectorizedReader.initialize(split, hadoopAttemptContext)
        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
        if (returningBatch) {
          vectorizedReader.enableReturningBatches()
        }
      } else {
...
        reader.initialize(split, hadoopAttemptContext)
}
vectorizedReader.initialize(split, hadoopAttemptContext)
->SpecificParquetRecordReaderBase.initialize
 ->ParquetMetadata footer = readFooter(config, file, range(0, length));注意这里传入的range
->ParquetMetadataConverter.converter.readParquetMetadata(f, filter)
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
      @Override
      public FileMetaData visit(NoFilter filter) throws IOException {
        return readFileMetaData(from);
      }
      @Override
      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
        return readFileMetaData(from, true);
      }
      @Override
      public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
        return filterFileMetaDataByStart(readFileMetaData(from), filter);
      }
      @Override
      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
        return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
      }
    });
    LOG.debug("{}", fileMetaData);
    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
    if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
    return parquetMetadata;
  }
RangeMetadataFilter  filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
 static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
    List<RowGroup> rowGroups = metaData.getRow_groups();
    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
    for (RowGroup rowGroup : rowGroups) {
      long totalSize = 0;
      long startIndex = getOffset(rowGroup.getColumns().get(0));
      for (ColumnChunk col : rowGroup.getColumns()) {
        totalSize += col.getMeta_data().getTotal_compressed_size();
      }
      long midPoint = startIndex + totalSize / 2;
      if (filter.contains(midPoint)) {
        newRowGroups.add(rowGroup);
      }
    }
    metaData.setRow_groups(newRowGroups);
    return metaData;
  }
到这里分割的关键点找到
现在假设我们有一个40m 的文件, 只有一个 row group, 10m 一分, 那么将会有4个 partitions
但是只有一个 partition 会占有这个 row group 的中点, 所以也只有这一个 partition 会有数据

小结

  1. spark 读取 parquet 文件默认用 enableVectorizedReader,向量读
  2. 根据 DataSourceScanExec 代码中划分的 partitions, 但不是所有 partitions 最后都会有数据
  3. 对于 parquet 文件,对于一个大的文件只含有一个 rowgroup,task 中谁拥有这个文件的中点谁处理这个 rowgroup,这样解决文章开头的疑惑!!!
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
阿里云实时数仓实战 - 项目介绍及架构设计
课程简介 1)学习搭建一个数据仓库的过程,理解数据在整个数仓架构的从采集、存储、计算、输出、展示的整个业务流程。 2)整个数仓体系完全搭建在阿里云架构上,理解并学会运用各个服务组件,了解各个组件之间如何配合联动。 3&nbsp;)前置知识要求 &nbsp; 课程大纲 第一章&nbsp;了解数据仓库概念 初步了解数据仓库是干什么的 第二章&nbsp;按照企业开发的标准去搭建一个数据仓库 数据仓库的需求是什么 架构 怎么选型怎么购买服务器 第三章&nbsp;数据生成模块 用户形成数据的一个准备 按照企业的标准,准备了十一张用户行为表 方便使用 第四章&nbsp;采集模块的搭建 购买阿里云服务器 安装 JDK 安装 Flume 第五章&nbsp;用户行为数据仓库 严格按照企业的标准开发 第六章&nbsp;搭建业务数仓理论基础和对表的分类同步 第七章&nbsp;业务数仓的搭建&nbsp; 业务行为数仓效果图&nbsp;&nbsp;
相关文章
|
存储 SQL JSON
Spark - 一文搞懂 parquet
parquet 文件常见于 Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲parquet 文件在 spark 场景下的存储,读取与使用中可能遇到的坑。......
1328 0
Spark - 一文搞懂 parquet
|
SQL 分布式计算 HIVE
spark sql编程之实现合并Parquet格式的DataFrame的schema
spark sql编程之实现合并Parquet格式的DataFrame的schema
290 0
spark sql编程之实现合并Parquet格式的DataFrame的schema
|
存储 分布式计算 大数据
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
425 0
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
|
SQL 存储 JSON
Apache Spark,Parquet和麻烦的Null
  关于类型安全性的经验教训,并承担过多   介绍   在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。   from pyspark.sql import types   schema=types.StructType([
760 0
|
SQL 存储 缓存
Spark SQL的Parquet那些事儿
Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并且对parquet文件的schema可以自动解析。当Spark SQL需要写成Parquet文件时,处于兼容的原因所有的列都被自动转化为了nullable。 1读写Parquet文件 // Encoders for most common types are automatically provided by importing spark.implicits._ import spark.implicits._ val peop
688 0
|
存储 SQL 分布式计算
数据湖实操讲解【 JindoTable 计算加速】第十九讲:Spark 对 OSS 上的 Parquet 数据进行查询加速
数据湖 JindoFS+OSS 实操干货 36讲 每周二16点准时直播! 扫文章底部二维码入钉群,线上准时观看~ Github链接: https://github.com/aliyun/alibabacloud-jindofs
数据湖实操讲解【 JindoTable 计算加速】第十九讲:Spark 对 OSS 上的 Parquet 数据进行查询加速
|
存储 分布式计算 大数据
# Apache Spark系列技术直播# 第七讲 【 大数据列式存储之 Parquet/ORC 】
主讲人:诚历(孙大鹏)阿里巴巴计算平台事业部EMR技术专家 简介:Parquet 和 ORC 是大数据生态里最常用到的两个列式存储引擎,这两者在实现上有什异同,哪个效率更好,哪个性能更优,本次分享将和您一起探索两大列式存储。
1555 0
|
分布式计算 Spark Go