Spark如何对源端数据做切分?

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。

引言

典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。

分析

数据源读取对应的物理执行节点为FileSourceScanExec,读取数据代码块如下


lazyvalinputRDD: RDD[InternalRow] = {
valreadFile: (PartitionedFile) =>Iterator[InternalRow] =relation.fileFormat.buildReaderWithPartitionValues(
sparkSession=relation.sparkSession,
dataSchema=relation.dataSchema,
partitionSchema=relation.partitionSchema,
requiredSchema=requiredSchema,
filters=pushedDownFilters,
options=relation.options,
hadoopConf=relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
valreadRDD=if (bucketedScan) {
createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
relation)
    } else {
createReadRDD(readFile, dynamicallySelectedPartitions, relation)
    }
sendDriverMetrics()
readRDD  }

主要关注非bucket的处理,对于非bucket的扫描调用createReadRDD方法定义如下

/*** Create an RDD for non-bucketed reads.* The bucketed variant of this function is [[createBucketedReadRDD]].** @param readFile a function to read each (part of a) file.* @param selectedPartitions Hive-style partition that are part of the read.* @param fsRelation [[HadoopFsRelation]] associated with the read.*/privatedefcreateReadRDD(
readFile: (PartitionedFile) =>Iterator[InternalRow],
selectedPartitions: Array[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
// 文件打开开销,每次打开文件最少需要读取的字节    valopenCostInBytes=fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes// 最大切分分片大小valmaxSplitBytes=FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, "+s"open cost is considered as scanning $openCostInBytes bytes.")
// Filter files with bucket pruning if possiblevalbucketingEnabled=fsRelation.sparkSession.sessionState.conf.bucketingEnabledvalshouldProcess: Path=>Boolean=optionalBucketSetmatch {
caseSome(bucketSet) ifbucketingEnabled=>// Do not prune the file if bucket file name is invalidfilePath=>BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
case_=>_=>true    }
// 对分区下文件进行切分并按照从大到小进行排序valsplitFiles=selectedPartitions.flatMap { partition=>partition.files.flatMap { file=>// getPath() is very expensive so we only want to call it once in this block:valfilePath=file.getPathif (shouldProcess(filePath)) {
// 文件是否可split,parquet/orc/avro均可被splitvalisSplitable=relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
// 切分文件PartitionedFileUtil.splitFiles(
sparkSession=relation.sparkSession,
file=file,
filePath=filePath,
isSplitable=isSplitable,
maxSplitBytes=maxSplitBytes,
partitionValues=partition.values          )
        } else {
Seq.empty        }
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
valpartitions=FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
newFileScanRDD(fsRelation.sparkSession, readFile, partitions)
  }


可以看到确定最大切分分片大小maxSplitBytes对于后续切分为多少个文件非常重要,其核心逻辑如下

defmaxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Long= {
// 读取文件时打包成最大的partition大小,默认为128MB,对应一个block大小valdefaultMaxSplitBytes=sparkSession.sessionState.conf.filesMaxPartitionBytes// 打开每个文件的开销,默认为4MBvalopenCostInBytes=sparkSession.sessionState.conf.filesOpenCostInBytes// 建议的(不保证)最小分割文件分区数,默认未设置,从leafNodeDefaultParallelism获取// 代码逻辑调用链 SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism// -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism// -> 总共多少核max(executor core总和, 2),最少为2valminPartitionNum=sparkSession.sessionState.conf.filesMinPartitionNum      .getOrElse(sparkSession.leafNodeDefaultParallelism)
// 总共读取的大小valtotalBytes=selectedPartitions.flatMap(_.files.map(_.getLen+openCostInBytes)).sum// 单core读取的大小valbytesPerCore=totalBytes/minPartitionNum// 计算大小,不会超过设置的128MBMath.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

 

对于PartitionedFileUtil#splitFiles,其核心逻辑如下,较为简单,直接按照最大切分大小切分大文件来进行分片

defsplitFiles(
sparkSession: SparkSession,
file: FileStatus,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
if (isSplitable) {
// 切分为多个分片      (0Luntilfile.getLenbymaxSplitBytes).map { offset=>valremaining=file.getLen-offsetvalsize=if (remaining>maxSplitBytes) maxSplitByteselseremainingvalhosts=getBlockHosts(getBlockLocations(file), offset, size)
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
      }
    } else {
Seq(getPartitionedFile(file, filePath, partitionValues))
    }
  }

在获取到Seq[PartitionedFile]列表后,还并没有完成对文件的切分,还需要调用FilePartition#getFilePartitions做最后的处理,方法核心逻辑如下

defgetFilePartitions(
sparkSession: SparkSession,
partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition] = {
valpartitions=newArrayBuffer[FilePartition]
valcurrentFiles=newArrayBuffer[PartitionedFile]
varcurrentSize=0L/** Close the current partition and move to the next. */defclosePartition(): Unit= {
if (currentFiles.nonEmpty) {
// Copy to a new Array.// 重新生成一个新的PartitionFilevalnewPartition=FilePartition(partitions.size, currentFiles.toArray)
partitions+=newPartition      }
currentFiles.clear()
currentSize=0    }
// 打开文件开销,默认为4MBvalopenCostInBytes=sparkSession.sessionState.conf.filesOpenCostInBytes// Assign files to partitions using "Next Fit Decreasing"partitionedFiles.foreach { file=>if (currentSize+file.length>maxSplitBytes) {
// 如果累加的文件大小大于的最大切分大小,则关闭该分区,表示完成一个Task读取的数据切分closePartition()
      }
