大数据Shuffle原理与实践

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Shuffle原理与实践

💨Shuffle概述

🎈在开源实现的MapReduce中,存在Map、 Shuffle、 Reduce三个阶段。

Shuffle过程是MapReduce的核心。

Map阶段:是在单机上进行的针对-一小块数据的计算过程。Shuffle阶段:在map阶段的基础,上,进行数据移动,为后续的reduce阶段做准备。reduce阶段:对移动后的数据进行处理,依然是在单机上处理一小份数据。

🎈为什么需要Shuffle?

在分布式计算框架中,数据本地化是一个很重要的考虑,即计算需要被分发到数据所在的位置,从而减少数据的移动,提高运行效率。 hadoop中,map负责数据的初级拆分获取解析, reduce负责最终数据的集总,除了业务逻辑的功能外,其他的核心数据处理都是由shuffle来支持。

🎈Shuffle有什么?

简单来说,shuffle中有三次的数据排序

🚩第一次是 快速排序,这是因为第一次的数据全部在内存中开辟了一个缓冲区,数据从map出来后,分批进入缓冲区,对它们的索引进行排序,并且按照map的逻辑进行分区,在出缓冲区落盘的时候,完成排序。🚩第二次是归并排序,将第一次分批出来的文件进行区内归并排序。🚩第三次也是归并排序,将所有的map Task第二次产生的文件进行区内归并排序

这三次可以看做是一个整体的过程,从这里应该可以看出,shuffle是一个比较耗费资源并且时间开销比较大的环节。

🎈为什么Shuffle对性能非常重要?

🚩M * R次网络连接🚩大量的数据移动🚩数据丢失风险🚩可能存在大量的排序操作🚩大量的数据序列化、反序列化操作🚩数据压缩


🍳在大数据场景下,数据shuffle表示了不同分区数据交换的过程,不同的shufle策略性能差异较大。 目前在各个引擎中shuffle都是优化的重点,在spark框架中,shuffle 是支撑spark进行大规模复杂 数据处理的基石。


shuffle的数据来源于map,所以可以对map端出来的数据进行处理,我们可以采用压缩的方式尽量减少数据的规模。

💨Shuffle算子

💨分类

spark中会导致shuffle操作的有以下几种算子:

🚩repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等

🚩byKey类的操作:比如reduceByKey、groupByKey、sortByKey等

🚩join类的操作:比如join、cogroup等

🚩Distinct类的操作:distinct

💨Spark中对shuffle的抽象

窄依赖:父RDD的每个分片至多被子RDD中的一个分片所依赖

宽依赖:父RDD中的分片可能被子RDD中的多个分片所依赖

💨算子内部的依赖关系

ShuffleDependency:

🍳CoGroupedRDD含Cogroup

🚩fullOuterJoin、rightOuterJoin、 leftOuterJoin🚩join

🍳ShuffledRDD

🚩combineByKeyWithClassTag       combineByKey       reduceByKey🚩Coalesce🚩sortByKey            sortBy

💨Shuffle过程

💨Write

Spark中需要Shuffle 输出的Map任务会为每个Reduce创建对应的bucket,Map产生的结果会根据设置的partitioner得到对应的bucketId,然后填 充到相应的bucket中去。每个Map的输出结果可能包含所有的Reduce所需要的数据,所以每个Map会创建R个bucket(R是reduce的 个数),M个Map总共会创建M*R个bucket。

💨Fether

Reduce去拖Map的输出数据,Spark提供了两套不同的拉取数据框架:通过socket连接去取数据;使用netty框架去取数据。Spark Map输出的数据没有经过排序,Spark Shuffle过来的数据也不会进行排序,Spark认为Shuffle过程中的排序不是必须的,并不是所有类型的Reduce需要的数据都需要排序,强 制地进行排序只会增加Shuffle的负担。educe拖过来的数据会放在一个HashMap中,HashMap中存储的也是对,key是Map输出的key,Map输出对应这个key的所有value组成HashMap的value。Spark将 Shuffle取过来的每一个对插入或者更新到HashMap中,来一个处理一个。HashMap全部放在内存中。

💨Shuffle Handle创建

🎈Register Shuffle时做的最重要的事情是根据不同条件创建不同的shuffle Handle

🎈Shuffle Handle与Shuffle Writer的对应关系

BypassMergeSortShuffleHandle——>BypassMergeSortShuffleWriter

SerializedShuffleHandle——>UnsafeShuffleWriter

BaseShuffleHandle——>SortShuffleWriter

💨Reader实现-网络时序图

🎈使用基于netty的网络通信框架

🎈位置信息记录在MapOutputTracker中

🎈主要会发送两种类型的请求

    🚩OpenBlocks请求

    🚩Chunk请求或Stream请求

💨Shuffle优化使用的技术: Netty Zero Copy

🚩可堆外内存,避免JVM堆内存到堆外内存的数据拷贝。🚩CompositeByteBuf、Unpooled.wrappedBuffer. ByteBuf.slice,可以合并、包装、切分数组,避免发生内存拷贝🚩Netty使用FileRegion实现文件传输,FileRegion底层封装了FileChannel#transferTo()方法,可以将文件缓冲区的数据直接传输到目标Channel, 避免内核缓冲区和用户态缓冲区之间的数据拷贝


在第一次排序之后,此时由于原数据中各个字段可能会有数据分布不均,这样会导致reduce端处理数据时的数据倾斜——各个Task的处理量相差悬殊,可以在此处进行初步的数据合并处理。

应用场景:

去重操作;聚合,byKey类操作;排序操作等

💨常见问题

🚩数据存储在本地磁盘,没有备份

🚩I0并发:大量RPC请求(M*R)

🚩I0吞吐:随机读、写放大(3X)

🚩GC频繁,影响NodeManager

💨Shuffle优化

🚩避免shuffle,使用broadcast替代join

🚩使用可以map-side预聚合的算子


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
25天前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
1月前
|
边缘计算 人工智能 搜索推荐
大数据与零售业:精准营销的实践
【10月更文挑战第31天】在信息化社会,大数据技术正成为推动零售业革新的重要驱动力。本文探讨了大数据在零售业中的应用,包括客户细分、个性化推荐、动态定价、营销自动化、预测性分析、忠诚度管理和社交网络洞察等方面,通过实际案例展示了大数据如何帮助商家洞悉消费者行为,优化决策,实现精准营销。同时,文章也讨论了大数据面临的挑战和未来展望。
|
2月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
69 3
|
2月前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
65 2
|
2月前
|
SQL 消息中间件 分布式计算
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
99 0
|
2月前
|
SQL 大数据
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(二)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(二)
70 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
78 0
|
2月前
|
SQL 大数据
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
87 0
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
322 7
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
49 2