Spark技术内幕:Sort Based Shuffle实现解析

简介:

在Spark 1.2.0中,Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager 从hash换成了sort,对应的实现类分别是org.apache.spark.shuffle.hash.HashShuffleManager和org.apache.spark.shuffle.sort.SortShuffleManager。

这个方式的选择是在org.apache.spark.SparkEnv完成的:

    // Let the user specify short names forshuffle managers
    val shortShuffleMgrNames = Map(
      "hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //获得Shuffle Manager的type,sort为默认
    val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)

那么Sort BasedShuffle“取代”Hash BasedShuffle作为默认选项的原因是什么?

正如前面提到的,Hashbased shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M*R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。Hash based shuffle设计的目标之一就是避免不需要的排序(Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销)。但是它在处理超大规模数据集的时候,产生了大量的DiskIO和内存的消耗,这无疑很影响性能。Hash based shuffle也在不断的优化中,正如前面讲到的Spark 0.8.1引入的file consolidation在一定程度上解决了这个问题。为了更好的解决这个问题,Spark 1.1 引入了Sort based shuffle。首先,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件对系统带来的压力。

并且从作者ReynoldXin的几乎所有的测试来看,Sortbased shuffle在速度和内存使用方面优于Hashbased shuffle:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing.”

性能数据:from:https://issues.apache.org/jira/browse/SPARK-3280


Shuffle Map Task会按照key相对应的partition ID进行sort,其中属于同一个partition的key不会sort。因为对于不需要sort的操作来说,这个sort是负收益的;要知道之前Spark刚开始使用Hash based的shuffle而不是sort based就是为了避免Hadoop Map Reduce对于所有计算都会sort的性能损耗。对于那些需要sort的运算,比如sortByKey,这个sort在Spark 1.2.0里还是由reducer完成的。

如果这个过程内存不够用了,那么这些已经sort的内容会被spill到外部存储。然后在结束的时候将这些不同的文件进行merge sort。

为了便于下游的Taskfetch到其需要的partition,这里会生成一个index文件,去记录不同的partition的位置信息。当然了org.apache.spark.storage.BlockManager需要也有响应的实现以实现这种新的寻址方式。


核心实现的逻辑都在类org.apache.spark.shuffle.sort.SortShuffleWriter。下面简要分析一下它的实现:

1)          对于每个partition,创建一个scala.Array存储它所包含的key,value对。每个待处理的key,value对都会插入相应的scala.Array。

2)          如果scala.Array的大小超过阈值,那么需要将这个in memory的数据spill到外部存储。这个文件的开始部分会记录这个partition的ID,这个文件保存了多少个pair等信息。

3)          最后需要将所有spill到外部存储的文件进行mergesort。同时打开的文件不能过多,过多的话会消耗大量的内存,增加OOM或者GC的风险;也不能过少,过少的话就会影响性能,增大计算的延时。一般的话推荐每次同时打开10 – 100个文件。

4)          在生成最后的数据文件时,需要同时生成index索引文件。正如前面提到的,这个索引文件将记录不同partition的range。

当然了,你可能还有个疑问,就是Hash Based Shuffle说白了就是根据key需要写入的org.apache.spark.HashPartitioner,为每个Reducer写入单独的Partition。只不过对于同一个Core启动的Shuffle Map Task,如果选择spark.shuffle.consolidateFiles的话,第二个Shuffle Map Task会把结果append到上一个文件中去。那么sort的逻辑是完全可以整合到Hash Based Shuffle中去,为什么又要重新实现一种Shuffle Writer呢?我认为有以下几点:

  • Shuffle机制是所有类似计算模块的核心机制之一,要进行大的优化的风险非常高;比如一个看似简单的consolidation机制,在0.8.1就引入了,但是到1.2.0还是没有作为默认选项。
  • Hash Based Shuffle如果修改为Sort的逻辑,所谓的改进可能会影响原来已经稳定的Spark应用。比如一个应用在使用Hash Based Shuffle性能是完全符合预期的,那么迁移到Spark 1.2.0后,只需要将配置文件修改以下就可以完成这个无缝的迁移。
  • 作为一个通用的计算平台,你的测试的case永远cover不了所有的场景。那么,还是留给用户去选择吧。
  • Sort的机制还处理不断完善的阶段。比如很有的优化或者功能的改进会不断的完善。因此,期待Sort在以后的版本中更加完善吧。

如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。


