前言
Spark作业的优化其实是泛的话题,因为往往有时候表现出来都是慢,但是解法却不一样,我想把优化的方方面盘点出来,以便系统性地去制定整体的优化方案。
优化思路梳理
到底怎样去看待所谓慢的问题呢,我做了一个整理:
主题 |
资源优化 |
并行度优化 |
代码优化 |
Shuffle优化 |
内存优化 |
堆外内存优化 |
数据倾斜处理 |
读写介质优化 |
资源优化
绝大部分作业变慢其实就是资源吃紧导致的,这就是为什么啥都没变怎么就慢了呢,去查问题的时候又查不出个所以然来。换句话来说绝大部分作业其实加资源可以解决问题,甚至有些其他问题的时候加了资源也可以抗过去。资源优化涉及两块,一块是集群的资源优化,一个是作业内部的资源分配。
集群资源优化
1.资源模式选择 Standalone or Yarn,大部分情况在实际看到的都是Yarn的模式,这个是因为Yarn在整个企业承担着统一分配资源的任务,历史上大部分Spark作业是Hive切换过来的,Yarn的调度方式是比较合理的,但是Yarn其实分配需要增加调度的开销,在生产上,涉及到高频的调度,需要去掉Yarn的申请资源延迟,往往是单独搭建一个Standalone环境的。
资源使用的上限是在配置中决定的,在分配资源初期需要做一次压测,保证分配的资源没有超配;
Standalone中的资源配置:
SPARK_WORKER_CORES SPARK_WORKER_MEMORY
Yarn中的资源配置:
yarn.nodemanager.resource.cpu-vcores yarn.nodemanager.resource.memory-mb
2.提交任务时候的资源分配,这个在提交的时候指定参数
./spark-submit --master spark://xxx --executor-cores xxx --executor-memory xx
–executor-cores : 启动一个Executor使用多少core
–executor-memory : 启动一个Executor使用多少内存
–total-executor-cores : 启动一个Application一共使用多少core -standalone集群下
–num-executor : 指定启动一个Application 启动多少executor – yarn集群
建议以上参数设置在提交任务命令中
另外,Spark作业其实是支持动态参数调整的,在做作业测试的时候建议暂时关闭,真正上线之后开启即可
spark.dynamicAllocation.enabled
并行度优化
并行度优化其实是有两个点,一个是并行度过低,带来大量的慢task以及shuffle溢出,一个是并行度过高,带来大量小任务,资源消耗巨大,调度成本高导致还是慢。主要通过编码时候参赛控制即可,常见的控制点在控制rdd的并行参数上:
sc.textFile(xx,minnumpartitions) sc.parallelize(xx,num) sc.markRDD(xx,num) sc.parallelizePairs(List[Tuple2<String,String>],num) reduceByKey(xx,num),distinct(xx,num),join(xx,num),groupByKey(xx,num),repartition(num) spark.default.parallelism 调整默认的并行度 spark.sql.shuffle.partitions = 200 自定义分区器 SparkStreaming中 Direct模式 : 与读取的topic的分区数保持一致
代码优化
这个就是Spark程序员要的基本功了,也是有规律可循:
RDD复用
我们都知道rdd是一连串血缘操作计算出来的,所谓的复用其实就是不要重复去做计算,自然会减少消耗,一般是下面两招:
1.尽量复用同一个RDD,避免创建重复的RDD
2.对常用的RDD进行持久化,这样子下次重复计算直接从已经计算好的结果拉取数据了,常用的代码如下:
cache() = persist() = persist(StorageLevel.MEMORY_ONLY) persist策略枚举: MEMORY_ONLY MEMORY_AND_DISK MEMORY_ONLY_SER MEMORY_AND_DISK_SER
尽量避免使用shuffle算子
Shuffle本身带来消耗巨大,我们其实尽量不进行shuffle,常见的手段主要是使用map 类的算子 + 广播变量代替 join
使用map端有预聚合的操作
使用map端预聚合有以下好处
1.减少map端的shuffle数据量
2.减少reduce端读取的数据量
3.减少 reduce端 聚合次数
但是也需要保障计算结果一致性的前提,常见算子:
reduceByKey aggreagateByKey combineByKey
使用高性能的算子
在保存数据或者插入数据库中数据时,可以有以下操作:
1.使用mapPartition代替map,这样子其实可以按照 partition为单位批量写入,减少jdbc链接次数
2.foreachPartition代替foreach,也是批量操作
3.对于大量小文件处理,可以先使用coalesce来减少分区
4.对与大量的数据过滤之后可以考虑使用coalesce减少分区
5.对与数据量多,分区少情况下的数据可以使用repartition来增多分区
6.使用reduceByKey 代替groupByKey
使用广播变量
当Exeutor端使用到Driver端的变量时,可以使用广播变量来减少Executor端内存的占用
如果不使用广播变量,那每个Executor中有多少task就有多少Driver端的变量副本
注意
1.Executor端的内存需要够用
2.广播变量不能在Executor中修改
使用Kryo序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");//设置序列化方式 conf.registerKryoClasses(new Class[]{_KryoBean.class});//注册使用kryo序列化的类
1.RDD[自定义类型]
2.数据持久化 : StorageLevel.MEMORY_ADK_DISK_SER
3.task 节点之间发送
注意问题: 如果一个类使用了java序列化就不能使用kryo序列化了
优化数据结构
数据结构的优化其实就是为了减少存储,举例说明int 类型0 和1 和字符串的"0","1"空间其实差异很大的,在做序列化传输的时候成本同时上去了,所以数据结构的定义也有以下建议:
1.尽量使用原生数据类型代替字符串
2.尽量使用字符串代替对象
3.尽量使用数组代替Map集合
总之,代码优化的层面其实就是性能的直接体现,目标就是高效,,减少内存使用、减少shuffle,减少节点之间数据传输,最优的方式达到需要的结果。
Shuffle优化
这里提到的Shuffle是说在shuffle过程中,我们还可以做一些调整,需要跟踪shuffe的元数据来进行调整:
spark.reducer.maxSizeInFlight 48M shuffle一次拉取数据的缓存 spark.shuffle.io.maxRetries 3 : shuffle 拉取数据task 失败重试次数 spark.shuffle.io.retryWait 5s : shuffle拉取数据task 重试等待间隔 spark.shuffle.sort.bypassMergeThreshold : 200 ,bypass机制开启的条件之一,另一个使用bypass条件是map端不能有预聚合
内存优化
Spark一直号称内存计算,但是实际上来说数据体量大的时候内存也成为一个负担,内存的合理使用才能达到最优,相反的话会开始使用磁盘进行数据交换,整体性能下降,内存分配有以下原则:
1.task运行内存多一些,减少磁盘溢出
2.合理调整Spark内存分布
静态内存分布–过去的使用方式,导致内存不能合理使用,造成很大程度的浪费
统一内存分布–2.x之后的方式,目的是为了统一内存的分配
3. 堆外内存调整 spark.executor.memoryOverhead=2048M
举例说明如下:
总共300M预留,会按照如下配置进行分配 (总-300M) * 0.6 -- spark.memory.fraction 0.5 : RDD缓存和广播变量 --spark.memory.storageFraction 0.5 : shuffle聚合内存 (总-300M) * 0.4 task运行内存
数据倾斜处理
数据倾斜出现有以下的场景:
MapReduce :reduce task处理的数据相对于其他 task来说处理的数据量多 – shuffle导致
Hive :Hive中某列下对应的相同key非常多,这张Hive表有数据倾斜
Spark : Spark rdd中某个分区的数据相对于其他分区来说数据量多–shuffle导致
解决方法
1.使用Hive ETL预处理 2.过滤少数倾斜的key 3.增加并行度 场景:不同的key多,分区少,可以直接增加并行度 4.双重聚合 场景:相同的key多,分区少 解决思路:将key 打散(随机加前缀),聚合,再去前缀,再聚合 5.使用map join 代替reduce join 场景:两个RDD要join, 一个RDD大,有数据倾斜,一个RDD小 解决方式:可以考虑直接将小的RDD回收广播,对有数据倾斜的RDD直接使用map类的算子操作 6.找出倾斜key ,分拆join 场景:两个RDD要join,一个RDD大,少量的key有数据倾斜,另一个RDD小,但是无法采用第五种方案 解决方式:找出倾斜的key,分成倾斜的RDD与不倾斜的RDD,倾斜RDD中,一个随机加前缀一个膨胀处理,正常RDD正常join,最后结果union一起 7.使用随机前缀和扩容RDD进行join 场景:两个RDD要join,一个RDD大,大量的key有数据倾斜,另一个RDD小,无法采用第五种方案解决 解决:直接对RDD进行随机加前缀与膨胀处理
存储介质优化
对于spark内存计算来说,存储的rpc请求往往是瓶颈,在spark的作业中其实目标就是优化读取速度,我们会按照不同的读写频率存储,我们生产实践中把数据直接cache内存,用的就是alluxio,存储的优化归纳如下:
StoragePolicies 存储类型 ARCHIVE DISK SSD RAM_DISK 存储策略 hot Cold Warm
资源隔离
这点在生产上体会比较多,那就是我们需要发现热点节点,经常出的问题就是在某一台的节点上一直task很慢,这种时候需要去怀疑这台机器了,热点的数据,长任务,高io都会影响,这种时候可以考虑做一些资源隔离的方案,yarn的node-lables进行打标处理,做到在大数据层面的灰度机制。
yarn.scheduler.capacity.root.accessible-node-labels=* yarn.scheduler.capacity.root.xxxx.accessible-node-labels=label_xx yarn.scheduler.capacity.root.yyyy.accessible-node-labels=label_xx,label_yy yarn.scheduler.capacity.root.default.default-node-label-expression= yarn.scheduler.capacity.root.xxxx.default-node-label-expression=label_xx yarn.scheduler.capacity.root.yyyy.default-node-label-expression=label_yy
后记
原始的RDD级别的优化其实大家接触不会太多,就其原因是这个难度其实是比较大的,引擎级别也在不断改进,到了SparkSQL的时代,多了很多智能化的调整机制,例如AQE等,但是spark内核级别的调整有更加广泛的适应性,期待能带来一些有效的帮助。