Spark性能优化指南—思路梳理

简介: Spark性能优化指南—思路梳理

前言

Spark作业的优化其实是泛的话题,因为往往有时候表现出来都是慢,但是解法却不一样,我想把优化的方方面盘点出来,以便系统性地去制定整体的优化方案。

优化思路梳理

到底怎样去看待所谓慢的问题呢,我做了一个整理:

主题
资源优化
并行度优化
代码优化
Shuffle优化
内存优化
堆外内存优化
数据倾斜处理
读写介质优化

资源优化

绝大部分作业变慢其实就是资源吃紧导致的,这就是为什么啥都没变怎么就慢了呢,去查问题的时候又查不出个所以然来。换句话来说绝大部分作业其实加资源可以解决问题,甚至有些其他问题的时候加了资源也可以抗过去。资源优化涉及两块,一块是集群的资源优化,一个是作业内部的资源分配。

集群资源优化

1.资源模式选择 Standalone or Yarn,大部分情况在实际看到的都是Yarn的模式,这个是因为Yarn在整个企业承担着统一分配资源的任务,历史上大部分Spark作业是Hive切换过来的,Yarn的调度方式是比较合理的,但是Yarn其实分配需要增加调度的开销,在生产上,涉及到高频的调度,需要去掉Yarn的申请资源延迟,往往是单独搭建一个Standalone环境的。

资源使用的上限是在配置中决定的,在分配资源初期需要做一次压测,保证分配的资源没有超配;

Standalone中的资源配置:

SPARK_WORKER_CORES
SPARK_WORKER_MEMORY

Yarn中的资源配置:

yarn.nodemanager.resource.cpu-vcores
yarn.nodemanager.resource.memory-mb

2.提交任务时候的资源分配,这个在提交的时候指定参数

./spark-submit 
--master spark://xxx  
--executor-cores xxx 
--executor-memory xx 

–executor-cores : 启动一个Executor使用多少core

–executor-memory : 启动一个Executor使用多少内存

–total-executor-cores : 启动一个Application一共使用多少core -standalone集群下

–num-executor : 指定启动一个Application 启动多少executor – yarn集群

建议以上参数设置在提交任务命令中

另外,Spark作业其实是支持动态参数调整的,在做作业测试的时候建议暂时关闭,真正上线之后开启即可

spark.dynamicAllocation.enabled

并行度优化

并行度优化其实是有两个点,一个是并行度过低,带来大量的慢task以及shuffle溢出,一个是并行度过高,带来大量小任务,资源消耗巨大,调度成本高导致还是慢。主要通过编码时候参赛控制即可,常见的控制点在控制rdd的并行参数上:

sc.textFile(xx,minnumpartitions)
sc.parallelize(xx,num)
sc.markRDD(xx,num)
sc.parallelizePairs(List[Tuple2<String,String>],num)
reduceByKey(xx,num),distinct(xx,num),join(xx,num),groupByKey(xx,num),repartition(num)
spark.default.parallelism 调整默认的并行度
spark.sql.shuffle.partitions = 200 
自定义分区器
SparkStreaming中 Direct模式 : 与读取的topic的分区数保持一致

代码优化

这个就是Spark程序员要的基本功了,也是有规律可循:

RDD复用

我们都知道rdd是一连串血缘操作计算出来的,所谓的复用其实就是不要重复去做计算,自然会减少消耗,一般是下面两招:

1.尽量复用同一个RDD,避免创建重复的RDD

2.对常用的RDD进行持久化,这样子下次重复计算直接从已经计算好的结果拉取数据了,常用的代码如下:

cache() = persist() = persist(StorageLevel.MEMORY_ONLY)
persist策略枚举:
  MEMORY_ONLY
  MEMORY_AND_DISK
  MEMORY_ONLY_SER
  MEMORY_AND_DISK_SER

尽量避免使用shuffle算子

Shuffle本身带来消耗巨大,我们其实尽量不进行shuffle,常见的手段主要是使用map 类的算子 + 广播变量代替 join

使用map端有预聚合的操作

使用map端预聚合有以下好处

1.减少map端的shuffle数据量

2.减少reduce端读取的数据量

3.减少 reduce端 聚合次数

但是也需要保障计算结果一致性的前提,常见算子:

