sparksql 参数调优

简介: sparksql 参数调优

异常调优

兼容性问题与规避方案

spark.sql.hive.convertMetastoreParquet
parquet是一种列式存储格式,可以用于spark-sql 和hive 的存储格式。在spark中,如果使用using parqeut的形式创建表,则创建的是spark 的DataSource表;而如果使用stored as parquet则创建的是hive表。
默认设置是true, 它代表使用spark-sql内置的parquet的reader和writer(即进行反序列化和序列化),它具有更好地性能,如果设置为false,则代表使用 Hive的序列化方式。
但是有时候当其设置为true时,会出现使用hive查询表有数据,而使用spark查询为空的情况.

但是,有些情况下在将spark.sql.hive.convertMetastoreParquet设为false,可能发生以下异常(spark-2.3.2)。这是因为在其为false时候,是使用hive-metastore使用的元数据进行读取数据,而如果此表是使用spark sql DataSource创建的parquet表,其数据类型可能出现不一致的情况,例如通过metaStore读取到的是IntWritable类型,其创建了一个WritableIntObjectInspector用来解析数据,而实际上value是LongWritable类型,因此出现了类型转换异常。

与该参数相关的一个参数是:

spark.sql.hive.convertMetastoreParquet.mergeSchema, 如果也是true,那么将会尝试合并各个parquet 文件的schema,以使得产生一个兼容所有parquet文件的schema.

精度丢失问题

spark.sql.decimalOperations.allowPrecisionLoss参数。
当该参数为true(默认),表示允许丢失精度,会根据Hive行为和SQL ANSI 2011规范来决定result的类型,即如果无法精确的表示,则舍入结果的小数部分。
当该参数为false时,代表不允许丢失精度,这样会将数据表示的更加精确。
spark.sql. decimalOperations.nullOnOverflow
对于Decimal类型,由于其整数部分位数是(precision-scale),因此该类型能表示的范围是有限的,一旦超出这个范围,就会发生Overflow。而在Spark中,如果Decimal计算发生了Overflow,就会默认返回Null值。
引入了参数spark.sql. decimalOperations.nullOnOverflow 用来控制在Decimal Operation 发生Overflow时候的处理方式。

遇到表路径下的文件缺失/corrupt 异常

spark.sql.files.ignoreMissingFiles && spark.sql.files.ignoreCorruptFiles
这两个参数是只有在进行spark DataSource 表查询的时候才有效,如果是对hive表进行操作是无效的。
在进行spark DataSource 表查询时候,可能会遇到非分区表中的文件缺失/corrupt 或者分区表分区路径下的文件缺失/corrupt 异常,这时候加这两个参数会忽略这两个异常,这两个参数默认都是false,建议在线上可以都设为true.

其源码逻辑如下,简单描述就是如果遇到FileNotFoundException, 如果设置了ignoreMissingFiles=true则忽略异常,否则抛出异常;如果不是FileNotFoundException 而是IOException(FileNotFoundException的父类)或者RuntimeException,则认为文件损坏,如果设置了ignoreCorruptFiles=true则忽略异常。
分区路径下面的文件不存在或者损坏的处理
上面的两个参数在分区表情况下是针对分区路径存在的情况下,分区路径下面的文件不存在或者损坏的处理。而有另一种情况就是这个分区路径都不存在了。这时候异常信息如下:

spark.sql.hive.verifyPartitionPath
参数默认是false,当设置为true的时候会在获得分区路径时对分区路径是否存在做一个校验,过滤掉不存在的分区路径,这样就会避免上面的错误。
spark.files.ignoreCorruptFiles && spark.files.ignoreMissingFiles
这两个参数和上面的spark.sql.files.ignoreCorruptFiles很像,但是区别是很大的。在spark进行DataSource表查询时候spark.sq.files.*才会生效,而spark如果查询的是一张hive表,其会走HadoopRDD这条执行路线。

所以就会出现,即使你设置了spark.sql.files.ignoreMissingFiles的情况下,仍然报FileNotFoundException的情况,异常栈如下, 可以看到这里面走到了HadoopRDD,而且后面是org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrappe可见是查询一张hive表。此时可以将spark.files.ignoreCorruptFiles && spark.files.ignoreMissingFiles设为true,其代码逻辑和上面的spark.sql.file.*逻辑没明显区别,此处不再赘述.

