「困惑」
- spark sql 读取 parquet 文件,stage 生成任务 4 个 task,只有一个 task 处理数据,其它无
- spark 任务执行 apache iceberg rewriteDataFiles 合并小文件(parquet 文件),发现偶然无变化
「Parquet 文件详解」
一个 Parquet 文件是由一个 header 以及一个或多个 block 块组成,以一个 footer 结尾。
header 中只包含一个 4 个字节的数字 PAR1 用来识别整个 Parquet 文件格式。
文件中所有的 metadata 都存在于 footer 中。
footer 中的 metadata 包含了格式的版本信息,schema 信息、key-value paris 以及所有 block 中的 metadata 信息。
footer 中最后两个字段为一个以 4 个字节长度的 footer 的 metadata,以及同 header 中包含的一样的 PAR1。
Parquet 文件格式
上图展示了一个 Parquet 文件的结构
- 一个文件中可以存储多个行组,文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件。
- Footer length 存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的 Schema 信息。
- 每一页的开始都会存储该页的元数据,在 Parquet 中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引。
- 存储格式:Parquet 的存储模型主要由行组(Row Group 默认 128M)、列块(Column Chuck)、页(Page)组成。
- 支持数据嵌套模型:Parquet 支持嵌套的数据模型,类似于 Protocol Buffers。
可以看出在 Schema 中所有的基本类型字段都是叶子节点,在这个 Schema 中一共存在 6 个叶子节点,如果把这样的 Schema 转换成扁平式的关系模型,就可以理解为该表包含六个列。
❝Parquet 中没有 Map、Array 这样的复杂数据结构每一个数据模型的 schema 包含多个字段,每一个字段又可以包含多个字段,每一个字段有三个属性:重复数、数据类型和字段名, 重复数可以是以下三种:required(出现 1 次),repeated(出现 0 次或多次),optional(出现 0 次或 1 次)。每一个字段的数据类型可以分成两种:group(复杂类型)和 primitive(基本类型)。以上实现列式存储,但是无法将其恢复到原来的数据行的结构形式,Parquet 采用了 Dremel 中(R, D, V)模型 R,即 Repetition Level,用于表达一个列有重复,即有多个值的情况,其值为重复是在第几层上发生。D,即 Definition Level,用于表达某个列是否为空、在哪里为空,其值为当前列在第几层上有值 V,表示数据值
❞
- 行组,Row Group:Parquet 在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。
- 列块,Column Chunk:行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。
- 页,Page:Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。
小结
- Parquet 是一种支持嵌套结构的列式存储格式,非常适用于 OLAP 场景,按列存储和扫描。
- 列存使得更容易对每个列使用高效的压缩和编码(一个页是最小的编码的单位),降低磁盘空间。
- 映射下推,这是列式存储最突出的优势,是指在获取数据时只需要扫描需要的列,不用全部扫描。
- 谓词下推,是指通过将一些过滤条件尽可能的在最底层执行以减少结果集。谓词就是指这些过滤条件,即返回。
实战
spark 2.4.0 读取 parquet 文件
❝spark.read.parquet("")
❞
org.apache.spark.sql.DataFrameReader.java val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) val jdbc = classOf[JdbcRelationProvider].getCanonicalName val json = classOf[JsonFileFormat].getCanonicalName val parquet = classOf[ParquetFileFormat].getCanonicalName val csv = classOf[CSVFileFormat].getCanonicalName val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" val nativeOrc = classOf[OrcFileFormat].getCanonicalName val socket = classOf[TextSocketSourceProvider].getCanonicalName --->DataSourceV2 val rate = classOf[RateStreamProvider].getCanonicalName --->DataSourceV2 private def loadV1Source(paths: String*) = { // Code path for data source v1. sparkSession.baseRelationToDataFrame( DataSource.apply( sparkSession, paths = paths, userSpecifiedSchema = userSpecifiedSchema, className = source, options = extraOptions.toMap).resolveRelation()) } org.apache.spark.sql.execution.datasources.DataSource.resolveRelation ->getOrInferFileFormatSchema() **Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list.** private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = { val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) new InMemoryFileIndex( sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache) } InMemoryFileIndex.refresh0() InMemoryFileIndex.listLeafFiles() InMemoryFileIndex.bulkListLeafFiles() val parallelPartitionDiscoveryParallelism = private[sql] def bulkListLeafFiles( ... spark.sql.sources.parallelPartitionDiscovery.parallelism 默认10000 sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism **设置并行度来防止下面的文件列表生成许多任务** **in case of large defaultParallelism.** **val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)** val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) val statusMap = try { val description = paths.size match { case 0 => s"Listing leaf files and directories 0 paths" case 1 => s"Listing leaf files and directories for 1 path:<br/>${paths(0)}" case s => s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..." } sparkContext.setJobDescription(description) sparkContext .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value pathStrings.map(new Path(_)).toSeq.map { path => (path, listLeafFiles(path, hadoopConf, filter, None)) }.iterator }.map { case (path, statuses) => val serializableStatuses = statuses.map { status => // Turn FileStatus into SerializableFileStatus so we can send it back to the driver val blockLocations = status match { case f: LocatedFileStatus => f.getBlockLocations.map { loc => SerializableBlockLocation( loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) } case _ => Array.empty[SerializableBlockLocation] } SerializableFileStatus( status.getPath.toString, status.getLen, status.isDirectory, status.getReplication, status.getBlockSize, status.getModificationTime, status.getAccessTime, blockLocations) } (path.toString, serializableStatuses) }.collect() ... )
真正读取数据是 DataSourceScanExec
❝注意:这里有 DataSourceV2ScanExec v2 版本,经上面代码分析,parquet,orc 使用的是 v1 版 org.apache.spark.sql.execution.DataSourceScanExec.scala
❞
Physical plan node for scanning data from HadoopFsRelations. FileSourceScanExec private lazy val inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, requiredSchema = requiredSchema, filters = pushedDownFilters, options = relation.options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) relation.bucketSpec match { case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled => createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation) case _ => createNonBucketedReadRDD(readFile, selectedPartitions, relation) } } private def createNonBucketedReadRDD( readFile: (PartitionedFile) => Iterator[InternalRow], selectedPartitions: Seq[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { 128M val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes 4M val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes 上面代码sparkcontent设置的 val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") 切文件 val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => val blockLocations = getBlockLocations(file) if (fsRelation.fileFormat.isSplitable( fsRelation.sparkSession, fsRelation.options, file.getPath)) { (0L until file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining val hosts = getBlockHosts(blockLocations, offset, size) PartitionedFile( partition.values, file.getPath.toUri.toString, offset, size, hosts) } } else { val hosts = getBlockHosts(blockLocations, 0, file.getLen) Seq(PartitionedFile( partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts)) } } }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) val partitions = new ArrayBuffer[FilePartition] val currentFiles = new ArrayBuffer[PartitionedFile] var currentSize = 0L /** Close the current partition and move to the next. */ 合并小文件,大文件就直接变为partition了。一路下来会以为会切大文件,然而并不会。 def closePartition(): Unit = { if (currentFiles.nonEmpty) { val newPartition = FilePartition( partitions.size, currentFiles.toArray.toSeq) // Copy to a new Array. partitions += newPartition } currentFiles.clear() currentSize = 0 } // Assign files to partitions using "Next Fit Decreasing" splitFiles.foreach { file => 这里遇到大文件直接放入partitions分区,小文件是几个大小达到maxSplitBytes,放入一个分区提高 if (currentSize + file.length > maxSplitBytes) { closePartition() } // Add the given file to the current partition. currentSize += file.length + openCostInBytes currentFiles += file } closePartition() new FileScanRDD(fsRelation.sparkSession, readFile, partitions) }
小结
- spark 2.4.0 读取 parquet,使用的是 loadV1Source
- spark 读取文件默认 task 任务数(分区数)最大 10000,最小是 path 的个数(注意并行度和任务数分区数区别)
- createNonBucketedReadRDD 中 Bucketed 理解,是指 hive 表中的分区下面的分桶
- rdd 分区数确认:合并小文件,大文件就直接变为 partition 了,注意大文件没有切,目的提高 cpu 利用率
FileScanRDD 和 parquetjar 本身提供的读写 api
org.apache.spark.sql.execution.datasources.FileScanRDD private def readCurrentFile(): Iterator[InternalRow] = { try { readFunction(currentFile) } catch { case e: FileNotFoundException => throw new FileNotFoundException( e.getMessage + "\n" + "It is possible the underlying files have been updated. " + "You can explicitly invalidate the cache in Spark by " + "running 'REFRESH TABLE tableName' command in SQL or " + "by recreating the Dataset/DataFrame involved.") } } ParquetFileFormat.buildReaderWithPartitionValues(该方法上面有提)构造reader, override def buildReaderWithPartitionValues( ... if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion lister before `initialization`. taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) vectorizedReader.initialize(split, hadoopAttemptContext) vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { vectorizedReader.enableReturningBatches() } } else { ... reader.initialize(split, hadoopAttemptContext) } vectorizedReader.initialize(split, hadoopAttemptContext) ->SpecificParquetRecordReaderBase.initialize ->ParquetMetadata footer = readFooter(config, file, range(0, length));注意这里传入的range ->ParquetMetadataConverter.converter.readParquetMetadata(f, filter) public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException { FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() { @Override public FileMetaData visit(NoFilter filter) throws IOException { return readFileMetaData(from); } @Override public FileMetaData visit(SkipMetadataFilter filter) throws IOException { return readFileMetaData(from, true); } @Override public FileMetaData visit(OffsetMetadataFilter filter) throws IOException { return filterFileMetaDataByStart(readFileMetaData(from), filter); } @Override public FileMetaData visit(RangeMetadataFilter filter) throws IOException { return filterFileMetaDataByMidpoint(readFileMetaData(from), filter); } }); LOG.debug("{}", fileMetaData); ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData); if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata)); return parquetMetadata; } RangeMetadataFilter filterFileMetaDataByMidpoint(readFileMetaData(from), filter); static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) { List<RowGroup> rowGroups = metaData.getRow_groups(); List<RowGroup> newRowGroups = new ArrayList<RowGroup>(); for (RowGroup rowGroup : rowGroups) { long totalSize = 0; long startIndex = getOffset(rowGroup.getColumns().get(0)); for (ColumnChunk col : rowGroup.getColumns()) { totalSize += col.getMeta_data().getTotal_compressed_size(); } long midPoint = startIndex + totalSize / 2; if (filter.contains(midPoint)) { newRowGroups.add(rowGroup); } } metaData.setRow_groups(newRowGroups); return metaData; } 到这里分割的关键点找到 现在假设我们有一个40m 的文件, 只有一个 row group, 10m 一分, 那么将会有4个 partitions 但是只有一个 partition 会占有这个 row group 的中点, 所以也只有这一个 partition 会有数据
小结
- spark 读取 parquet 文件默认用 enableVectorizedReader,向量读
- 根据 DataSourceScanExec 代码中划分的 partitions, 但不是所有 partitions 最后都会有数据
- 对于 parquet 文件,对于一个大的文件只含有一个 rowgroup,task 中谁拥有这个文件的中点谁处理这个 rowgroup,这样解决文章开头的疑惑!!!