目录
相关文章
|
14天前
|
JSON 前端开发 JavaScript
【JavaScript技术专栏】JavaScript异步编程:Promise、async/await解析
【4月更文挑战第30天】JavaScript中的异步编程通过Promise和async/await来解决回调地狱问题。Promise代表可能完成或拒绝的异步操作,有pending、fulfilled和rejected三种状态。它支持链式调用和Promise.all()、Promise.race()等方法。async/await是ES8引入的语法糖,允许异步代码以同步风格编写,提高可读性和可维护性。两者结合使用能更高效地处理非阻塞操作。
|
2天前
|
机器学习/深度学习 人工智能 算法
构建高效AI系统:深度学习优化技术解析
【5月更文挑战第12天】 随着人工智能技术的飞速发展,深度学习已成为推动创新的核心动力。本文将深入探讨在构建高效AI系统中,如何通过优化算法、调整网络结构及使用新型硬件资源等手段显著提升模型性能。我们将剖析先进的优化策略,如自适应学习率调整、梯度累积技巧以及正则化方法,并讨论其对模型训练稳定性和效率的影响。文中不仅提供理论分析,还结合实例说明如何在实际项目中应用这些优化技术。
|
3天前
|
负载均衡 关系型数据库 MySQL
MySQL读写分离技术深度解析
在高并发、大数据量的互联网应用环境中,数据库作为数据存储的核心组件,其性能直接影响着整个系统的运行效率。MySQL作为最常用的开源关系型数据库之一,虽然功能强大,但在处理大量并发读写请求时,单点服务器的性能瓶颈逐渐显现。为了解决这一问题,MySQL读写分离技术应运而生,成为提升数据库性能、实现负载均衡的有效手段。
|
5天前
|
存储 SQL 自然语言处理
RAG技术全解析:打造下一代智能问答系统
一、RAG简介 大型语言模型(LLM)已经取得了显著的成功,尽管它们仍然面临重大的限制,特别是在特定领域或知识密集型任务中,尤其是在处理超出其训练数据或需要当前信息的查询时,常会产生“幻觉”现象。为了克服这些挑战,检索增强生成(RAG)通过从外部知识库检索相关文档chunk并进行语义相似度计算,增强了LLM的功能。通过引用外部知识,RAG有效地减少了生成事实不正确内容的问题。RAG目前是基于LLM系统中最受欢迎的架构,有许多产品基于RAG构建,使RAG成为推动聊天机器人发展和增强LLM在现实世界应用适用性的关键技术。 二、RAG架构 2.1 RAG实现过程 RAG在问答系统中的一个典型
38 2
|
7天前
|
机器学习/深度学习 算法 物联网
LISA微调技术解析:比LoRA更低的显存更快的速度
LISA是Layerwise Importance Sampling for Memory-Efficient Large Language Model Fine-Tuning的简写,由UIUC联合LMFlow团队于近期提出的一项LLM微调技术,可实现把全参训练的显存使用降低到之前的三分之一左右,而使用的技术方法却是非常简单。
|
13天前
|
供应链 Java API
Java 8新特性解析及应用区块链技术在供应链管理中的应用与挑战
【4月更文挑战第30天】本文将深入探讨Java 8的新特性,包括Lambda表达式、Stream API和Optional类等。通过对这些新特性的详细解析和应用实例,帮助读者更好地理解和掌握Java 8的新技术。
|
14天前
|
NoSQL 大数据 数据处理
MongoDB聚合框架与复杂查询优化:技术深度解析
【4月更文挑战第30天】本文深入探讨了MongoDB的聚合框架和复杂查询优化技术。聚合框架包含$match、$group、$sort和$project阶段,用于数据处理和分析,提供灵活性和高性能。优化查询涉及创建合适索引、使用聚合框架、简化查询语句、限制返回结果数、避免跨分片查询、只查询所需字段及使用$inc操作符。理解这些技术有助于提升MongoDB在大数据和复杂查询场景下的性能。
|
14天前
|
Dart 前端开发 开发者
【Flutter前端技术开发专栏】Flutter Dart语言基础语法解析
【4月更文挑战第30天】Dart是Google为Flutter框架打造的高效编程语言,具有易学性、接口、混入、抽象类等特性。本文概述了Dart的基础语法,包括静态类型(如int、String)、控制流程(条件、循环)、函数、面向对象(类与对象)和异常处理。此外,还介绍了库导入与模块使用,帮助开发者快速入门Flutter开发。通过学习Dart,开发者能创建高性能的应用。
【Flutter前端技术开发专栏】Flutter Dart语言基础语法解析
|
15天前
|
编解码 前端开发 JavaScript
网页设计的艺术与技术:深入解析与代码实践
网页设计的艺术与技术:深入解析与代码实践
18 1
|
15天前
|
算法 计算机视觉 Python
DSP技术深度解析:原理、实践与应用
DSP技术深度解析:原理、实践与应用
21 1

推荐镜像

更多