spark shuffle几个常用相关参数

spark.shuffle.file.buffer
default: 32k
shuffle write端写磁盘文件时缓冲区大小,适量增大可以减少磁盘I/O次数,进而提升性能。
spark.reducer.maxSizeInFlight
default: 48M
shuffle read端拉取对应分区数据缓冲区大小,适量增大可以减少网络传输次数,进而提升性能。
spark.shuffle.io.maxRetries
default: 3
shuffle read端拉取对应数据时,因为网络异常拉取失败重新尝试的最大次数。针对超大数据量的应用,可以增大重试次数,大幅度提升稳定性。
spark.maxRemoteBlockSizeFetchToMem
代表着可以从远端拉取数据放入内存的最大size.这个参数作用是在reduce task读取map task block时放入内存中的最大值;默认是没有限制全放内存
spark.shuffle.sort.bypassMergeThreshold 
map端不进行排序的分区阈值
spark.shuffle.io.retryWait
重试2次的最大等待时间
spark.shuffle.spill.compress=true
默认使用spark.io.compression.codec
注:根据不同的spark版本有不同的个别shuffle配置如spark.maxRemoteBlockSizeFetchToMem等,根据不同的spark版本,查询对应的功能,如果详细查看逻辑,查看源码

sparksql CBO相关几个参数

性能调优

除了遇到异常需要被动调整参数之外,我们还可以主动调整参数从而对性能进行调优。

spark.hadoopRDD.ignoreEmptySplits
默认是false,如果是true,则会忽略那些空的splits,减小task的数量。
spark.hadoop.mapreduce.input.fileinputformat.split.minsize
是用于聚合input的小文件,用于控制每个mapTask的输入文件,防止小文件过多时候,产生太多的task.
spark.sql.autoBroadcastJoinThreshold && spark.sql.broadcastTimeout
用于控制在spark sql中使用BroadcastJoin时候表的大小阈值,适当增大可以让一些表走BroadcastJoin,提升性能,但是如果设置太大又会造成driver内存压力,而broadcastTimeout是用于控制Broadcast的Future的超时时间,默认是300s,可根据需求进行调整。
spark.sql.adaptive.enabled && spark.sql.adaptive.shuffle.targetPostShuffleInputSize
该参数是用于开启spark的自适应执行,这是spark比较老版本的自适应执行,后面的targetPostShuffleInputSize是用于控制之后的shuffle 阶段的平均输入数据大小,防止产生过多的task。
intel大数据团队开发的adaptive-execution相较于目前spark的ae更加实用,该特性也已经加入到社区3.0之后的roadMap中,令人期待。
spark.sql.parquet.mergeSchema
默认false。当设为true,parquet会聚合所有parquet文件的schema,否则是直接读取parquet summary文件,或者在没有parquet summary文件时候随机选择一个文件的schema作为最终的schema。
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
1或者2,默认是1. MapReduce-4815 详细介绍了 fileoutputcommitter 的原理,实践中设置了 version=2 的比默认 version=1 的减少了70%以上的 commit 时间,但是1更健壮,能处理一些情况下的异常。
spark.sql.files.maxPartitionBytes
默认128MB,单个分区读取的最大文件大小

spark AQE相关:

