Spark性能调优-RDD算子调优篇(深度好文,面试常问,建议收藏) (一)

简介: Spark性能调优-RDD算子调优

RDD算子调优


不废话,直接进入正题!


1. RDD复用


在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如下图所示:


image.png


对上图中的RDD计算架构进行修改,得到如下图所示的优化结果:


image.png


2. 尽早filter


获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。


3. 读取大量小文件-用wholeTextFiles


当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为RDD的一个元素。

也可以将多个完整的文本文件一次性读取为一个pairRDD,其中键是文件名,值是文件内容。


val input:RDD[String] = sc.textFile("dir/*.log")


如果传递目录,则将目录下的所有文件读取作为RDD。文件路径支持通配符。


但是这样对于大量的小文件读取效率并不高,应该使用 wholeTextFiles


返回值为RDD[(String, String)],其中Key是文件的名称,Value是文件的内容。


def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])


wholeTextFiles读取小文件:


val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)


4. mapPartition和foreachPartition


  • mapPartitions


map(_....) 表示每一个元素


mapPartitions(_....) 表示每个分区的数据组成的迭代器


普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。


如果是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。


image.png


如果是mapPartition算子,由于一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收所有的partition数据,效率比较高。


image.png


比如,当要把RDD中的所有数据通过JDBC写入数据,如果使用map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions算子,那么针对一个分区的数据,只需要建立一个数据库连接。


mapPartitions算子也存在一些缺点:对于普通的map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。


因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)


在项目中,应该首先估算一下RDD的数据量、每个partition的数据量,以及分配给每个Executor的内存资源,如果资源允许,可以考虑使用mapPartitions算子代替map。


  • foreachPartition


rrd.foreache(_....) 表示每一个元素


rrd.forPartitions(_....) 表示每个分区的数据组成的迭代器


在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能。


如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition算子。


与mapPartitions算子非常相似,foreachPartition是将RDD的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接,如下图所示:


image.png


使用了foreachPartition 算子后,可以获得以下的性能提升:


  1. 对于我们写的function函数,一次处理一整个分区的数据;


  1. 对于一个分区内的数据,创建唯一的数据库连接;


  1. 只需要向数据库发送一次SQL语句和多组参数;


在生产环境中,全部都会使用foreachPartition算子完成数据库操作。


foreachPartition算子存在一个问题,与mapPartitions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。


5. filter+coalesce/repartition(减少分区)


在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如下图所示:


image.png


根据上图我们可以发现两个问题:


  1. 每个partition的数据量变小了,如果还按照之前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;


  1. 每个partition的数据量不一样,会导致后面的每个task处理每个partition数据的时候,每个task要处理的数据量不同,这很有可能导致数据倾斜问题。


如上图所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。


针对上述的两个问题,我们分别进行分析:


  1. 针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4个分区的数据转化到2个分区中,这样只需要用后面的两个task进行处理即可,避免了资源的浪费。


  1. 针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个partition中的数据量差不多,这就避免了数据倾斜问题。


那么具体应该如何实现上面的解决思路?我们需要coalesce算子。


repartition与coalesce都可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认情况下不进行shuffle,但是可以通过参数进行设置。


假设我们希望将原本的分区个数A通过重新分区变为B,那么有以下几种情况:


  1. A > B(多数分区合并为少数分区)


  • A与B相差值不大
    此时使用coalesce即可,无需shuffle过程。


  • A与B相差值很大
    此时可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为true,即启动shuffle过程。

2.A < B(少数分区分解为多数分区)


此时使用repartition即可,如果使用coalesce需要将shuffle设置为true,否则coalesce无效。


我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。


注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。

相关文章
|
存储 SQL 关系型数据库
京东面试:mysql深度分页 严重影响性能?根本原因是什么?如何优化?
京东面试:mysql深度分页 严重影响性能?根本原因是什么?如何优化?
京东面试:mysql深度分页 严重影响性能?根本原因是什么?如何优化?
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
637 0
|
消息中间件 分布式计算 监控
从InfluxDB到StarRocks:Grab实现Spark监控平台10倍性能提升
Grab 是东南亚领先的超级应用,其 Spark 可观测平台 Iris 核心存储迁移到 StarRocks 后性能显著提升。新架构统一了实时与历史数据分析,减少多平台切换复杂性,查询速度提升 10 倍以上,资源使用效率提高 40%。通过物化视图、动态分区和直接 Kafka 摄取数据等优化,简化数据管道并降低运维成本。未来 Grab 将进一步增强推荐系统、集成机器学习,持续优化用户体验与系统可扩展性。
|
消息中间件 存储 Java
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
45岁资深架构师尼恩在其读者群中分享了关于如何提升RocketMQ顺序消费性能的高并发面试题解析。面对10W QPS的高并发场景,尼恩详细讲解了RocketMQ的调优策略,包括专用方案如增加ConsumeQueue数量、优化Topic设计等,以及通用方案如硬件配置(CPU、内存、磁盘、网络)、操作系统调优、Broker配置调整、客户端配置优化、JVM调优和监控与日志分析等方面。通过系统化的梳理,帮助读者在面试中充分展示技术实力,获得面试官的认可。相关真题及答案将收录于《尼恩Java面试宝典PDF》V175版本中,助力求职者提高架构、设计和开发水平。
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
819 15
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
470 0
【赵渝强老师】Spark RDD的缓存机制
|
人工智能 自然语言处理 架构师
字节面试: es怎么提升性能和精准度?(尼恩独家,史上最全)
本文由40岁老架构师尼恩撰写,针对ES(Elasticsearch)提升搜索性能和精准度的面试题进行详细解析。文章首先指出,提升ES速度和精准度是两个独立的问题,分别涉及性能优化和精准度优化。这些内容不仅有助于应对面试中的难题,还能帮助开发者在实际项目中构建更高效的搜索系统。尼恩强调,掌握这些知识后可以在面试中“吊打”面试官,轻松获得理想Offer。同时,他还提供了《尼恩Java面试宝典PDF》等资源供读者学习参考。
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
291 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
335 0
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
471 0