Spark技术内幕:Sort Based Shuffle实现解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介:

在Spark 1.2.0中,Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager 从hash换成了sort,对应的实现类分别是org.apache.spark.shuffle.hash.HashShuffleManager和org.apache.spark.shuffle.sort.SortShuffleManager。

这个方式的选择是在org.apache.spark.SparkEnv完成的:

    // Let the user specify short names forshuffle managers
    val shortShuffleMgrNames = Map(
      "hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //获得Shuffle Manager的type,sort为默认
    val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)

那么Sort BasedShuffle“取代”Hash BasedShuffle作为默认选项的原因是什么?

正如前面提到的,Hashbased shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M*R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。Hash based shuffle设计的目标之一就是避免不需要的排序(Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销)。但是它在处理超大规模数据集的时候,产生了大量的DiskIO和内存的消耗,这无疑很影响性能。Hash based shuffle也在不断的优化中,正如前面讲到的Spark 0.8.1引入的file consolidation在一定程度上解决了这个问题。为了更好的解决这个问题,Spark 1.1 引入了Sort based shuffle。首先,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件对系统带来的压力。

并且从作者ReynoldXin的几乎所有的测试来看,Sortbased shuffle在速度和内存使用方面优于Hashbased shuffle:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing.”

性能数据:from:https://issues.apache.org/jira/browse/SPARK-3280


Shuffle Map Task会按照key相对应的partition ID进行sort,其中属于同一个partition的key不会sort。因为对于不需要sort的操作来说,这个sort是负收益的;要知道之前Spark刚开始使用Hash based的shuffle而不是sort based就是为了避免Hadoop Map Reduce对于所有计算都会sort的性能损耗。对于那些需要sort的运算,比如sortByKey,这个sort在Spark 1.2.0里还是由reducer完成的。

如果这个过程内存不够用了,那么这些已经sort的内容会被spill到外部存储。然后在结束的时候将这些不同的文件进行merge sort。

为了便于下游的Taskfetch到其需要的partition,这里会生成一个index文件,去记录不同的partition的位置信息。当然了org.apache.spark.storage.BlockManager需要也有响应的实现以实现这种新的寻址方式。


核心实现的逻辑都在类org.apache.spark.shuffle.sort.SortShuffleWriter。下面简要分析一下它的实现:

1)          对于每个partition,创建一个scala.Array存储它所包含的key,value对。每个待处理的key,value对都会插入相应的scala.Array。

2)          如果scala.Array的大小超过阈值,那么需要将这个in memory的数据spill到外部存储。这个文件的开始部分会记录这个partition的ID,这个文件保存了多少个pair等信息。

3)          最后需要将所有spill到外部存储的文件进行mergesort。同时打开的文件不能过多,过多的话会消耗大量的内存,增加OOM或者GC的风险;也不能过少,过少的话就会影响性能,增大计算的延时。一般的话推荐每次同时打开10 – 100个文件。

4)          在生成最后的数据文件时,需要同时生成index索引文件。正如前面提到的,这个索引文件将记录不同partition的range。

当然了,你可能还有个疑问,就是Hash Based Shuffle说白了就是根据key需要写入的org.apache.spark.HashPartitioner,为每个Reducer写入单独的Partition。只不过对于同一个Core启动的Shuffle Map Task,如果选择spark.shuffle.consolidateFiles的话,第二个Shuffle Map Task会把结果append到上一个文件中去。那么sort的逻辑是完全可以整合到Hash Based Shuffle中去,为什么又要重新实现一种Shuffle Writer呢?我认为有以下几点:

  • Shuffle机制是所有类似计算模块的核心机制之一,要进行大的优化的风险非常高;比如一个看似简单的consolidation机制,在0.8.1就引入了,但是到1.2.0还是没有作为默认选项。
  • Hash Based Shuffle如果修改为Sort的逻辑,所谓的改进可能会影响原来已经稳定的Spark应用。比如一个应用在使用Hash Based Shuffle性能是完全符合预期的,那么迁移到Spark 1.2.0后,只需要将配置文件修改以下就可以完成这个无缝的迁移。
  • 作为一个通用的计算平台,你的测试的case永远cover不了所有的场景。那么,还是留给用户去选择吧。
  • Sort的机制还处理不断完善的阶段。比如很有的优化或者功能的改进会不断的完善。因此,期待Sort在以后的版本中更加完善吧。