reduceByKey
  aggreagateByKey
  combineByKey

使用高性能的算子

在保存数据或者插入数据库中数据时,可以有以下操作:

1.使用mapPartition代替map,这样子其实可以按照 partition为单位批量写入,减少jdbc链接次数

2.foreachPartition代替foreach,也是批量操作

3.对于大量小文件处理,可以先使用coalesce来减少分区

4.对与大量的数据过滤之后可以考虑使用coalesce减少分区

5.对与数据量多,分区少情况下的数据可以使用repartition来增多分区

6.使用reduceByKey 代替groupByKey

使用广播变量

当Exeutor端使用到Driver端的变量时,可以使用广播变量来减少Executor端内存的占用

如果不使用广播变量,那每个Executor中有多少task就有多少Driver端的变量副本

注意

1.Executor端的内存需要够用

2.广播变量不能在Executor中修改

使用Kryo序列化

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");//设置序列化方式
conf.registerKryoClasses(new Class[]{_KryoBean.class});//注册使用kryo序列化的类

1.RDD[自定义类型]

2.数据持久化 : StorageLevel.MEMORY_ADK_DISK_SER

3.task 节点之间发送

注意问题: 如果一个类使用了java序列化就不能使用kryo序列化了

优化数据结构

数据结构的优化其实就是为了减少存储,举例说明int 类型0 和1 和字符串的"0","1"空间其实差异很大的,在做序列化传输的时候成本同时上去了,所以数据结构的定义也有以下建议:

1.尽量使用原生数据类型代替字符串

2.尽量使用字符串代替对象

3.尽量使用数组代替Map集合

总之,代码优化的层面其实就是性能的直接体现,目标就是高效,,减少内存使用、减少shuffle,减少节点之间数据传输,最优的方式达到需要的结果。

Shuffle优化

这里提到的Shuffle是说在shuffle过程中,我们还可以做一些调整,需要跟踪shuffe的元数据来进行调整:

spark.reducer.maxSizeInFlight  48M shuffle一次拉取数据的缓存
spark.shuffle.io.maxRetries 3 : shuffle 拉取数据task 失败重试次数
spark.shuffle.io.retryWait 5s : shuffle拉取数据task 重试等待间隔
spark.shuffle.sort.bypassMergeThreshold : 200 ,bypass机制开启的条件之一,另一个使用bypass条件是map端不能有预聚合

内存优化

Spark一直号称内存计算,但是实际上来说数据体量大的时候内存也成为一个负担,内存的合理使用才能达到最优,相反的话会开始使用磁盘进行数据交换,整体性能下降,内存分配有以下原则:

1.task运行内存多一些,减少磁盘溢出

2.合理调整Spark内存分布

静态内存分布–过去的使用方式,导致内存不能合理使用,造成很大程度的浪费

统一内存分布–2.x之后的方式,目的是为了统一内存的分配

3. 堆外内存调整 spark.executor.memoryOverhead=2048M

举例说明如下:

总共300M预留,会按照如下配置进行分配
      (总-300M) * 0.6 -- spark.memory.fraction
             0.5 : RDD缓存和广播变量  --spark.memory.storageFraction 
             0.5 : shuffle聚合内存
      (总-300M) * 0.4  task运行内存

数据倾斜处理

数据倾斜出现有以下的场景:

MapReduce :reduce task处理的数据相对于其他 task来说处理的数据量多 – shuffle导致

Hive :Hive中某列下对应的相同key非常多,这张Hive表有数据倾斜

Spark : Spark rdd中某个分区的数据相对于其他分区来说数据量多–shuffle导致

解决方法

1.使用Hive ETL预处理
2.过滤少数倾斜的key
3.增加并行度
  场景:不同的key多,分区少,可以直接增加并行度
4.双重聚合
  场景:相同的key多,分区少
  解决思路:将key 打散(随机加前缀),聚合,再去前缀,再聚合
5.使用map join 代替reduce join 
  场景:两个RDD要join, 一个RDD大,有数据倾斜,一个RDD小
  解决方式:可以考虑直接将小的RDD回收广播,对有数据倾斜的RDD直接使用map类的算子操作
