大数据 Shuffle 原理与实践|青训营笔记

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 本文包括:1.shuffle概述;2.spark中的shuffle算子的基本特性;3.spark中的shuffle的过程;4.push shuffle的原理与实现

课程资料

课程视频:https://live.juejin.cn/4354/yc_Shuffle

课程PPT:https://bytedance.feishu.cn/file/boxcnQaV9uaxTp4xF0d1vEK5W3c

学员手册:https://juejin.cn/post/7123908203590451207/#heading-46

完整手册:https://bytedance.feishu.cn/docx/doxcnECGEFkCKYqbxaDipK1qrVf

一、shuffle概述

1.1 MapReduce概述

image-20220731125607821

  • map阶段:在单机上进行的针对一小块数据的计算过程
  • shuffle阶段:在map的基础上,进行数据移动,为后续的reduce阶段做准备
  • reduce阶段:对移动后的数据进行处理,依然是在单机上处理一小份数据

1.2 为什么shuffle对性能非常重要

  • M * R次网络连接
  • 大量的数据移动
  • 数据丢失风险
  • 可能存在大量的排序操作
  • 大量的数据序列化、反序列化操作
  • 数据压缩

二、Shuffle算子

2.1 shuffle算子概述

  • Spark中会产生的算子大概可以分为四类

image-20220731130426277

  • 算子使用例子

image-20220731130522855

2.2 shuffle算子构造

算子内部依赖关系

  • ShuffleDependency

    • CoGroupedRDD

      • Cogroup

        • fullOuterJoin、rightOuterJoin、leftOuterJoin
        • join
    • ShuffledRDD

      • combileByKeyWithClassTag

        • combineByKey
        • reduceByKey
      • Coalesce
      • sortByKey

        • sortBy

Shuffle Dependency 构造

创建会产生shuffle的RDD时,RDD会创建Shuffle Dependency来描述Shuffle相关的信息
  • A single key-value pair RDD, i.e. RDD[Product2[K, V]],
  • Partitioner (available as partitioner property),
  • Serializer,
  • Optional key ordering (of Scala’s scala.math.Ordering type),
  • Optional Aggregator,
  • mapSideCombine flag which is disabled (i.e. false) by default.
构造Partitioner

