Apache Spark 3.0中的SQL性能改进概览

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 阿里巴巴高级技术专家李呈祥为大家带来Apache Spark 3.0中的SQL性能改进概览的介绍。以下由Spark+AI Summit中文精华版峰会的精彩内容整理。

今天主要跟大家分享一下spark 3.0在SQL方向上的一些优化工作。从spark 2.4开始,大概有超过一年半的时间。对于一个比较活跃的开源项目来说,这个时间是非常长的。所以里面包含了大量的这种功能增强,性能优化,等各方面的新的feature在里面。大概超过50%的相关的issue都是和SQL相关的。在SQL这个方向上主要做的工作,大概分成四个方面。第一方面是工具类的。就是说基于spark的一个开发者怎么去和spark交互,提供一些更多的工具。第二个是dynamic optimization。简单来说就是运行时的优化。在这里面,包含了几个重大的性能改进。第三个是在spark的catalyst优化器方面有很多新的改进。第四个是基础依赖的更新。主要在语言层面引入了一些新的支持和依赖。

image.png

Spark 3.0是一个时间跨度非常长的release,包含了非常多的社区的工作。统计下来有接近3400多个issue在spark 3.0里面进行了处理。针对这么多的issue,我们用spark 3.0的时候,需要考虑有哪些东西对于实际的生产环境可能有好处,有哪些新的特性。

image.png

总结下来,大概可以把在SQL方向上的这种大的改动分成七个部分,分属于上文中提到的四个类别。

image.png

第一部分是new explain format。当我们想去改进,去优化一个spark SQL的性能的时候,首先需要去了解SQL的查询计划大概是一个什么样子,有针对性的去进行这种SQL的重写,或其他的一些改进。前提就是我的查询计划可读性比较强,是非常容易去看的。

image.png

对于之前2.4的版本,可以通过explain SQL去展示。只不过是这种展示的方式看起来繁杂一点。我们可以看到针对于SQL,这么一个物理查询计划,是一个树状的结构。也是可以去看的,但是可读性相对来说不够好。

image.png

在3.0里面,针对查询计划的这种展示进行了一定的优化,以简要的格式展示。根据节点的编号,可以找到对应的更详细的信息。而且对于每一个节点展示的信息也做了一些归类和整理,整理成input,output,condition等。通过这种方式,用户可以更加清晰的看到整个的查询计划。

image.png

第二部分是all type of join hints。在spark 2.4只支持broadcast。而spark 3.0除了支持broadcast,还支持sort merge,shuffle hash和cartesian。

image.png

第三部分是adaptive query execution。社区为什么要去做它,最主要的原因就是说,对于一些查询计划,在运行时能够拿到更准确的数据统计信息,可以选择最优的这种计划,对数据进行处理,从而提升spark处理数据的性能。主要包括三种场景。第一种是调整reducer的数量,从而避免额外的内存和IO的开销。第二种是说,选择最合适的join的策略。第三种是说,针对倾斜数据,在join的时候提供更好的处理方式。上述场景都是自动的,根据运行时的情况,自动地收集相关的信息,然后去做判断。

image.png

怎么去动态的调整reducer的数量。在spark 2.4,默认指定partition数量,每一个partition经过shuffle之后,对应的要处理的数据的大小可能是不一样的。这是由数据本身的特性来决定的,它的分布可能本来就是不均衡的。

image.png

在spark 3.0中,在shuffle的时候,每一个partition有不同的数据量大小,需要把小的partition数据进行合并,给同一个reducer去处理,从而使得每一个reducer它所处理的数据量大小是相近的。

image.png

针对有数据倾斜的这种join,在spark 2.4中带来的主要的问题就是说,在处理最大的partition时,要花费很长的时间,影响整个join。

image.png

在spark 3.0中,有数据倾斜的join,比在spark 2.4中更快。如图所示,对于表A和表B,我把大表的数据做切分,小表的数据做全量的分发。第一个,满足join的语义要求。第二个,在倾斜的这些key上面,它是被切成多分,然后在多个task里面去处理。

image.png

第四部分是dynamic partitioning pruning。在join操作中,要避免读取不必要的partition。而dynamic filter能够避免读取不必要的partition。

image.png

如下图所示,在spark 2.4中,大表中的所有数据都被读取。

image.png

而在spark 3.0中,通过pushdown with dynamic filter,能够减少大表中需要被读取的数据量。

image.png

如下图所示,是一个dynamic partitioning pruning的例子。

image.png

第五部分是Enhanced nested column pruning & pushdown,是针对于这种嵌套的数据结构的支持。在spark 2.4里面,其实已经提供了部分的这种支持。如下图所示的表里面,有column 1和column 2,而后者是一个嵌套的数据结构,它里面有两个字段。比如说,我查询的时候只查了column 2里面的第1个字段。去访问这个数据的时候,我只需要把column 2的第1个字段拿出来就行了,而不需要把整个column 2都拿出来。但是在spark 2.4里面它的支持是有限的。就是说,只能穿透有限的几个算子,比如说LIMIT这种算子,对于其他的一些算子是没办法的。

image.png

而在spark 3.0里面,对这一块进行了进一步的优化,能够支持把column pruning推到穿透所有的算子。

image.png

另外一种场景,就是说filter过滤的条件是根据嵌套字段里面的某一个子字段去做过滤,是不是支持把过滤条件也推到table scan里面。在spark 2.4里面也是不能够完全支持的。

image.png

而在spark 3.0里面,针对嵌套字段的filter,也是一直可以往下推到具体访问数据的table scan里面。

image.png

第六部分是Improved aggregation code generation,针对aggregation扩件的一个优化。

image.png

就是说,在spark里面我们去支持这种扩件,但是扩件会有一个限制。针对每个方法,如果大于8000 Java bytecode,HotSpot编译器就rollback,放弃生成native code。所以,如果你的这种SQL比较复杂,可能会没办法利用到扩件的这种特性。

image.png

在spark 3.0里面,针对这种情况做一些优化。简单来说,把一个方法拆分成多个方法,从而避免碰到8000 Java bytecode的限制。

image.png

具体的例子如下图所示。

image.png

第七部分是New Scala and Java,针对新的语言版本的支持。支持了新的Java 11这个版本,以及Scala 2.12版本。

image.png


关键词:Spark 3.0,SQL性能改进,Interactions with developers,Dynamic optimizations,Catalyst improvements,Infrastructure updates

获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
87 4
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
62 1
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
75 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
88 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
65 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
78 0
|
2月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
49 0
|
11天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
291 33
The Past, Present and Future of Apache Flink
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
845 13
Apache Flink 2.0-preview released

推荐镜像

更多