Spark中几种ShuffleWriter的区别你都知道吗?

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 一.前言在Spark中有三种shuffle写,分别是BypassMergeSortShuffleWriter、UnsafeShuffleWriter、SortShuffleWriter。分别对应三种不同的shuffleHandle。

一.前言

在Spark中有三种shuffle写,分别是BypassMergeSortShuffleWriter、UnsafeShuffleWriter、SortShuffleWriter。分别对应三种不同的shuffleHandle。

这三者和ShuffleHandle的对应关系如下:

UnsafeShuffleWriter:SerializedShuffleHandle

BypassMergeSortShuffleWriter:BypassMergeSortShuffleHandle,

SortShuffleWriter:BaseShuffleHandle

那么这些shuffle写内部的实现细节有何不同,在什么场景下使用什么样的shuffleWriter呢,接下来我们对着三种ShuffleWriter的实现细节做一个比较。

二.不同shuffleHandle的使用时机

不同shuffleWrite的使用其实是根据shuffleHandle来决定的,在构建shuffleDependence时都会构建shuffleHandle,在registerShuffle方法中,有着对shuffleHandle使用的一个条件约束,因此使用条件也有所不同。

1.对于BypassMergeSortShuffleHandle

map端没有聚合操作,且分区必须小于200

在许多使用场景下,有些算子会在map端先进行一次combine,减少数据传输,而BypassMergeSortShuffleHandle不支持这种操作,因为该handle对应的BypassMergeSortShuffleWriter是开辟和后续RDD分区数量一样数量的小文件,读取每条记录算出它的分区号,然后根据分区号判断应该追加到该文件中,此外这个过程也有缓冲区的概念,但一般这个缓冲区都不会特别大,默认为32k。这也是这种shuffle写不支持map端聚合的一个原因,因为聚合必然要在内存中储存一批数据,将相同key的数据做聚合,而这里是直接开辟多个I/O流,根据分区号往文件中追加数据。

而正因为要同时打开多个文件,所以后续RDD的分区数也不能太多,否则同时打开多个文件,产生多个IO,消耗的资源成本很高。

2.对于SerializedShuffleHandle

map端没有聚合操作,需要Serializer支持relocation,分区数目必须小于 16777216

序列化方式需要支持重定位,即使用KryoSerializer等一些序列化方式。这种方式下用到了Tungsten优化,排序的是二进制的数据,不会对数据进行反序列化操作,所以不支持aggregation。至于为什么我们在后续的实现细节做一个解释。

分区数目必须小于 16777216的原因是,partition number是使用24bit 表示的。

3.对于BaseShuffleHandle

以上情况都不满足时,采用这种ShuffleHandle,对应的ShuffleWrite是SortShuffleWriter,这种形式的支持map端聚合操作,而且也支持排序操作。

三.不同shuffleWrite的实现细节

1.BypassMergeSortShuffleWriter 实现细节

BypassMergeSortShuffleWriter会根据RDD的分区数打开此数量的文件,然后通过rdd的迭代器,迭代出每一条数据,对这些record的分区号进行计算,到当前这条数据写入的分区号,然后写入到该分区对应的文件中。

最后数据迭代完毕,会生成许多分区记录文件,之后将所有分区的数据会合并为同一个文件。此外还会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机访问某个partition的所有数据。

但是需要注意的是,这种方式不宜有太多分区,因为过程中会并发打开所有分区对应的临时文件,会对文件系统造成很大的压力。这种模式下为了减少IO次数,会采用buffer,但是buffer的大小默认为32k,当然这个大小是可以通过spark.shuffle.file.buffer参数自定义配置的。

2.UnsafeShuffleWriter 实现细节

UnsafeShuffleWriter 在内部维护了一块内存,这里的内存分为两部分。一部分是以Page(默认4KB)的形式存在的,存储是真正的记录。另一部分是一个存储数据指针的LongArray数组。这些数据都是被序列化存储的,其中指针是采用了8个字节来代表一条数据,8个字节的定义的数据结构具体存储的信息为24 bit partition number[27 bit offset in page] ,其中存储了这条数据的partition和真实记录数据的指针。

数据记录被传入,先进行序列化,写入到内存页Page中,同时对该数据产生一条指针存储到LongArray数组中,做排序操作,排序操作使用的算法是默认是 RadixSort。

在每次排序比较的时候,只需要线性的查找指针区域的数据,不用根据指针去找真实的记录数据做比较,同时序列化器支持对二进制的数据进行排序比较,不会对数据进行反序列化操作,这样避免了反序列化和随机读取带来的开销,因为不会序列化成对象,可以减少内存的消耗和GC的开销。

UnsafeShuffleWriter中内存管理(申请、释放)工作,由ShuffleExternalSorter来完成。ShuffleExternalSorter还有一个作用就是当内存中数据太多的时候,会先spill到磁盘,防止内存溢出。

之后,如果一个Page内存满了,就会申请内存,如果申请不到内存,就 spill到文件中。

在spill时,会根据指针的顺序溢写,这样就保证了每次溢写的文件都是根据Partition来进行排序的。一个文件里不同的partiton的数据用fileSegment来表示,对应的信息存在 SpillInfo 数据结构中。

