Introduction
When a typical Spark job reads an external Parquet table stored on OSS, how is the source-side parallelism (the number of tasks/partitions) determined? Several questions came up while running TPC-H tests: how is the scan parallelism on the source side decided? Does one Parquet file map to one partition, do multiple Parquet files map to one partition, or can one Parquet file map to multiple partitions? This article walks through the source code to answer these questions.
Analysis
The physical execution node for reading a file-based data source is FileSourceScanExec. The code that builds the read is as follows:
```scala
lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  val readRDD = if (bucketedScan) {
    createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
      relation)
  } else {
    createReadRDD(readFile, dynamicallySelectedPartitions, relation)
  }
  sendDriverMetrics()
  readRDD
}
```
Here we focus on the non-bucketed case. A non-bucketed scan calls createReadRDD, which is defined as follows:
```scala
/**
 * Create an RDD for non-bucketed reads.
 * The bucketed variant of this function is [[createBucketedReadRDD]].
 *
 * @param readFile a function to read each (part of a) file.
 * @param selectedPartitions Hive-style partition that are part of the read.
 * @param fsRelation [[HadoopFsRelation]] associated with the read.
 */
private def createReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Array[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  // File open cost: the minimum number of bytes charged each time a file is opened
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  // Maximum split size
  val maxSplitBytes =
    FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  // Filter files with bucket pruning if possible
  val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
  val shouldProcess: Path => Boolean = optionalBucketSet match {
    case Some(bucketSet) if bucketingEnabled =>
      // Do not prune the file if bucket file name is invalid
      filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
    case _ =>
      _ => true
  }

  // Split the files under each partition and sort the resulting slices from largest to smallest
  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      // getPath() is very expensive so we only want to call it once in this block:
      val filePath = file.getPath

      if (shouldProcess(filePath)) {
        // Whether the file can be split; parquet/orc/avro are all splittable
        val isSplitable = relation.fileFormat.isSplitable(
          relation.sparkSession, relation.options, filePath)
        // Split the file
        PartitionedFileUtil.splitFiles(
          sparkSession = relation.sparkSession,
          file = file,
          filePath = filePath,
          isSplitable = isSplitable,
          maxSplitBytes = maxSplitBytes,
          partitionValues = partition.values
        )
      } else {
        Seq.empty
      }
    }
  }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

  val partitions =
    FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
```
As we can see, the maximum split size maxSplitBytes determines how many splits the files are cut into. Its core logic is as follows:
```scala
def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  // Maximum partition size when packing files for reading; default 128MB, one HDFS block
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  // Cost of opening each file, default 4MB
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Suggested (not guaranteed) minimum number of split file partitions; unset by default,
  // in which case it falls back to leafNodeDefaultParallelism.
  // Call chain: SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism
  //   -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism
  //   -> max(total executor cores, 2), i.e. at least 2
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  // Total bytes to read
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  // Bytes read per core
  val bytesPerCore = totalBytes / minPartitionNum
  // Final size, never exceeding the configured 128MB
  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
```
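To make the arithmetic in the hands-on section easy to reproduce, here is a minimal standalone sketch of the same formula. simulateMaxSplitBytes is my own simplified helper, not a Spark API; the defaults mirror spark.sql.files.maxPartitionBytes (128MB), spark.sql.files.openCostInBytes (4MB), and the minimum parallelism of 2.

```scala
// Simplified sketch of the maxSplitBytes formula above (not Spark code).
def simulateMaxSplitBytes(
    fileSizes: Seq[Long],                             // sizes of the selected files, in bytes
    defaultMaxSplitBytes: Long = 128L * 1024 * 1024,  // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long = 4L * 1024 * 1024,         // spark.sql.files.openCostInBytes
    minPartitionNum: Int = 2): Long = {               // max(total executor cores, 2)
  val totalBytes   = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / minPartitionNum
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}
```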
PartitionedFileUtil#splitFiles is straightforward: a large file is simply sliced into pieces of at most the maximum split size:
```scala
def splitFiles(
    sparkSession: SparkSession,
    file: FileStatus,
    filePath: Path,
    isSplitable: Boolean,
    maxSplitBytes: Long,
    partitionValues: InternalRow): Seq[PartitionedFile] = {
  if (isSplitable) {
    // Split into multiple slices
    (0L until file.getLen by maxSplitBytes).map { offset =>
      val remaining = file.getLen - offset
      val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
      val hosts = getBlockHosts(getBlockLocations(file), offset, size)
      PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
    }
  } else {
    Seq(getPartitionedFile(file, filePath, partitionValues))
  }
}
```
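For the same purpose, here is a minimal sketch of the slicing step (again my own helper, not Spark code): a splittable file of length len is cut into (offset, size) ranges of at most maxSplitBytes each.

```scala
// Simplified sketch of PartitionedFileUtil#splitFiles for a splittable file (not Spark code).
def simulateSplitFile(len: Long, maxSplitBytes: Long): Seq[(Long, Long)] =
  (0L until len by maxSplitBytes).map { offset =>
    val size = math.min(maxSplitBytes, len - offset)
    (offset, size)                                    // (start offset, slice length) in bytes
  }
```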
Obtaining the Seq[PartitionedFile] list does not yet finish the splitting; FilePartition#getFilePartitions is then called for the final step. Its core logic is as follows:
```scala
def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      // Copy to a new Array.
      // Create a new FilePartition
      val newPartition = FilePartition(partitions.size, currentFiles.toArray)
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  // Cost of opening a file, default 4MB
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Assign files to partitions using "Next Fit Decreasing"
  partitionedFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      // If the accumulated size would exceed the maximum split size, close this partition;
      // it represents the slice of data one task will read
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  // Close the last partition; it may only hold the smaller leftover files
  closePartition()
  partitions.toSeq
}
```
After this step, small slices get packed together into FilePartitions of roughly maxSplitBytes each, which avoids launching too many tasks just to read many small files.
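To complete the picture for the hands-on section below, here is a matching sketch of the packing step. simulatePack is my own simplified helper that operates on slice lengths only, not Spark code.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of FilePartition#getFilePartitions: "Next Fit Decreasing" over slice lengths.
def simulatePack(
    sliceLengths: Seq[Long],
    maxSplitBytes: Long,
    openCostInBytes: Long = 4L * 1024 * 1024): Seq[Seq[Long]] = {
  val partitions = ArrayBuffer.empty[Seq[Long]]
  var current = Vector.empty[Long]
  var currentSize = 0L
  sliceLengths.sortBy(-_).foreach { len =>           // largest slices first
    if (currentSize + len > maxSplitBytes && current.nonEmpty) {
      partitions += current                          // close the current partition
      current = Vector.empty
      currentSize = 0L
    }
    current :+= len
    currentSize += len + openCostInBytes             // each slice also pays the open cost
  }
  if (current.nonEmpty) partitions += current        // close the last, possibly small, partition
  partitions.toSeq
}
```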
The parallelism of the resulting FileScanRDD (new FileScanRDD(fsRelation.sparkSession, readFile, partitions)) is the length of partitions, which is also the number of tasks Spark will eventually launch:
override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray
The overall flow is shown in the figure below.
The split-and-merge process is shown in the figure below.
Hands-on
Take the customer Parquet table generated for TPC-H 10G:
It consists of 8 Parquet files with a total size of 113.918MB.
The Spark job is configured as follows; the single executor has only 1 core:
```sql
conf spark.driver.resourceSpec=small;
conf spark.executor.instances=1;
conf spark.executor.resourceSpec=small;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;
```
Plugging into the formula above:
```
defaultMaxSplitBytes = 128MB
openCostInBytes      = 4MB
minPartitionNum      = max(1, 2) = 2
totalBytes           = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore         = 145.918MB / 2 = 72.959MB
maxSplitBytes        = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 72.959MB
```
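As a quick sanity check, the same numbers fall out of the sketch helpers from the analysis section (the approximate per-file sizes are taken from the split listing further below):

```scala
val mb = 1024L * 1024
// Approximate sizes of the 8 customer parquet files, about 113.9MB in total
val fileSizes = Seq(15.698, 15.632, 15.629, 15.624, 15.617, 15.536, 15.539, 4.634)
  .map(s => (s * mb).toLong)

val maxSplit = simulateMaxSplitBytes(fileSizes, minPartitionNum = 2)              // ~72.96MB
val slices   = fileSizes.flatMap(f => simulateSplitFile(f, maxSplit).map(_._2))   // no file is split
val tasks    = simulatePack(slices, maxSplit).length                              // 3 partitions -> 3 tasks
```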
This gives maxSplitBytes = 72.959MB, which matches the size reported in the logs.
After sorting, the file order is (00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007); after packing, 3 FilePartitions are produced:
- FilePartition 1: 00000, 00001, 00002
- FilePartition 2: 00003, 00004, 00006
- FilePartition 3: 00005, 00007
So 3 tasks are generated in total.
The Spark UI confirms that 3 tasks were generated.
The logs also show 3 tasks.
Now change the Spark job configuration to 5 executors with 10 cores in total:
```sql
conf spark.driver.resourceSpec=small;
conf spark.executor.instances=5;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;
```
Plugging into the formula again:
```
defaultMaxSplitBytes = 128MB
openCostInBytes      = 4MB
minPartitionNum      = max(10, 2) = 10
totalBytes           = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore         = 145.918MB / 10 = 14.5918MB
maxSplitBytes        = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 14.5918MB
```
From the logs:
With maxSplitBytes = 14.5918MB the source files are now split: each of 00000 through 00006 is cut into two slices, while 00007 is smaller than 14.5918MB and stays whole. After PartitionedFileUtil#splitFiles there are therefore 7 * 2 + 1 = 15 PartitionedFiles (a quick sanity check follows the list):
- 00000(0 -> 14.5918MB), 00000(14.5918MB -> 15.698MB)
- 00001(0 -> 14.5918MB), 00001(14.5918MB -> 15.632MB)
- 00002(0 -> 14.5918MB), 00002(14.5918MB -> 15.629MB)
- 00003(0 -> 14.5918MB), 00003(14.5918MB -> 15.624MB)
- 00004(0 -> 14.5918MB), 00004(14.5918MB -> 15.617MB)
- 00005(0 -> 14.5918MB), 00005(14.5918MB -> 15.536MB)
- 00006(0 -> 14.5918MB), 00006(14.5918MB -> 15.539MB)
- 00007(0 -> 4.634MB)
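The same sanity check with minPartitionNum = 10 reproduces both the 15 slices and the final task count (again assuming the sketch helpers and the fileSizes sequence defined earlier):

```scala
val maxSplit10 = simulateMaxSplitBytes(fileSizes, minPartitionNum = 10)             // ~14.59MB
val slices10   = fileSizes.flatMap(f => simulateSplitFile(f, maxSplit10).map(_._2))
slices10.length                                                                     // 15 PartitionedFiles
simulatePack(slices10, maxSplit10).length                                           // 10 partitions -> 10 tasks
```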
After sorting and packing, 10 FilePartitions are produced:
- FilePartition 1: 00000(0 -> 14.5918MB)
- FilePartition 2: 00001(0 -> 14.5918MB)
- FilePartition 3: 00002(0 -> 14.5918MB)
- FilePartition 4: 00003(0 -> 14.5918MB)
- FilePartition 5: 00004(0 -> 14.5918MB)
- FilePartition 6: 00005(0 -> 14.5918MB)
- FilePartition 7: 00006(0 -> 14.5918MB)
- FilePartition 8: 00007(0 -> 4.634MB), 00000(14.5918MB -> 15.698MB)
- FilePartition 9: 00001(14.5918MB -> 15.632MB), 00002(14.5918MB -> 15.629MB), 00003(14.5918MB -> 15.624MB)
- FilePartition 10: 00004(14.5918MB -> 15.617MB), 00005(14.5918MB -> 15.536MB), 00006(14.5918MB -> 15.539MB)
So 10 tasks are generated in total.
The Spark UI also shows 10 tasks.
The logs confirm the same grouping:
- 00004(14.5918MB -> 15.617MB), 00005(14.5918MB -> 15.536MB), 00006(14.5918MB -> 15.539MB) land in the same task
- 00007(0 -> 4.634MB), 00000(14.5918MB -> 15.698MB) land in the same task
- 00001(14.5918MB -> 15.632MB), 00002(14.5918MB -> 15.629MB), 00003(14.5918MB -> 15.624MB) land in the same task
Summary
From the source code we can see that when Spark splits the source into partitions, it takes into account the total size of all files under the selected partitions as well as the cost of opening each file. Large files are split and small files are packed together, producing a reasonably balanced set of partitions.
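As a closing note, here is a minimal sketch (my own example, assuming a live SparkSession named spark) of the per-session SQL configs that drive this calculation, in case you want to experiment with the resulting task counts; the values are illustrative only, not recommendations.

```scala
// Knobs behind the calculation analyzed above (values are examples only).
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB") // defaultMaxSplitBytes
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")     // openCostInBytes
spark.conf.set("spark.sql.files.minPartitionNum", "10")      // overrides leafNodeDefaultParallelism
```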