使用Relational Cache加速EMR Spark数据分析

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
简介: Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。

背景

Cache被广泛应用于数据处理的各个领域和方向上,在目前,计算速度远远大于IO访问速度依然是计算设备上最突出的矛盾,计算设备上的存储从HDD -> SSD -> NVMe -> Mem -> L3-L2-L1 Cache -> 寄存器 -> CPU,存储设备距离CPU越近,计算和IO访问速度的差距越小,数据处理的速度越快,但同时存储从下到上,价格越来越贵,容量越来越小。Cache以更多的资源消耗为代价,将待处理数据预先推到离计算更近的位置,从而加速数据处理的速度,填补计算和IO访问速度的差距。对于Spark来说,HDFS cache,Alluxio等文件系统都提供了文件级别的Cache服务,通过将文件cache到内存中,加速数据处理的速度,并且对Spark这样的计算框架完全透明。

除此之外,还有另外一种Cache的思路,如果需要多次对同一数据进行处理,且处理逻辑有相通之处,我们可以把中间结果cache起来,这样每次进行数据处理时从中间结果进行处理,节省了从原始数据到中间结果之间的计算。Cache的数据离计算结果更近,相比原始数据,经过更少的计算就能得到结果,同样也会加速处理速度。数据仓库中的物化视图是这种cache类型的典型应用。

在Spark中,也提供了Dataset级别的Cache,用户可以通过SQL DDL或是Dataset API将带有schema信息的关系型数据(而非文件)cache到内存。基于Dataset后续的数据处理都可以通过直接读取cache在内存中的数据而节省计算Dataset的时间。不同于数据仓库中的物化视图,Spark目前的Dataset cache还存在很多的不足之处:

  1. Spark Cached Dataset只能在同一个Spark Context中重用,跨Spark Context无法共享,且当Spark Context退出后,cache数据也会被删除。
  2. Dataset Cache,只支持执行计划精确匹配重用,即只有后续查询的执行计划能够精确匹配cached dataset的执行计划,才能使用cache优化查询,这大大降低了cache的优化范围。
  3. Cache的Dataset数据只能保存在内存或本地磁盘,数据量较大时对内存需求较大,而持久化的数据是序列化二进制数据,没有数据schema信息,反序列化代价较大,而且无法支持project filter pushdown等SQL优化处理。

Relational Cache的设计

基于上面提到的缺点,Spark Dataset cache在实际应用中的使用并不广泛,也无法满足一些典型的交互式分析场景,比如基于星型模型多维数据的分析,一般是通过提前构建Cube,通过SQL执行计划重写,满足亚秒级的交互式分析需求。Relational Cache希望能够兼顾Spark Dataset Cache的易用性和物化视图的优化效率,主要的目标包括三个:

  1. 用户可以cache任意关系型数据,包括Table,View或是Dataset。对于任意关系型数据的cache支持可以大大扩展了Relational Cache的使用范围,任何包含重复计算或是可预先确定计算逻辑的使用场景都可能从Relational Cache获益,例如多维数据分析,报表,Dashboard,ETL等。
  2. cache数据支持存放在内存,本地磁盘或是任意Spark支持的Datasource中。存放在内存的临时cache数据访问速度非常快,但是不支持跨Spark Context共享。对于数据量比较大的cache,例如很多企业构建的物化视图或是Cube可能达到PB量级,显然在这种情况下Relational Cache更适合存储在类似HDFS,OSS这样的持久化分布式文件系统中。
  3. cache数据可用于优化后续任意可优化的用户查询。

EMR Spark通过扩展Spark实现Relational Cache,我们的工作主要包括如下几个部分:

  1. Spark SQL DDL扩展,扩展已有的CACHE语法,支持对任意Table/View的cache的增删改查。

2.Metastore对cache meta信息的支持。通过metastore支持持久型的cache元数据管理。

3.扩展Spark Catalyst,支持Cache Based Optimizer,可以通过in-memory或是持久化的cache优化后续查询的执行计划。

4.基于CBO的cache选择,可能有多个cache满足执行计划重写,选择合适的cache用于最终的执行计划重写。

Relational Cache的使用

