Hive引擎Spark优化配置参数2

简介: 上一篇内容讲了资源参数优化,本篇继续说明spark driver以及spark shuffle相关的参数调优。

扩展spark driver

动态资源分配

在 Facebook,Spark 集群启用了动态资源分配(Dynamic Executor Allocation),以便更好的使用集群资源,而且在 Facebook 内部,Spark 是运行在多租户的集群上,所以这个也是非常合适的。比如典型的配置如下:

spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.executorIdleTimeout = 2m
spark.dynamicAllocation.minExecutors = 1
spark.dynamicAllocation.maxExecutors = 2000

多线程事件处理

在 Spark 2.3 版本之前,事件处理是单线程的架构,也就是说,事件队列里面的事件得一个一个处理。如果你的作业很大,并且有很多 tasks,很可能会导致事件处理出现延迟,进一步导致作业性能出现问题,甚至使当前作业失败。为了解决这个问题,SPARK-18838 这个 ISSUE 引入了多线程事件处理架构,每个事件都有其单独的单线程 executor service 去处理,这样就可以大大减少事件处理延时的问题。另外,由于每类事件都有单独的事件队列,所以会增加 Driver 端的内存占用。

更好的 Fetch 失败处理

在 Spark 2.3 版本之前,如果 Spark 探测到 fetch failure,那么它会把产生这个 shuffle 文件的 Executor 移除掉。但是如果这个 Executor 所在的机器有很多 Executor,而且是因为这台机器挂掉导致 fetch failure,那么会导致很多的 fetch 重试,这种处理机制很低下。SPARK-19753 这个 ISSUE 使得 Spark 可以把上述场景所有 Executor 的 shuffle 文件移除,也就是不再去重试就知道 shuffle 文件不可用。

另外,Spark 最大 Fetch 重试次数也可以通过 spark.max.fetch.failures.per.stage 参数配置。

FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,然后由 executor 通过 statusUpdate 传到 driver 端,实际的处理会在 DAGScheduler.handleTaskCompletion,它会重新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。

RPC 服务线程调优

当 Spark 同时运行大量的 tasks 时,Driver 很容易出现 OOM,这是因为在 Driver 端的 Netty 服务器上产生大量 RPC 的请求积压,我们可以通过加大 RPC 服务的线程数解决 OOM 问题,比如 spark.rpc.io.serverThreads = 64。

spark shuffle相关的参数调优

**spark.shuffle.file.buffer
**
默认值:32k

参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

默认值:48m

参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

**spark.shuffle.io.maxRetries
**
默认值:3

参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

默认值:5s

参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

默认值:0.2

参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。

调优建议:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

默认值:sort

参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

默认值:false

参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

spark.reducer.maxBlocksInFlightPerAddress

默认值:Int.MaxValue(2的31次方-1)

限制了每个主机每次reduce可以被多少台远程主机拉取文件块,调低这个参数可以有效减轻node manager的负载。

**spark.reducer.maxReqsInFlight
**
默认值:Int.MaxValue(2的31次方-1)

限制远程机器拉取本机器文件块的请求数,随着集群增大,需要对此做出限制。否则可能会使本机负载过大而挂掉。。

spark.reducer.maxReqSizeShuffleToMem

默认值:Long.MaxValue

shuffle请求的文件块大小 超过这个参数值,就会被强行落盘,防止一大堆并发请求把内存占满。

spark.shuffle.compress

默认压缩 true

是否压缩map输出文件

spark.shuffle.spill.compress

默认:true

shuffle过程中溢出的文件是否压缩,使用spark.io.compression.codec压缩。

了解更多
大数据运维服务

目录
相关文章
|
6天前
|
分布式计算 并行计算 数据处理
|
1天前
|
分布式计算 DataWorks Java
DataWorks产品使用合集之如何引用在spark jar中引用密文的空间参数
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
156 1
|
2月前
|
缓存 分布式计算 监控
Spark 优化方案
Spark 优化方案
|
3月前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。
|
2月前
|
SQL 资源调度 数据库连接
Hive怎么调整优化Tez引擎的查询?在Tez上优化Hive查询的指南
在Tez上优化Hive查询,包括配置参数调整、理解并行化机制以及容器管理。关键步骤包括YARN调度器配置、安全阀设置、识别性能瓶颈(如mapper/reducer任务和连接操作),理解Tez如何动态调整mapper和reducer数量。例如,`tez.grouping.max-size` 影响mapper数量,`hive.exec.reducers.bytes.per.reducer` 控制reducer数量。调整并发和容器复用参数如`hive.server2.tez.sessions.per.default.queue` 和 `tez.am.container.reuse.enabled`
49 0
|
3月前
|
SQL 存储 大数据
Hive的查询、数据加载和交换、聚合、排序、优化
Hive的查询、数据加载和交换、聚合、排序、优化
82 2
|
3月前
|
分布式计算 Scala Spark
Spark参数解析之MasterArguments
Spark参数解析之MasterArguments
28 0
|
3月前
|
SQL 分布式计算 资源调度
一文看懂 Hive 优化大全(参数配置、语法优化)
以下是对提供的内容的摘要,总长度为240个字符: 在Hadoop集群中,服务器环境包括3台机器,分别运行不同的服务,如NodeManager、DataNode、NameNode等。集群组件版本包括jdk 1.8、mysql 5.7、hadoop 3.1.3和hive 3.1.2。文章讨论了YARN的配置优化,如`yarn.nodemanager.resource.memory-mb`、`yarn.nodemanager.vmem-check-enabled`和`hive.map.aggr`等参数,以及Map-Side聚合优化、Map Join和Bucket Map Join。