EMR Spark Relational Cache的执行计划重写

本文涉及的产品
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 背景EMR Spark提供的Relational Cache功能,可以通过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。Relational Cache的工作原理类似物化视图,在用户提交SQL语句时对语句进行分析,并选出可用的预计算结果来加速查询。

背景

EMR Spark提供的Relational Cache功能,可以通过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。Relational Cache的工作原理类似物化视图,在用户提交SQL语句时对语句进行分析,并选出可用的预计算结果来加速查询。为了实现高效地预计算结果复用,我们构建的预计算缓存一般都较为通用,因此对于用户query,还需进行进一步的计算方能获得最终结果。因此,如何快速地找出匹配的缓存,并构建出准确的新执行计划,就显得尤为重要。

在Hive 3.x中支持的Materialized View,利用了Apache Calcite对执行计划进行重写。考虑到Spark SQL使用Catalyst进行执行计划优化,引入Calcite太重,因此EMR Spark中的Relational Cache实现了自己的Catalyst规则,用于重写执行计划。本文将介绍执行计划重写的相关内容。

执行计划重写

准备工作

Spark会把用户查询语句进行解析,依次转化为Unresolved Logical Plan(未绑定的逻辑计划)、Resolved Logical Plan(绑定的逻辑计划)、Optimized Logical Plan(优化的逻辑计划)、Physical Plan(物理计划)。其中,未优化的逻辑计划根据用户查询语句不同,会有较大区别,而Relational Cache作为优化的一部分,放在逻辑计划优化过程中也较为合适,因此我们拿到的用户查询计划会是优化中的逻辑计划。要与优化中的逻辑计划匹配,我们选择把这个重写过程放在Spark优化器比较靠后的步骤中,同时,预先将Relational Cache的逻辑计划进行解析,获得优化后的Cache计划,减小匹配时的复杂程度。这样,我们只需匹配做完了谓词下推、谓词合并等等优化之后的两个逻辑计划。

基本过程

在匹配时,我们希望能尽可能多得匹配计算和IO操作,因此,我们对目标计划进行前序遍历,依次进行匹配,尝试找到最多的匹配节点。而在判断两个节点是否匹配时,我们采用后序遍历的方式,希望尽快发现不匹配的情况,减少计划匹配的执行时间。然后我们会根据匹配结果,对计划进行重写,包括对于Cache数据进行进一步的Filter、Project、Sort甚至Aggregate等操作,使其与匹配节点完全等价,然后更新逻辑计划节点的引用绑定,无缝替换到逻辑计划中,这样就能轻松获得最终的重写后的计划。

Join匹配

Spark中的Join都是二元操作,而实际的Join顺序可能根据一些策略会有很大区别,因此对于Join节点,必须进行特殊处理。我们会首先将逻辑计划进行处理,根据缓存计划的Join顺序进行Join重排。这一步在树状匹配之前就进行了,避免不断重复Join重排带来的时间浪费。重排后的Join可以更大概率地被我们匹配到。

为了实现Cache的通用性,根据星型数据模型的特点,我们引入了Record Preserve的概念。这和传统数据库中的Primary Key/Foreign Key的关系较为类似,当有主键的表与非空外键指向的表在外键上进行Join时,记录的条数不会变化,不会膨胀某条记录,也不会丢失某条记录。PK/FK的语意在大数据处理框架中经常缺失,我们引入了新的DDL让用户自定义Record Preserve Join的关系。当用户定义A Inner Join B是对于A表Record Preserve时,我们也会把A Inner Join B和A的关系匹配起来。有了PK/FK的帮助,我们能匹配上的情况大大增加了,一个Relational Cache可以被更多看似区别巨大的查询共享,这可以很好的为用户节约额外的存储开销和预计算开销。

Aggregate匹配