创建Relational Cache

  [REFRESH ON (DEMAND | COMMIT)]
  [(ENABLE | DISABLE) REWRITE]
  [USING datasource
  [OPTIONS (key1=val1, key2=val2, ...)]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [ZORDER BY (col_name3, col_name4, ...)]
  [CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1=val1, key2=val2, ...)]]
  [AS select_statement]

创建cache的语法规范如上,我们可以通过该语法可以cache任意Spark表或视图,支持json,parquet,orc等数据格式,HDFS,OSS等数据源,以及partition, bucket和z-order等cache数据的组织方式。

REFRESH ON (DEMAND || COMMIT) 指定cache的更新方式,是在基表数据发生更新(COMMIT模式)时自动更新,还是用户通过更新DDL(DEMAND模式)手工触发更新。

(ENABLE | DISABLE) REWRITE 指定是否允许该cache被用于后续的执行计划优化。

此外,EMR Spark还提供和扩展了了更多的Relational Cache相关的DDL用于cache的增删改查。

UNCACHE TABLE [IF EXISTS] table_name
ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
REFRESH TABLE cache_name
SHOW CACHES
(DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name

EMR Spark还提供了session级别的参数控制是否开启基于Relational Cache的执行计划优化,用户可以通过spark.sql.cache.queryRewrite参数开启或者关闭执行计划优化。

使用Relational Cache优化查询

下面通过一个简单的示例展示Relational Cache是如何优化Spark查询的。原始的查询SQL为:

SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name

对应的物理执行计划包括两次Join以及Aggregate操作,执行时间为16.9s, 如下所示:

*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
   +- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
      +- *(6) Project [o_totalprice#10, n_name#36]
         +- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
            :- *(6) Project [o_totalprice#10, c_nationkey#30L]
            :  +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
            :     :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(o_custkey#8L, 200)
            :     :     +- *(1) Project [o_custkey#8L, o_totalprice#10]
            :     :        +- *(1) Filter isnotnull(o_custkey#8L)
            :     :           +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
            :     +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
            :        +- Exchange hashpartitioning(c_custkey#27L, 200)
            :           +- *(3) Project [c_custkey#27L, c_nationkey#30L]
            :              +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
            :                 +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               +- *(5) Project [n_nationkey#35L, n_name#36]
                  +- *(5) Filter isnotnull(n_nationkey#35L)
                     +- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>

image

创建Relational cache有两种方式,可以先创建视图,然后通过Cache语法cache 视图的数据,如下所示:

CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet;

或者也可以直接创建视图并cache数据。

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet
AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

Cache数据完成后,我们重新执行用户查询SQL,执行计划如下:

== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
   +- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
      +- *(1) Project [o_totalprice#20, n_name#35]
         +- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
            +- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...

f6b4c0479f27479bc351ca2fa8d84a49c7338ec2_1_

可以看到基于cache优化后的执行计划直接从cache中读取数据,省去了两次join的计算时间,整体的执行时间也从16.9s下降到了1.9s。

总结

Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。在特定的应用场景中,比如针对星型模型多维度数据的聚合分析,可以实现PB级数据的亚秒级响应。

相关文章
|
5月前
|
SQL 分布式计算 Serverless
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
鹰角网络为应对游戏业务高频活动带来的数据潮汐、资源弹性及稳定性需求,采用阿里云 EMR Serverless Spark 替代原有架构。迁移后实现研发效率提升,支持业务快速发展、计算效率提升,增强SLA保障,稳定性提升,降低运维成本,并支撑全球化数据架构部署。
488 56
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
|
3月前
|
人工智能 分布式计算 DataWorks
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
139 4
|
7月前
|
存储 分布式计算 物联网
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
567 58
|
7月前
|
机器学习/深度学习 分布式计算 大数据
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
332 15
|
7月前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
147 0
|
11月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
544 58
|
SQL 分布式计算 Spark
钉钉群直播【Spark Relational Cache 原理和实践】
主要介绍Relational Cache/物化视图的历史和背景,以及EMR Spark基于Relational Cache加速Spark查询的技术方案,及如何通过基于Relational Cache的数据预计算和预组织,使用Spark支持亚秒级响应的交互式分析使用场景。
|
3月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
182 0
|
6月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
282 79