利用闪存优化在Cosco基础上的Spark Shuffle

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: SPARK+AI SUMMIT 2020中文精华版线上峰会将会带领大家一起回顾2020年的SPARK又产生了怎样的最佳实践,技术上取得了哪些突破,以及周边的生态发展。本文中,来自Databricks开源项目组的软件工程师吴一介绍了利用Flash闪存优化在Cosco基础上的Spark Shuffle。原标题:Flash for Spark Shuffle with Cosco

Cosco是Facebook开发的一种服务,主要用于优化Spark Shuffle的性能,下文主要介绍用Flash闪存(以下简称:闪存)进一步优化Cosco。

一、Cosco

Cosco作为一种服务主要优化Spark Shuffle的性能,其优势有:

  • 相较于原生的Spark Shuffle,能够提升大约3倍的I/O性能,能够有效降低磁盘的读写时间;
  • 引入闪存以后Cosco能够以更少的资源支撑更多的场景;
  • 引入闪存之后有更大的可能降低Query的延迟;
  • 利用闪存优化Cosco的过程中用到的技术也可以用于Cosco之外的领域。

(一)Cosco产生背景

在Spark Shuffle中有Map Task和Reduce Task两种Task,每个Map Task都会生成Map Output Files,然后根据Partition进行分组,决定将文件写入本地磁盘还是分布式文件系统中;所有Map Task执行完毕之后,Reduce Task就会去读取Map Output Files中某个分区的数据,将其合并成某个大的Partition,在必要的时候还会进行排序。在上面的过程中,主要会存在两个与I/O性能相关的问题:

(1)Write amplification problem

如下图,这个问题也就说在最坏的情况下,同一个shuffle产生的字节会被写入磁盘三次:

  • 第一次是在Map Task产生shuffle data的过程中如果内存不足,会先把Shuffle Data Spill到磁盘;
  • 第二次是写入Map Output Files的过程;
  • 第三次是在Reduce Task中,如果进行sort的时候内存不够,也会先Spill到磁盘。

image.png

(2)small IOs problem

在Spark Shuffle模型中,其I/O请求总数会有M x R个,而在生产中观察到的I/O请求的size平均为200kb,相对于磁盘来说是非常小的,当整个作业的并行度提升之后会产生大量的小I/O请求,会急剧增加磁盘开销,比如寻址时间等,从而导致Shuffle的性能变差。

image.png

基于上述问题,Cosco应运而生,其工作原理如下图所示。相较于原生的Spark每个Task生成一个自己的Map Output Files,Cosco允许不同的Map Task将同一个Partition写入到同一个内存缓存中,缓存到达一个阈值后,会将这部分数据Flush到分布式文件系统的文件中。这种情况下,同一个Partition可能产生多个对应的Flush文件,等到Reduce Task执行的时候,只需要读取HDFS系统中的文件即可,且文件的数据量在十几M的级别,且文件数量远小于之前的M x R数量级。因此,也就解决了小I/O的问题。

image.png

(二)用Flash替换内存缓冲

使用Flash来作为缓冲的话是通过追加写的方式将Shuffle的数据写入缓存中,之所以这么做是因为闪存的可擦写次数是有限的,追加写可以延长闪存的寿命。闪存相对于内存存在着一定的延迟,但是总体而言这个延迟相对于整个shuffle在Cosco中缓存的时间是可以忽略不计的。

image.png

现在存在一个如何选择的问题,比如在1GB的内存和每天能够写100GB的闪存之间让我们用来部署集群,我们如何抉择呢?这里有一条不精确的经验:1GB的内存和每天能够写100GB的闪存这两种选择的效果是一样的,也就是说能够支撑同样的场景,但是内存比闪存需要更多的能耗。大家在部署集群的时候可以根据这条经验来实际操作,选择最优的配置。

(三)基于内存和闪存混合的缓存优化

基于内存和闪存混合的缓存优化技术主要有两种:

  • 第一种是优先缓存内存,当内存达到一定阈值之后再Flush到闪存;
  • 第二种是利用partition加载速度不一样的特性,对于加载速度快的partition用内存缓存,对于加载速度慢的partition用闪存缓存。

(1)第一种

第一种优化技术利用了Shuffle数据随时间变化的特性,如下图所示,我们发现Shuffle数据随时间变化的统计中,峰值情况是占据小部分的,于是我们用闪存来处理峰值情况,最终只用250GB的内存和每天能写25TB的闪存就能达到和原来一样的效果,这样就实现了Cosco的优势,用更少的硬件资源来支撑了同样的场景。这种混合存储的优化技术比之纯用内存有着更强的伸缩性,不会在某些特殊情况下造成系统崩溃。比如单纯用内存的时候,如果出现内存不足就会崩溃,但是混合使用的时候就可以用闪存来处理异常情况,避免造成严重后果。

