Spark技术内幕:Shuffle Pluggable框架详解,你怎么开发自己的Shuffle Service?

简介:

首先介绍一下需要实现的接口。框架的类图如图所示(今天CSDN抽风,竟然上传不了图片。如果需要实现新的Shuffle机制,那么需要实现这些接口。


1.1.1  org.apache.spark.shuffle.ShuffleManager

Driver和每个Executor都会持有一个ShuffleManager,这个ShuffleManager可以通过配置项spark.shuffle.manager指定,并且由SparkEnv创建。Driver中的ShuffleManager负责注册Shuffle的元数据,比如Shuffle ID,map task的数量等。Executor中的ShuffleManager 则负责读和写Shuffle的数据。

需要实现的函数及其功能说明:

1)       由Driver注册元数据信息

   defregisterShuffle[K, V, C](

     shuffleId: Int,

     numMaps: Int,

dependency:ShuffleDependency[K, V, C]): ShuffleHandle

一般如果没有特殊的需求,可以使用下面的实现,实际上Hash BasedShuffle 和Sort BasedShuffle都是这么实现的。

  override def registerShuffle[K, V, C](

 

      shuffleId: Int,

      numMaps: Int,

      dependency: ShuffleDependency[K, V, C]):ShuffleHandle = {

    new BaseShuffleHandle(shuffleId, numMaps,dependency)

  }

2)       获得Shuffle Writer, 根据Shuffle Map Task的ID为其创建Shuffle Writer。

def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:TaskContext): ShuffleWriter[K, V]

3)       获得Shuffle Reader,根据Shuffle ID和partition的ID为其创建ShuffleReader。

  def getReader[K, C](

      handle: ShuffleHandle,

      startPartition: Int,

      endPartition: Int,

      context: TaskContext): ShuffleReader[K,C]

4)       为数据成员shuffleBlockManager赋值,以保存实际的ShuffleBlockManager

5)       defunregisterShuffle(shuffleId: Int): Boolean,删除本地的Shuffle的元数据。

6)       def stop(): Unit,停止Shuffle Manager。

每个接口的具体实现的例子,可以参照org.apache.spark.shuffle.sort.SortShuffleManager 和org.apache.spark.shuffle.hash.HashShuffleManager。

1.1.2  org.apache.spark.shuffle.ShuffleWriter

Shuffle Map Task通过ShuffleWriter将Shuffle数据写入本地。这个Writer主要通过ShuffleBlockManager来写入数据,因此它的功能是比较轻量级的。

1)         def write(records: Iterator[_ <:Product2[K, V]]): Unit, 写入所有的数据。需要注意的是如果需要在Map端做聚合。(aggregate),那么写入前需要将records做聚合。

2)         def stop(success: Boolean): Option[MapStatus],写入完成后提交本次写入。

对于Hash BasedShuffle,请查看org.apache.spark.shuffle.hash.HashShuffleWriter;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.sort.SortShuffleWriter。

1.1.3  org.apache.spark.shuffle.ShuffleBlockManager

主要使用从本地读取Shuffle数据的功能。这些接口都是通过org.apache.spark.storage.BlockManager调用的。

1)       def getBytes(blockId: ShuffleBlockId):Option[ByteBuffer], 一般通过调用下一个接口实现,只不过将ManagedBuffer转换成了ByteBuffer。

2)       def getBlockData(blockId:ShuffleBlockId): ManagedBuffer,核心读取逻辑。比如Hash Based Shuffle的从本地读取文件都是通过这个接口实现的。因为不同的实现可能文件的组织方式是不一样的,比如Sort Based Shuffle需要通过先读取Index索引文件获得每个partition的起始位置后,才能读取真正的数据文件。

3)       def stop(): Unit,停止该Manager。

对于Hash Based Shuffle,请查看org.apache.spark.shuffle.FileShuffleBlockManager;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.IndexShuffleBlockManager。

1.1.4  org.apache.spark.shuffle.ShuffleReader

ShuffleReader实现了下游的Task如何读取上游的ShuffleMapTask的Shuffle输出的逻辑。这个逻辑比较复杂,简单来说就是通过org.apache.spark.MapOutputTracker获得数据的位置信息,然后如果数据在本地那么调用org.apache.spark.storage.BlockManager的getBlockData读取本地数据(实际上getBlockData最终会调用org.apache.spark.shuffle.ShuffleBlockManager的getBlockData)。具体的Shuffle Read的逻辑请查看下面的章节。

1)       def read():Iterator[Product2[K, C]]


如何开发自己的Shuffle机制?到这里你应该知道怎么做了。不知道? 再看一遍吧。



如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。


目录
相关文章
|
8月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
771 1
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
111 2
|
3天前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
152 2
|
2月前
|
分布式计算 监控 大数据
如何优化Spark中的shuffle操作?
【10月更文挑战第18天】
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
137 1
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
106 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
117 0
|
5月前
|
分布式计算 资源调度 Shell
如何开始使用Spark框架?
【8月更文挑战第31天】如何开始使用Spark框架?
111 2
|
5月前
|
SQL 机器学习/深度学习 分布式计算
Spark框架
【8月更文挑战第31天】Spark框架
56 2