DATA AI Summit 2022提及到的对 aggregate 的优化

简介: DATA AI Summit 2022提及到的对 aggregate 的优化

背景


本文基于SPARK 3.3.0


HashAggregate的优化


该优化是FaceBook(Meta) 内部的优化,还有合并到spark社区。

该优化的主要是partialaggregate的部分:对于类似求count,sum,Avg的聚合操作,会存在现在mapper进行部分聚合的操作,之后在reduce端,再进行FinalAggregate操作。这看起来是没有问题的(能够很好的减少网络IO),但是我们知道对于聚合操作,我们会进行数据的spill的操作,如果在mapper阶段合并的数据很少,以至于抵消不了网络IO带来的消耗的话,这无疑会给任务带来损耗。

874a8e4b1f2049e5b90b054e4a2abd1e.png

964d47901ff14cab9ac60e63955e30c8.png

4f02bf94d848458ebd2f0508d8ba3242.png

image.png


利用运行时的指标信息,能够达到比较好的加速效果。

7af2035a744d4819ba269e2200decc8f.png


ObjectHashAggregate的优化


对于ObjectHashAggreate的原理,可以参考深入理解SPARK SQL 中HashAggregateExec和ObjectHashAggregateExec以及UnsafeRow,该文可以比较清楚的解释ObjectHashAggregate和HashAggregate的区别:


ObjectHashAggregate能够弥补HashAggregate 不能支持collect_set等这种表达式,从而不会转变为SortAggregate

ObjectHashAggregate利用的是java Array对象(SpecificInternalRow)保存聚合的中间缓冲区,这对jvm gc是不太友好的

ObjectHashAggregate根据hashMap的size(默认是128),而不是输入的行数来进行spill,这会导致提前spill,内存利用率不高。

由于提前的spill,ObjectHashAggregate会对剩下的所有数据做额外的一次排序操作(如果没有spill,就不需要额外的sort操作),而HashAggregate则是会对每次需要spill的数据做排序


使用jvm heap的内存使用情况以及处理的行数来指导什么时候开始spill。

但是这种在数据倾斜的情况下,会增加OOM的风险。


SortAggregate优化


目前SortAggreaget的现状是:


每个任务在sort Aggreate前需要按照key进行排序

根据排序的结果,在相邻的行之间进行聚合操作

不同于Hash Aggregate:

不需要hashTable,也就不存在内存溢写和回退到sortAggregate

优化器更喜欢选择hashAggregate

没有codegen的实现.

目前在spark 3.3.0增加的功能:


如果数据是有序的话,会选择用sortAggragate替代HashAggregate

通过物理计划Rule ReplaceHashWithSortAgg 来做替换,当然通过spark.sql.execution.replaceHashWithSortAgg来开启(默认是关闭的),因为对于任何新特性,在release版本默认都是关闭的,在master分支才是开启的

支持sortAggretate(without keys)的codegen代码生成


其他


对于Aggregate更多的细节了解可以参考sparksql源码系列 | 一文搞懂with one count distinct 执行原理

相关文章
|
2月前
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI产品使用合集之int类型是否可以为raw feature
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
2月前
|
机器学习/深度学习 人工智能 JSON
人工智能平台PAI操作报错合集之带有all reduce 的算子是trace不出来的,结果会错,怎么才可以绕过去
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
3月前
|
机器学习/深度学习 人工智能 API
人工智能平台PAI产品使用合集之机器学习PAI中的sample_weight怎么加在样本中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
3月前
|
机器学习/深度学习 人工智能 运维
人工智能平台PAI产品使用合集之机器学习PAI可以通过再建一个done分区或者使用instance.status来进行部署吗
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
3月前
|
机器学习/深度学习 人工智能 PyTorch
人工智能平台PAI 操作报错合集之机器学习PAI,用Triton Inference Server 22.05 部署模型,遇到SaveV3这个op的问题,如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
3月前
|
人工智能
极智AI | 讲解TensorRT Fully Connected算子
大家好,我是极智视界,本文讲解一下 TensorRT Fully Connected 算子。
87 0
|
机器学习/深度学习 Web App开发 分布式计算
继Spark之后,UC Berkeley 推出新一代高性能深度学习引擎——Ray(1)
继Spark之后,UC Berkeley 推出新一代高性能深度学习引擎——Ray(1)
364 0
继Spark之后,UC Berkeley 推出新一代高性能深度学习引擎——Ray(1)
|
SQL 存储 机器学习/深度学习
2023 Databricks Data+AI Summit:All in AI
Databricks Data+AI Summit 7月初在旧金山召开,整个发布会看下来,最大的感受就是All in AI和All in One。
989 3
|
存储 Web App开发 机器学习/深度学习
继Spark之后,UC Berkeley 推出新一代高性能深度学习引擎——Ray(2)
继Spark之后,UC Berkeley 推出新一代高性能深度学习引擎——Ray(2)
311 0
继Spark之后,UC Berkeley 推出新一代高性能深度学习引擎——Ray(2)
|
SQL 人工智能 分布式计算
DATA AI Summit 2022提及到的对 aggregate 的优化
DATA AI Summit 2022提及到的对 aggregate 的优化
216 0
DATA AI Summit 2022提及到的对 aggregate 的优化