一般的Aggregate匹配较为简单,而Spark支持的Grouping Set操作,会构建出Expand逻辑计划节点,相当于把一条记录转为多条,使用Grouping ID进行标记。由于Expand的子节点是所有Grouping的情况共用的,这里我们只对子节点进行一次匹配,再分别进行上面的Grouping属性和Aggregate属性的匹配。主要是验证目标聚合所需的属性或者聚合函数都能从某个Grouping ID对应的聚合结果中计算出来,比如粗粒度的Sum可以对细粒度的Sum进行二次Sum求和,而粗粒度的Count对细粒度的Count也应通过二次Sum求和,粗粒度的Average无法仅从细粒度的Average中还原出来等等。

计划重写

找出匹配的逻辑计划之后,就是重写逻辑计划的过程。对于无需二次聚合的逻辑计划,直接根据缓存数据的schema,从缓存数据的Relation中选择所需列,根据条件过滤后,进行后续操作。如果还需二次聚合,选择所需列时需保留外部要用的所有列,以及聚合时需要的列,还有聚合函数需要的数据。二次聚合的聚合函数需要根据实际情况进行重写,确保能使用Relational Cache中已经初步聚合的结果。这里面需要根据聚合的语意判断是否能够二次聚合。如果时Grouping Set的聚合,二次聚合之前还需选择正确的Grouping ID进行过滤。经过二次聚合后,步骤大体和普通的重写一致,只需替换到目标计划中即可。

结果

我们以一个例子来具体说明逻辑计划的重写结果。Star Schema Benchmark(论文链接)是星型模型数据分析的一个标准Benchmark,其结构定义如图所示:

ssb_schema

我们构建Relational Cache的SQL语句如下:

SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUM
FROM supplier, p_lineorder, dates, customer, part
WHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkey
GROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))
AI 代码解读

我们从中选出一条查询作为示例。具体查询语句:

select c_city, s_city, d_year, sum(lo_revenue) as revenue
    from customer, lineorder, supplier, dates
    where lo_custkey = c_custkey
        and lo_suppkey = s_suppkey
        and lo_orderdate = d_datekey
        and c_nation = 'UNITED KINGDOM'
        and (c_city='UNITED KI1' or c_city='UNITED KI5')
        and (s_city='UNITED KI1' or s_city='UNITED KI5')
        and s_nation = 'UNITED KINGDOM'
        and d_yearmonth = 'Dec1997'
    group by c_city, s_city, d_year
    order by d_year asc, revenue desc
AI 代码解读

原始逻辑计划如下所示:

Sort [d_year#39 ASC NULLS FIRST, revenue#0L DESC NULLS LAST], true
+- Aggregate [c_city#6, s_city#31, d_year#39], [c_city#6, s_city#31, d_year#39, sum(lo_revenue#23L) AS revenue#0L]
   +- Project [c_city#6, lo_revenue#23L, s_city#31, d_year#39]
      +- Join Inner, (lo_orderdate#16 = d_datekey#35)
         :- Project [c_city#6, lo_orderdate#16, lo_revenue#23L, s_city#31]
         :  +- Join Inner, (lo_suppkey#15 = s_suppkey#28)
         :     :- Project [c_city#6, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
         :     :  +- Join Inner, (lo_custkey#13 = c_custkey#3)
         :     :     :- Project [c_custkey#3, c_city#6]
         :     :     :  +- Filter (((isnotnull(c_nation#7) && (c_nation#7 = UNITED KINGDOM)) && ((c_city#6 = UNITED KI1) || (c_city#6 = UNITED KI5))) && isnotnull(c_custkey#3))
         :     :     :     +- HiveTableRelation `ssb`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#3, c_name#4, c_address#5, c_city#6, c_nation#7, c_region#8, c_phone#9, c_mktsegment#10]
         :     :     +- Project [lo_custkey#13, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
         :     :        +- Filter ((isnotnull(lo_custkey#13) && isnotnull(lo_suppkey#15)) && isnotnull(lo_orderdate#16))
         :     :           +- HiveTableRelation `ssb`.`lineorder`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [lo_orderkey#11L, lo_linenumber#12L, lo_custkey#13, lo_partkey#14, lo_suppkey#15, lo_orderdate#16, lo_orderpriotity#17, lo_shippriotity#18, lo_quantity#19L, lo_extendedprice#20L, lo_ordtotalprice#21L, lo_discount#22L, lo_revenue#23L, lo_supplycost#24L, lo_tax#25L, lo_commitdate#26, lo_shipmode#27]
         :     +- Project [s_suppkey#28, s_city#31]
         :        +- Filter (((isnotnull(s_nation#32) && ((s_city#31 = UNITED KI1) || (s_city#31 = UNITED KI5))) && (s_nation#32 = UNITED KINGDOM)) && isnotnull(s_suppkey#28))
         :           +- HiveTableRelation `ssb`.`supplier`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [s_suppkey#28, s_name#29, s_address#30, s_city#31, s_nation#32, s_region#33, s_phone#34]
         +- Project [d_datekey#35, d_year#39]
            +- Filter ((isnotnull(d_yearmonth#41) && (d_yearmonth#41 = Dec1997)) && isnotnull(d_datekey#35))
               +- HiveTableRelation `ssb`.`dates`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [d_datekey#35, d_date#36, d_dayofweek#37, d_month#38, d_year#39, d_yearmonthnum#40, d_yearmonth#41, d_daynuminweek#42, d_daynuminmonth#43, d_daynuminyear#44, d_monthnuminyear#45, d_weeknuminyear#46, d_sellingseason#47, d_lastdayinweekfl#48, d_lastdayinmonthfl#49, d_holidayfl#50, d_weekdayfl#51]
AI 代码解读

重写后的一个逻辑计划如下:

Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true
+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]
   +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))
      +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet
AI 代码解读

由此可见,执行计划大大简化,我们可以做到亚秒级响应用户的命中查询。

进一步优化

在实际测试过程中,我们发现当多个Relational Cache存在时,匹配时间线性增长明显。由于我们在metastore中存储的是Cache的SQL语句,取SQL语句和再次解析的时间都不容小觑,这就使得匹配过程明显增长,背离了我们追求亚秒级响应的初衷。因此我们在Spark中构建了逻辑计划缓存,将解析过的Relational Cache的计划缓存在内存中,每个Relational Cache只缓存一份,计划本身占用空间有限,因此我们可以缓存住几乎所有的Relational Cache的优化后的逻辑计划,从而在第一次查询之后,所有查询都不再收到取SQL语句和再次解析的延迟困扰。经过这样的优化,匹配时间大幅减少到100ms的量级。

总结与思考

Relational Cache实现了一种基于Cache的优化方案,让Spark SQL能够用于即时查询的场景下,满足用户对海量数据秒级查询的需求。通过对用户查询的动态改写,可以大大提高缓存的利用率,扩展缓存的命中场景,有效提高查询性能。现有方案也有很多可优化的地方,比如重复的回溯遍历时间复杂度较高,不如在逻辑计划节点内部更新维护可匹配的信息。考虑到对Spark的侵入性,我们暂时选择了现有方案,后续根据实际的使用情况,还会进一步优化我们的逻辑计划重写过程。而重写的逻辑计划还涉及到基于不同的Relational Cache Plan会有不同的重写方式,在这些重写结果中如何根据执行代价选择最优的重写方案,将会在后续文章中进行揭秘,敬请期待!

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
打赏
0
0
0
0
1100
分享
相关文章
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
鹰角网络为应对游戏业务高频活动带来的数据潮汐、资源弹性及稳定性需求,采用阿里云 EMR Serverless Spark 替代原有架构。迁移后实现研发效率提升,支持业务快速发展、计算效率提升,增强SLA保障,稳定性提升,降低运维成本,并支撑全球化数据架构部署。
183 56
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
375 58
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
226 15
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
钉钉群直播【Spark Relational Cache 原理和实践】
主要介绍Relational Cache/物化视图的历史和背景,以及EMR Spark基于Relational Cache加速Spark查询的技术方案,及如何通过基于Relational Cache的数据预计算和预组织,使用Spark支持亚秒级响应的交互式分析使用场景。
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
190 79
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
404 2
ClickHouse与大数据生态集成:Spark & Flink 实战
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
136 0
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
106 0
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
156 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等