spark AQE 自动分区合并
1.spark.sql.adaptive.enabled:Spark3.0 AQE开关,默认是false,如果要使用AQE功能,得先设置为true。
2.spark.sql.adaptive.coalescePartitions.enabled:动态缩小分区参数,默认值是true,但是得先保证spark.sql.adaptive.enabled为true。
3.spark.sql.adaptive.coalescePartitions.initialPartitionNum:任务刚启动时的初始分区,此参数可以设置了大点,默认值与spark.sql.shuffle.partition一样为200。
4.spark.sql.adaptive.coalescePartitions.minPartitionNum:进行动态缩小分区,最小缩小至多少分区,最终分区数不会小于此参数。
5.spark.sql.adaptive.advisoryPartitionSizeInBytes:缩小分区或进行拆分分区操作后所期望的每个分区的大小(数据量)。
spark AQE 动态join
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.join.enabled 是否开启AQE 动态join优化
spark.sql.adaptive.localShuffleReader.enabled:在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join
spark.sql.autoBroadcastJoinThreshold 中间文件尺寸总和小于广播阈值
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 空文件占比小于配置项
spark AQE 自动倾斜处理
1.spark.sql.adaptive.enabled :开启aqe功能,默认关闭
2.spark.sql.adaptive.skewJoin.enabled:开启aqe倾斜join,需要先将spark.sql.adaptive.enabled设置为true。
3.spark.sql.adaptive.skewJoin.skewedPartitionFactor :倾斜因子,如果分区的数据量大于 此因子乘以分区的中位数,并且也大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,那么认为是数据倾斜的,默认值为5
4.spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:每个分区的阀值,默认256mb,此参数应该大于spark.sql.adaptive.advisoryPartitionSizeInBytes
5.spark.sql.adaptive.advisoryPartitionSizeInBytes:缩小分区或进行拆分分区操作后所期望的每个分区的大小(数据量)。
spark AQE、动态申请资源: 当计算过程中资源不足会自动申请资源
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.dynamicAllocation.enabled: 是否开启动态资源申请
spark.dynamicAllocation.shuffleTracking.enabled: 是否开启shuffle状态跟踪。为执行程序启用随机文件跟踪,从而无需外部随机服务即可动态分配。 此选项将尝试保持为活动作业存储随机数据的执行程序。
spark.dynamicAllocation.shuffleTracking.timeout  启用随机跟踪时,控制保存随机数据的执行程序的超时。 默认值意味着Spark将依靠垃圾回收中的shuffle来释放执行程序。 如果由于某种原因垃圾回收无法足够快地清理随机数据,则此选项可用于控制执行者何时超时,即使它们正在存储随机数据。

Spark Sql 参数表(spark-2.3.2)

相关文章
|
9月前
|
消息中间件 缓存 分布式计算
如何调优Spark Steraming
如何调优Spark Steraming
109 1
|
分布式计算 编译器 Spark
Spark中combineByKey算子使用出现报错
Spark中combineByKey算子使用出现报错
88 0
Spark中combineByKey算子使用出现报错
|
SQL 缓存 分布式计算
PySpark数据分析基础:pyspark.sql.SparkSession类方法详解及操作+代码展示
PySpark数据分析基础:pyspark.sql.SparkSession类方法详解及操作+代码展示
586 0
PySpark数据分析基础:pyspark.sql.SparkSession类方法详解及操作+代码展示
|
分布式计算 Java Spark
Spark面试题(八)——Spark的Shuffle配置调优
对Spark的Shuffle配置调优建议。
436 0
|
缓存 分布式计算 资源调度
Spark面试题(六)——Spark资源调优
Spark资源调优的方法。
238 0
Spark面试题(六)——Spark资源调优
|
分布式计算 资源调度 DataWorks
MaxComputeSpark Spark 与 SparkSQL 对比分析及使用注意事项 | 学习笔记
快速学习 MaxComputeSpark Spark 与 SparkSQL 对比分析及使用注意事项
230 0
|
SQL 分布式计算 运维
技本功|Hive优化之Spark执行引擎参数调优(二)
影响Hive效率的主要有数据倾斜、数据冗余、job的IO以及不同底层引擎配置情况和Hive本身参数和HiveSQL的执行等因素。 本文主要结合实际业务情况,在使用Spark作为底层引擎时,通过一些常见的配置参数对报错任务进行调整优化,主要包含以下两个方面:
1111 0
技本功|Hive优化之Spark执行引擎参数调优(二)
|
消息中间件 缓存 分布式计算
Spark调优策略
在利用Spark处理数据时,如果数据量不大,那么Spark的默认配置基本就能满足实际的业务场景。但是当数据量大的时候,就需要做一定的参数配置调整和优化,以保证业务的安全、稳定的运行。并且在实际优化中,要考虑不同的场景,采取不同的优化策略。
如何使用X-Pack Spark的YarnUI、SparkUI、Spark日志、任务运行状况的分析
概述 X-Pack Spark目前是通过Yarn管理资源,在提交Spark 任务后我们经常需要知道任务的运行状况,例如在哪里看日志、怎么查看每个Executor的运行状态、每个task的运行状态,性能瓶颈点在哪里等信息。
3443 0
SparkSQL实践与优化
SQL实践:1.多数据源支持 2.多数据类型支持 3.多组件对接
2159 0