EMR Spark Runtime Filter性能优化

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
简介: Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。

背景

Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。

目前在SparkSQL中有Filter下推优化,包括两个维度:

生成Filter

SparkSQL会从用户的SQL语句中获取到Filter

  • 直接显示获取

    select * from A where a=1
    AI 代码解读

    生成Filter(a=1) on A

  • 隐式推断

    select * from A, B where A.a = B.b and A.a=1
    AI 代码解读

    推断出Filter(b=1) on B

Filter优化

利用生成的Filter算子可以优化,比如:

  • 将Filter尽量下推到靠近DataSource端
  • 如果Filter中的列是分区列,可以提前对DataSource进行分区裁剪,只扫描需要的分区数据

Runtime Filter是针对Equi-Join场景提出的一种新的生成Filter的方式,通过动态获取Filter内容来做相关优化。

Runtime Filter原理

优化对象

Equi Join, 形如

select x,y from A join B on A.a = B.b
AI 代码解读

其中A是一个小表(如维表),B是一个大表(如事实表)
备注: A/B也可以是一个简单的子查询

优化思路

如上述小表A和大表B进行Join,Join条件为A.a=B.b,实际Join过程中需要对大表进行全表扫描才能完成Join操作,极端情况下如A.a仅仅只有一条记录,也需要对B表全表扫描,影响性能。

如果在B表扫描之前,能获取A表的a的相关信息(如所有的a值,或者a的min/max/Bloomfilter等统计信息),并在实际执行Join之前将这些信息对B表的数据进行过滤,而不是全表扫描,可以大大提高性能。

两种场景

根据大表B参与join的key(b)的属性,可以分别采集小表A参与join的key(a)的信息:

b是分区列

如上b为大表B的一个分区列,则可以提前收集A.a列的所有值,然后利用A.a的值对B表的b列进行分区裁剪

b不是分区列

不能做分区裁剪,只能在实际数据扫描的过程中进行过滤。可以提前收集A.a列的min/max/Bloomfilter的统计信息,然后利用这些统计信息对B表进行数据过滤,这个过滤又可以分成两种粒度:

  • 可下推到存储层,减少数据扫描
    如底层文件格式是Parquet/ORC, 可以将相关过滤谓词(min/max等)下推到存储层面,从而减少实际扫描的数据。
  • 扫描后数据过滤
    不能下推到存储层的,可以在数据被扫描后做条件过滤,减少后续参与计算的数据量(如shuffle/join等)

image

Runtime Filter实现

Runtime Filter的实现主要在Catalyst中,分为4个步骤:

谓词合成

在用户SQL生成的逻辑执行计划树(logical plan)中,寻找满足条件的Equi-Join节点,然后根据上面的思路,在Join的大表B侧插入一个新的Filter节点,如Filter(In(b, Seq(DynamicValue(a, A))), B)

谓词下推

上面生成的新的Filter会经过PushDownPredicate的Rule,尽量下推靠近DataSource附近

物理执行计划生成

该阶段会将上面下推的Filter(In(b, Seq(DynamicValue(a, A))), B)转换成物理节点(FilterExec),根据上面两种场景会生成两种不同的FilterExec

  • b是分区列
    b是分区列,采集的是a列的所有值,如:

    case class DynamicPartitionPruneFilterExec(
      child: SparkPlan, collectors: Seq[(Expression, SparkPlan)])
      extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
    AI 代码解读

其中colletors就是用于采集信息的SparkPlan,因为要跑一个SQL来采集a列的所有值(select a from A group by a);
因为有可能会有多个分区列,所以这个地方是一个Seq.

  • b是非分区列
    b是非分区列,采集的是a列的min/max/bloomfilter统计信息,如

    case class DynamicMinMaxFilterExec(
      child: SparkPlan, collectors: Seq[(Expression, SparkPlan)])
    extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
    AI 代码解读

同理上面collectors也是用户采集信息的SparkPlan,如select min(a),max(a) from A

执行

在物理执行计划实际执行的过程中,会在DynamicPartitionPruneFilterExec/DynamicMinMaxFilterExec物理算子内先执行collectors获取到a列的相关信息,然后对底层B的执行计划进行改写,比如利用采集到的信息做分区裁剪/数据过滤等。

Runtime Filter性能测试

以TPC-DS 10TB的Query54为例:

Runtime Filter 关闭

​​image

Runtime Filter 打开

image
经过DynamicPartitionPruneFilter对catalog_sales的分区进行了裁剪,实际对表的扫描从14,327,953,968减少到136,107,053,然后经过min/max的过滤继续减少到135,564,763;另外Runtime Filter减少了大表的扫描,shuffle的数据量以及参加Join的数据量,所以对整个集群IO/网络/CPU有比较大的节省

总结

针对Equi-Join的场景,可以额外的采集小表侧的信息,然后在Join之前对大表进行分区裁剪或者扫描后过滤,从而提高查询性能,减少资源消耗。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
打赏
0
0
0
0
1306
分享
相关文章
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
鹰角网络为应对游戏业务高频活动带来的数据潮汐、资源弹性及稳定性需求,采用阿里云 EMR Serverless Spark 替代原有架构。迁移后实现研发效率提升,支持业务快速发展、计算效率提升,增强SLA保障,稳定性提升,降低运维成本,并支撑全球化数据架构部署。
345 56
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
462 58
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
276 15
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
500 58
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
377 3
阿里云 EMR Serverless Spark 版正式开启商业化
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
535 2
ClickHouse与大数据生态集成:Spark & Flink 实战
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
164 0

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等