// Add the given file to the current partition.currentSize+=file.length+openCostInBytescurrentFiles+=file    }
// 最后关闭一次分区,文件可能较小closePartition()
partitions.toSeq  }

可以看到经过这一步后,会把一些小文件做合并,生成maxSplitBytes大小的PartitionFile,这样可以避免拉起太多task读取太多小的文件。

生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的并发度为partitions的长度,也即最后Spark生成的Task个数

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

整体流程图如下图所示


拆分、合并过程如下图所示

image.png

实战

对于TPCH 10G生成的customer parquet表

https://oss.console.aliyun.com/bucket/oss-cn-hangzhou/fengzetest/object?path=rt_spark_test%2Fcustomer-parquet%2F


共8个Parquet文件,总文件大小为113.918MB


Spark作业配置如下,executor只有1core

conf spark.driver.resourceSpec=small;conf spark.executor.instances=1;conf spark.executor.resourceSpec=small;conf spark.app.name=Spark SQL Test;conf spark.adb.connectors=oss;use tpcd;select*from customer orderby C_CUSTKEY desclimit100;

根据前面的公式计算

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(1, 2) = 2
totalBytes = 113.918 + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 2 = 72.959MB
maxSplitBytes = 72.959MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

得到maxSplitBytes为72.959MB,从日志中也可看到对应大小

经过排序后的文件顺序为(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007),再次经过合并后得到3个FilePartitioned,分别对应

  • FilePartitioned 1: 00000, 00001, 00002
  • FilePartitioned 2: 00003, 00004, 00006
  • FilePartitioned 3: 00005, 00007

即总共会生成3个Task

从Spark UI查看确实生成3个Task

从日志查看也是生成3个Task

变更Spark作业配置,5个executor共10core

conf spark.driver.resourceSpec=small;conf spark.executor.instances=5;conf spark.executor.resourceSpec=medium;conf spark.app.name=Spark SQL Test;conf spark.adb.connectors=oss;use tpcd;select*from customer orderby C_CUSTKEY desclimit100;

根据前面的公式计算

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(10, 2) = 10
totalBytes = 113.918 + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 10 = 14.5918MB
maxSplitBytes = 14.5918MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

查看日志

此时可以看到14.5918MB会对源文件进行切分,会对00001, 00002,00003,00004,00005,00006进行切分,切分成两份,00007由于小于14.5918MB,因此不会进行切分,经过PartitionedFileUtil#splitFiles后,总共存在7 * 2 + 1 = 15个PartitionedFile

  • 00000(0 -> 14.5918MB), 00000(14.5918MB -> 15.698MB)
  • 00001(0 -> 14.5918MB), 00001(14.5918MB -> 15.632MB)
  • 00002(0 -> 14.5918MB), 00002(14.5918MB -> 15.629MB)
  • 00003(0 -> 14.5918MB), 00003(14.5918MB -> 15.624MB)
  • 00004(0 -> 14.5918MB), 00004(14.5918MB -> 15.617MB)
  • 00005(0 -> 14.5918MB), 00005(14.5918MB -> 15.536MB)
  • 00006(0 -> 14.5918MB), 00006(14.5918MB -> 15.539MB)
  • 00007(0 -> 4.634MB)

经过排序后得到如下以及合并后得到10个FilePartitioned,分别对应

  • FilePartitioned 1: 00000(0 -> 14.5918MB)
  • FilePartitioned 2: 00001(0 -> 14.5918MB)
  • FilePartitioned 3: 00002(0 -> 14.5918MB)
  • FilePartitioned 4: 00003(0 -> 14.5918MB)
  • FilePartitioned 5: 00004(0 -> 14.5918MB)
  • FilePartitioned 6: 00005(0 -> 14.5918MB)
  • FilePartitioned 7: 00006(0 -> 14.5918MB)
  • FilePartitioned 8: 00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)
  • FilePartitioned 9: 00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)
  • FilePartitioned 10: 00004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)


即总共会生成10个Task

通过Spark UI也可查看到生成了10个Task

查看日志,000004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)在同一个Task中

00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)


00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)在同一个Task中


总结

通过源码可知Spark对于源端Partition切分,会考虑到分区下所有文件大小以及打开每个文件的开销,同时会涉及对大文件的切分以及小文件的合并,最后得到一个相对合理的Partition。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
5天前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
9天前
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
|
26天前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。
|
26天前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
|
26天前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。
|
9天前
|
分布式计算 定位技术 Scala
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
|
9天前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
1月前
|
分布式计算 Java 关系型数据库
|
1月前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
1月前
|
新零售 分布式计算 数据可视化
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析