image.png

原来的Cosco集群有负载均衡的逻辑,更了获得更好的效果,我们使用插件的方式将闪存的优化集成到负载均衡逻辑中,如下图所示,引入一个阈值,当Shuffle Service内存不够的时候,就会利用阈值来进行判断是否将数据Flush到闪存中,这样有两个好处:

  • 实现简单;
  • 便于对集群的性能做评估。

image.png

总的来说,第一种优化技术有如下特点:

  • 利用了Shuffle Data的分布随时间变化而变化的特性;
  • 采用了优先在内存内进行缓存的策略;
  • 巧妙的适配了原来的负载均衡逻辑。

(2)第二种

第二种是利用partition加载速度不一样的特性,对于加载速度快的partition用内存缓存,对于加载速度慢的partition用闪存缓存。其策略主要是周期性的检测Partition的加载速率,当速率小于某个阈值的时候,就使用闪存来缓存,当速率大于某个阈值的时候就使用内存来缓存。从下图中可以看出,在实际生产中大多数Partition的加载速度是比较慢的,少部分加载速度比较快,加载速度比较慢的Partition占用了少部分内存,造成内存的低使用率,因此我们用闪存来承载这些Partition,达到优化的目的。

image.png

二、未来工作

(一)低延迟查询

引入闪存之后,我们可以让Reduce Task直接从闪存中读取缓存的数据,而不是从HDFS中的文件读取数据,这样子提高了数据的读取速率。另外,在引入闪存之后,Shuffle的数据块会变得更大,在Reduce端合并数据块的次数会变少,让整个查询变得更快。

(二)性能提升

当前,Cosco为了保证容错性,每一份Shuffle数据在写入持久化的文件之前,会在不同节点的Shuffle Service保存两份数据。如果我们引入闪存之后,因为闪存具有不易失性,这样子在Shuffle Service在恢复之后可以从闪存恢复数据,减少了拷贝的副本。另外,在引入闪存之后,数据块变得更大,在整个DFS上的读写也会更加高效。

三、性能评估技术

在上述的优化过程中,主要有如下四种类型的评估技术:

  • Discrete event simulation
  • Synthetic load generation on a test cluster
  • Shadow testing on a test cluster
  • Special canary in a production cluster

(1)Discrete event simulation

Discrete event simulation,也就是离散时间模拟的方法,是一种比较通用的评估方法。我们把每个Shuffle Data到达闪存的行为作为一个离散事件,记录其到达的时间、此时闪存中写入的数据总量以及最后闪存被Flush到DFS文件的数据总量。最终我们会得到如下图所示统计表,包含了最终数据块的大小和缓存的时间,由此我们就可以推算出数据块的加载速率,也就是对应Partition的加载速率,并且把这个速率应用于上文中讲到的第二种优化技术来进行决策。

image.png

(2)Special canary in a production cluster

如果我们要在一个生产环境中来验证我们所进行的优化是否有效是比较困难的,因为在Cosco中一个Task可以与多个Shuffle Service进行通信,所以很难确定是因为加入了闪存提升了性能还是因为其他原因而提升。因此,我们将整个生产集群分成两个互不干扰的子集群,然后进行对比试验,比如对子集群A增加闪存优化,而子集群B保持原来的部署模式。之后,我们再对两个子集群进行评估,就可以得知增加闪存优化是否起到了优化效果。


关键词:Cosco、Shuffle、FLash闪存、Spark、缓存优化、负载均衡

获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
2月前
|
分布式计算 监控 大数据
如何优化Spark中的shuffle操作?
【10月更文挑战第18天】
|
3月前
|
存储 分布式计算 监控
Spark如何优化?需要注意哪些方面?
【10月更文挑战第10天】Spark如何优化?需要注意哪些方面?
55 6
|
3月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
52 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
98 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
115 0
|
3月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
58 0
|
5月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
|
5月前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
57 0
|
分布式计算 资源调度 算法
Spark提交参数说明和常见优化
打开微信扫一扫,关注微信公众号【数据与算法联盟】 转载请注明出处:http://blog.csdn.net/gamer_gyt 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer 最近在搞一个价格分类模型,虽说是分类,用的是kmeans算法,求出聚类中心,对每个价格进行级别定级。
1329 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
186 2
ClickHouse与大数据生态集成:Spark & Flink 实战