用来将record映射到具体的partition的方法(partition指的是map映射之后的多个数据存储文件

image-20220731131953598

Aggregator
  • 在map侧合并部分record的函数
  • 接口

    • createCombiner:只有一个value的时候初始化的方法
    • mergeValue:合并一个value到Aggregator中
    • mergeCombiners:合并两个Aggregator

三、Shuffle过程

3.1 spark中的shuffle变迁

HashShuffle

  • 优点:不需要排序
  • 缺点:打开,创建的文件过多(每个partition会映射到一个独立的文件)

image-20220731132925960

SortShuffle

  • 优点:打开的文件少、支持map-side combine(每个task生成一个包含所有partition数据的文件)
  • 缺点:需要排序

image-20220731133011075

  • 每个reduce task分别获取所有map task生成的属于自己的片段

image-20220731133249544

TungstenSortShuffle

  • 优点:更快的排序效率,更高的内存利用效率
  • 缺点:不支持map-side combine

3.2 Register Shuffle

  • 由action算子触发DAG Scheduler进行shuffle register
  • Shuffle Register会根据不同的条件决定注册不同的ShuffleHandle

image-20220731153631037

  • 三种ShuffleHandle对应了三种不同的ShuffleWriter的实现

    • BypassMergeSortShuffleWriter:HashShuffle
    • UnsafeShuffleWriter:TunstonShuffle
    • SortSHuffleWriter:SortShuffle

image-20220731153719714

3.3 ShuffleReader网络请求流程

image-20220731154145640

  • 使用netty作为网络框架提供网络服务,并接受reducetask的fetch请求
  • 首先发起openBlocks请求获得streamId,然后再处理stream或者chunk请求
  • Reader的实现—ShuffleBlockFetchIterator

    • 区分local和remote节省网络消耗
    • 防止OOM

      • maxBytesInFlight
      • maxReqsInFlight
      • maxBlocksInFlightPerAddress
      • maxReqSizeShuffleToMem
      • maxAttemptsOnNettyOOM
  • External Shuffle Service

image-20220731154613075

为了解决Executor为了服务数据的fetch请求导致无法退出问题,我们在每个节点上部署一个External Shuffle Service,这样产生数据的Executor在不需要继续处理任务时,可以随意退出。

3.4 Shuffle的问题以及优化

常见问题

  • 数据存储在本地磁盘,没有备份
  • IO并发:大量RPC请求(M*R)
  • IO吞吐:随机读、写放大(3M)
  • GC频繁,影响NodeManager

优化1. Zero Copy

减少了文件拷配次&程序在拷贝过程中涉及到的用户态和内核态的切换,将文件缓冲区的数据直接输出到目标Channel

image-20220731155101752

Netty 零拷贝

  • 可堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
  • CompositeByteBuf 、 Unpooled.wrappedBuffer、 ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
  • Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝

优化2. map-side预聚合算子

image-20220731155443008

优化3. 倾斜优化

  • 方式一:提高并行度

    • 优点:足够简单
    • 缺点:只缓解、不根治

image-20220731155744813

  • 方式二:Spark AEQ Skew Join

AEQ根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后进行join(会有重复出现)

image-20220731160002283

优化4. 参数优化

image-20220731160155971

四、Push Shuffle

4.1 为何需要

  • Avg IO size太小,造成了大量的随机IO,严重影响磁盘的吞吐
  • M * R次读请求,造成大量的网络连接,影响稳定性

image-20220731160832325

4.2 Magnet实现原理

  • Spark driver组件,协调整体的shuffle操作
  • map任务的shuffle writer过程完成后,增加了一个额外的操作push-merge,将数据复制一份到原创的shuffle服务上
  • magnet shuffle service是一个强化版的ESS,将隶属于同一个shuffle partition的block,会在远程传输到magnet后被merge到一个文件中
  • reduce任务从magnet shuffle service接受合并好的shuffle数据

image-20220731160938209

4.3 Magnet 可靠性

  • 如果Map task输出的Block没有成功Push到magnet上,并且反复重试仍然失败,则reduce task直接从ESS上拉取原始的block数据
  • 如果magnet上的block因为重复或者冲突等原因,没有正常完成merge的过程,则reduce task直接拉取未完成的merge的block
  • 如果reduce拉取已经merge好的block失败,则直接拉取merge前的原始block
  • 本质上,magnet中维护了两份shuffle数据的副本(有极小风险,但是选择接受)
  • 下方是Cloud Shuffle Service的写入和读取流程

image-20220731162109120

  • Cloud Shuffle Service 支持AQE

image-20220731162257976

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
2月前
|
数据采集 数据可视化 大数据
Python在大数据处理中的应用实践
Python在大数据处理中扮演重要角色,借助`requests`和`BeautifulSoup`抓取数据,`pandas`进行清洗预处理,面对大规模数据时,`Dask`提供分布式处理能力,而`matplotlib`和`seaborn`则助力数据可视化。通过这些工具,数据工程师和科学家能高效地管理、分析和展示海量数据。
75 4
|
1月前
|
数据采集 运维 Cloud Native
Flink+Paimon在阿里云大数据云原生运维数仓的实践
构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。
18415 54
Flink+Paimon在阿里云大数据云原生运维数仓的实践
|
1月前
|
存储 机器学习/深度学习 大数据
参与开源大数据Workshop·杭州站,共探企业湖仓演进实践
Apache Flink 诚邀您参加 7 月 27 日在杭州举办的阿里云开源大数据 Workshop,了解流式湖仓、湖仓一体架构的最近演进方向,共探企业云上湖仓实践案例。
147 12
参与开源大数据Workshop·杭州站,共探企业湖仓演进实践
|
9天前
|
SQL 监控 大数据
"解锁实时大数据处理新境界:Google Dataflow——构建高效、可扩展的实时数据管道实践"
【8月更文挑战第10天】随着大数据时代的发展,企业急需高效处理数据以实现即时响应。Google Dataflow作为Google Cloud Platform的强大服务,提供了一个完全托管的流处理与批处理方案。它采用Apache Beam编程模型,支持自动扩展、高可用性,并能与GCP服务无缝集成。例如,电商平台可通过Dataflow实时分析用户行为日志:首先利用Pub/Sub收集数据;接着构建管道处理并分析这些日志;最后将结果输出至BigQuery。Dataflow因此成为构建实时数据处理系统的理想选择,助力企业快速响应业务需求。
31 6
|
17天前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
180 3
|
22天前
|
存储 搜索推荐 数据建模
阿里巴巴大数据实践之数据建模:构建企业级数据湖
阿里巴巴通过构建高效的数据湖和实施先进的数据建模策略,实现了数据驱动的业务增长。这些实践不仅提升了内部运营效率,也为客户提供了更好的服务体验。随着数据量的不断增长和技术的不断创新,阿里巴巴将持续优化其数据建模方法,以适应未来的变化和发展。
|
28天前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【7月更文挑战第22天】在大数据领域,Python算法效率至关重要。本文深入解析时间与空间复杂度,用大O表示法衡量执行时间和存储需求。通过冒泡排序(O(n^2)时间,O(1)空间)与快速排序(平均O(n log n)时间,O(log n)空间)实例,展示Python代码实现与复杂度分析。策略包括算法适配、分治法应用及空间换取时间优化。掌握这些,可提升大数据处理能力,持续学习实践是关键。
33 1
|
22天前
|
分布式计算 关系型数据库 数据处理
美柚与MaxCompute的数据同步架构设计与实践
数据处理与分析 一旦数据同步到MaxCompute后,就可以使用MaxCompute SQL或者MapReduce进行复杂的数据处理和分析。
|
22天前
|
分布式计算 运维 大数据
混合云模式下 MaxCompute + Hadoop 混搭大数据架构实践。
除了资源效率和成本的优势外,混合云模式还为斗鱼带来了可量化的成本、增值服务以及额外的专业服务。阿里云的专业团队可以为斗鱼提供技术咨询和解决方案,帮助斗鱼解决业务难题。此外,计算资源的可量化也使得斗鱼能够清晰地了解资源使用情况,为业务决策提供依据。
|
26天前
|
SQL 分布式计算 监控
ODPS SQL问题之在ODPS SQL中,Shuffle的概念是什么
ODPS SQL问题之在ODPS SQL中,Shuffle的概念是什么

热门文章

最新文章