6.找出倾斜key ,分拆join
  场景:两个RDD要join,一个RDD大,少量的key有数据倾斜,另一个RDD小,但是无法采用第五种方案
  解决方式:找出倾斜的key,分成倾斜的RDD与不倾斜的RDD,倾斜RDD中,一个随机加前缀一个膨胀处理,正常RDD正常join,最后结果union一起
7.使用随机前缀和扩容RDD进行join
  场景:两个RDD要join,一个RDD大,大量的key有数据倾斜,另一个RDD小,无法采用第五种方案解决
  解决:直接对RDD进行随机加前缀与膨胀处理 

存储介质优化

对于spark内存计算来说,存储的rpc请求往往是瓶颈,在spark的作业中其实目标就是优化读取速度,我们会按照不同的读写频率存储,我们生产实践中把数据直接cache内存,用的就是alluxio,存储的优化归纳如下:

StoragePolicies
    存储类型
      ARCHIVE
      DISK
      SSD
      RAM_DISK
    存储策略
      hot
      Cold
      Warm

资源隔离

这点在生产上体会比较多,那就是我们需要发现热点节点,经常出的问题就是在某一台的节点上一直task很慢,这种时候需要去怀疑这台机器了,热点的数据,长任务,高io都会影响,这种时候可以考虑做一些资源隔离的方案,yarn的node-lables进行打标处理,做到在大数据层面的灰度机制。

yarn.scheduler.capacity.root.accessible-node-labels=*
yarn.scheduler.capacity.root.xxxx.accessible-node-labels=label_xx
yarn.scheduler.capacity.root.yyyy.accessible-node-labels=label_xx,label_yy
yarn.scheduler.capacity.root.default.default-node-label-expression=
yarn.scheduler.capacity.root.xxxx.default-node-label-expression=label_xx
yarn.scheduler.capacity.root.yyyy.default-node-label-expression=label_yy

后记

原始的RDD级别的优化其实大家接触不会太多,就其原因是这个难度其实是比较大的,引擎级别也在不断改进,到了SparkSQL的时代,多了很多智能化的调整机制,例如AQE等,但是spark内核级别的调整有更加广泛的适应性,期待能带来一些有效的帮助。

目录
相关文章
|
2月前
|
分布式计算 算法 Spark
Spark中的性能优化有哪些方法?请举例说明
Spark中的性能优化有哪些方法?请举例说明
45 1
|
4天前
|
分布式计算 大数据 数据处理
深度解密Spark性能优化之道
课程通过实战案例解析和性能调优技巧的讲解,帮助学员提升大数据处理系统的性能和效率。课程内容涵盖了Spark性能调优的各个方面,包括内存管理、并行度设置、数据倾斜处理、Shuffle调优、资源配置等关键技术和策略。学员将通过实际案例的演示和分析,掌握解决Spark应用性能问题的方法和技巧,从而提升数据处理效率,优化应用性能。无论您是初学者还是有一定经验的大数据工程师,本课程都将为您提供宝贵的实战经验和实用技能,助您成为Spark性能调优的专家。
16 7
深度解密Spark性能优化之道
|
2月前
|
存储 分布式计算 资源调度
Spark性能优化之SparkUI
Spark性能优化之SparkUI
41 0
|
2月前
|
存储 SQL 分布式计算
性能优化:Spark SQL中的谓词下推和列式存储
性能优化:Spark SQL中的谓词下推和列式存储
|
2月前
|
缓存 分布式计算 监控
Spark RDD操作性能优化技巧
Spark RDD操作性能优化技巧
|
SQL 存储 机器学习/深度学习
基于英特尔® 优化分析包(OAP)的 Spark 性能优化方案
Spark SQL 作为 Spark 用来处理结构化数据的一个基本模块,已经成为多数企业构建大数据应用的重要选择。但是,在大规模连接(Join)、聚合(Aggregate)等工作负载下,Spark 性能会面临稳定性和性能方面的挑战。
基于英特尔® 优化分析包(OAP)的 Spark 性能优化方案
EMR Spark Runtime Filter性能优化 | 7月5号云栖夜读
今天的首篇文章,讲述了:Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。
3888 0
|
存储 SQL 分布式计算
EMR Spark Runtime Filter性能优化
Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。
|
存储 SQL 分布式计算
EMR Spark Runtime Filter性能优化
Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。
5208 0