如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。


目录
相关文章
|
10天前
|
机器学习/深度学习 人工智能 自然语言处理
AI技术深度解析:从基础到应用的全面介绍
人工智能(AI)技术的迅猛发展,正在深刻改变着我们的生活和工作方式。从自然语言处理(NLP)到机器学习,从神经网络到大型语言模型(LLM),AI技术的每一次进步都带来了前所未有的机遇和挑战。本文将从背景、历史、业务场景、Python代码示例、流程图以及如何上手等多个方面,对AI技术中的关键组件进行深度解析,为读者呈现一个全面而深入的AI技术世界。
67 10
|
17天前
|
机器学习/深度学习 人工智能 自然语言处理
秒级响应 + 99.9%准确率:法律行业文本比对技术解析
本工具基于先进AI技术,采用自然语言处理和语义匹配算法,支持PDF、Word等格式,实现法律文本的智能化比对。具备高精度语义匹配、多格式兼容、高性能架构及智能化标注与可视化等特点,有效解决文本复杂性和法规更新难题,提升法律行业工作效率。
|
14天前
|
数据采集 存储 JavaScript
网页爬虫技术全解析:从基础到实战
在信息爆炸的时代,网页爬虫作为数据采集的重要工具,已成为数据科学家、研究人员和开发者不可或缺的技术。本文全面解析网页爬虫的基础概念、工作原理、技术栈与工具,以及实战案例,探讨其合法性与道德问题,分享爬虫设计与实现的详细步骤,介绍优化与维护的方法,应对反爬虫机制、动态内容加载等挑战,旨在帮助读者深入理解并合理运用网页爬虫技术。
|
20天前
|
机器学习/深度学习 自然语言处理 监控
智能客服系统集成技术解析和价值点梳理
在 2024 年的智能客服系统领域,合力亿捷等服务商凭借其卓越的技术实力引领潮流,它们均积极应用最新的大模型技术,推动智能客服的进步。
55 7
|
25天前
|
负载均衡 网络协议 算法
Docker容器环境中服务发现与负载均衡的技术与方法,涵盖环境变量、DNS、集中式服务发现系统等方式
本文探讨了Docker容器环境中服务发现与负载均衡的技术与方法,涵盖环境变量、DNS、集中式服务发现系统等方式,以及软件负载均衡器、云服务负载均衡、容器编排工具等实现手段,强调两者结合的重要性及面临挑战的应对措施。
58 3
|
28天前
|
网络协议 网络性能优化 数据处理
深入解析:TCP与UDP的核心技术差异
在网络通信的世界里,TCP(传输控制协议)和UDP(用户数据报协议)是两种核心的传输层协议,它们在确保数据传输的可靠性、效率和实时性方面扮演着不同的角色。本文将深入探讨这两种协议的技术差异,并探讨它们在不同应用场景下的适用性。
68 4
|
29天前
|
Kubernetes Java 微服务
微服务上下线动态感知实现的技术解析
随着微服务架构的广泛应用,服务的动态管理和监控变得尤为重要。在微服务架构中,服务的上下线是一个常见的操作,如何实时感知这些变化,确保系统的稳定性和可靠性,成为了一个关键技术挑战。本文将深入探讨微服务上下线动态感知的实现方式,从技术基础、场景案例、解决思路和底层原理等多个维度进行阐述,并分别使用Java和Python进行演示介绍。
62 4
|
28天前
|
安全 持续交付 Docker
深入理解并实践容器化技术——Docker 深度解析
深入理解并实践容器化技术——Docker 深度解析
53 2
|
28天前
|
供应链 算法 安全
深度解析区块链技术的分布式共识机制
深度解析区块链技术的分布式共识机制
48 0
|
28天前
|
存储 供应链 算法
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
53 0

推荐镜像

更多