最后的merge阶段会根据分区号去每个溢写文件中去拉取对应分区的数据,然后写入一个输出文件,最终合并成一个依据分区号全局有序的大文件。此外还会将每个partition的offset写入index文件方便reduce端拉取数据。

merge阶段,相同分区的都合并到一起,最终返回一个完整的文件,根据压缩,加密的需求有不同的合并方式,有以下两种合并方式:

mergeSpillsWithTransferTo方法,基于Java NIO,通过channel直接传输数据,内部通过两层循环遍历每个文件的每个分区,将分区相同的数据合并到一起。

对应mergeSpillsWithFileStream,使用Java标准的流式IO,它主要用于IO压缩的编解码器不支持级联压缩数据,加密被启动或用户已显示禁止使用transferTo的情况

默认配置情况下会使用mergeSpillsWithTransferTo方式,使用lz4进行压缩,不加密,直接通过NIO的transferTo机制合并。NIO的transferTo()方法在内部实现中,由native方法transferTo0()来实现,它依赖底层操作系统的支持。在UNIX和Linux系统中,调用这个方法将会引起sendfile()系统调用。

通过sendfile实现的零拷贝I/O会减少用户态和内核态之间的切换,同时减少不必要的数据拷贝,在这里在发送sendfile系统调用,会触发一次,用户态和内核态的切换,通过DMA将磁盘上文件的内存拷贝到内核空间缓冲区,然后在内核缓冲区触发文件的合并,之后直接将合并后的文件写入到磁盘中,这时又会触发一次数据的拷贝。等sendfile系统调用返回,又会触发一次用户态和内核态之间的切换。

3.SortShuffleWriter 实现细节

SortShuffleWriter 中的处理步骤如下:
1.使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在内存中进行排序, 排序的Key是(partitionId, hash(key)) 这样一个元组。

2.如果超过内存阈值,spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根据 hash(key)进行比较排序。

3.如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件和当前内存中的数据结构中的数据进行mergeSort,进行全局排序

SortShuffleWriter 中使用 ExternalSorter 来对内存中的数据进行排序,ExternalSorter内部维护了两个集合PartitionedAppendOnlyMap、PartitionedPairBuffer,两者的区别如下:

image

PartitionedAppendOnlyMap的处理逻辑为,根据key值插入数据,如果对应位置有值且等于原先key,直接进行aggregation操作,更新数据。如果对应位置有值且不等于原先key,则利用线性探查法处理Hash冲突,向后挪动一位插入。

内部的排序是使用优化的排序算法TimSort对PartitionedAppendOnlyMap、PartitionedPairBuffer底层的Array进行排序, 排序的逻辑是,先根据PartitionId,再根据Key的hashCode进行排序。

当每个缓冲区达到内存限制时,会将其写出(spill)到一个中间文件中。当用户请求迭代器或文件输出时,溢出文件将和剩余的内存数据合写成一个有序的文件。这里的核心是partitionedIterator,将已排序的文件序列和内存中的数据合并,返回迭代器,迭代器按partition分组,对每个partition,都有一个遍历其内容的迭代器,按顺序访问数据。

最后会删除所有中间文件。

5万人关注的大数据成神之路,不来了解一下吗?
5万人关注的大数据成神之路,真的不来了解一下吗?
5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
15天前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
80 1
|
6月前
|
SQL 分布式计算 Hadoop
干翻Hadoop系列文章【02】:Hadoop、Hive、Spark的区别和联系
干翻Hadoop系列文章【02】:Hadoop、Hive、Spark的区别和联系
|
20天前
|
分布式计算 资源调度 Hadoop
Spark Standalone与YARN的区别?
【10月更文挑战第5天】随着大数据处理需求的增长,Apache Spark 成为了广泛采用的大数据处理框架。本文详细解析了 Spark Standalone 与 YARN 两种常见部署模式的区别,并通过示例代码展示了如何在不同模式下运行 Spark 应用程序。Standalone 模式自带轻量级集群管理,适合小规模集群或独立部署;YARN 则作为外部资源管理器,能够与 Hadoop 生态系统中的其他应用共享资源,更适合大规模生产环境。文章对比了两者的资源管理、部署灵活性、扩展性和集成能力,帮助读者根据需求选择合适的部署模式。
14 1
|
3月前
|
缓存 分布式计算 数据处理
|
3月前
|
分布式计算 Serverless 数据处理
|
5月前
|
资源调度 分布式计算 监控
Spark Standalone与YARN的区别?
【6月更文挑战第17天】Spark Standalone与YARN的区别?
313 57
|
5月前
|
分布式计算 Hadoop 大数据
Spark与Hadoop的区别?
【6月更文挑战第15天】Spark与Hadoop的区别?
52 8
|
6月前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
6月前
|
SQL 分布式计算 Hadoop
Spark与Hadoop的关系和区别
Spark与Hadoop的关系和区别
|
6月前
|
分布式计算 Hadoop 数据处理
Spark与Hadoop的区别是什么?请举例说明。
Spark与Hadoop的区别是什么?请举例说明。
169 0