使用Relational Cache加速EMR Spark数据分析

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
简介: Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。

使用Relational Cache加速Spark数据分析

背景

Cache被广泛应用于数据处理的各个领域和方向上,在目前,计算速度远远大于IO访问速度依然是计算设备上最突出的矛盾,计算设备上的存储从HDD -> SSD -> NVMe -> Mem -> L3-L2-L1 Cache -> 寄存器 -> CPU,存储设备距离CPU越近,计算和IO访问速度的差距越小,数据处理的速度越快,但同时存储从下到上,价格越来越贵,容量越来越小。Cache以更多的资源消耗为代价,将待处理数据预先推到离计算更近的位置,从而加速数据处理的速度,填补计算和IO访问速度的差距。对于Spark来说,HDFS cache,Alluxio等文件系统都提供了文件级别的Cache服务,通过将文件cache到内存中,加速数据处理的速度,并且对Spark这样的计算框架完全透明。

除此之外,还有另外一种Cache的思路,如果需要多次对同一数据进行处理,且处理逻辑有相通之处,我们可以把中间结果cache起来,这样每次进行数据处理时从中间结果进行处理,节省了从原始数据到中间结果之间的计算。Cache的数据离计算结果更近,相比原始数据,经过更少的计算就能得到结果,同样也会加速处理速度。数据仓库中的物化视图是这种cache类型的典型应用。

在Spark中,也提供了Dataset级别的Cache,用户可以通过SQL DDL或是Dataset API将带有schema信息的关系型数据(而非文件)cache到内存。基于Dataset后续的数据处理都可以通过直接读取cache在内存中的数据而节省计算Dataset的时间。不同于数据仓库中的物化视图,Spark目前的Dataset cache还存在很多的不足之处:

  1. Spark Cached Dataset只能在同一个Spark Context中重用,跨Spark Context无法共享,且当Spark Context退出后,cache数据也会被删除。
  2. Dataset Cache,只支持执行计划精确匹配重用,即只有后续查询的执行计划能够精确匹配cached dataset的执行计划,才能使用cache优化查询,这大大降低了cache的优化范围。
  3. Cache的Dataset数据只能保存在内存或本地磁盘,数据量较大时对内存需求较大,而持久化的数据是序列化二进制数据,没有数据schema信息,反序列化代价较大,而且无法支持project filter pushdown等SQL优化处理。

Relational Cache的设计

基于上面提到的缺点,Spark Dataset cache在实际应用中的使用并不广泛,也无法满足一些典型的交互式分析场景,比如基于星型模型多维数据的分析,一般是通过提前构建Cube,通过SQL执行计划重写,满足亚秒级的交互式分析需求。Relational Cache希望能够兼顾Spark Dataset Cache的易用性和物化视图的优化效率,主要的目标包括三个:

  1. 用户可以cache任意关系型数据,包括Table,View或是Dataset。对于任意关系型数据的cache支持可以大大扩展了Relational Cache的使用范围,任何包含重复计算或是可预先确定计算逻辑的使用场景都可能从Relational Cache获益,例如多维数据分析,报表,Dashboard,ETL等。
  2. cache数据支持存放在内存,本地磁盘或是任意Spark支持的Datasource中。存放在内存的临时cache数据访问速度非常快,但是不支持跨Spark Context共享。对于数据量比较大的cache,例如很多企业构建的物化视图或是Cube可能达到PB量级,显然在这种情况下Relational Cache更适合存储在类似HDFS,OSS这样的持久化分布式文件系统中。
  3. cache数据可用于优化后续任意可优化的用户查询。

EMR Spark通过扩展Spark实现Relational Cache,我们的工作主要包括如下几个部分:

  1. Spark SQL DDL扩展,扩展已有的CACHE语法,支持对任意Table/View的cache的增删改查。
  2. Metastore对cache meta信息的支持。通过metastore支持持久型的cache元数据管理。
  3. 扩展Spark Catalyst,支持Cache Based Optimizer,可以通过in-memory或是持久化的cache优化后续查询的执行计划。
  4. 基于CBO的cache选择,可能有多个cache满足执行计划重写,选择合适的cache用于最终的执行计划重写。

Relational Cache的使用

创建Relational Cache

CACHE [LAZY] TABLE table_name
  [REFRESH ON (DEMAND | COMMIT)]
  [(ENABLE | DISABLE) REWRITE]
  [USING datasource
  [OPTIONS (key1=val1, key2=val2, ...)]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [ZORDER BY (col_name3, col_name4, ...)]
  [CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1=val1, key2=val2, ...)]]
  [AS select_statement]
AI 代码解读

创建cache的语法规范如上,我们可以通过该语法可以cache任意Spark表或视图,支持json,parquet,orc等数据格式,HDFS,OSS等数据源,以及partition, bucket和z-order等cache数据的组织方式。

REFRESH ON (DEMAND || COMMIT) 指定cache的更新方式,是在基表数据发生更新(COMMIT模式)时自动更新,还是用户通过更新DDL(DEMAND模式)手工触发更新。

(ENABLE | DISABLE) REWRITE 指定是否允许该cache被用于后续的执行计划优化。

此外,EMR Spark还提供和扩展了了更多的Relational Cache相关的DDL用于cache的增删改查。

UNCACHE TABLE [IF EXISTS] table_name
ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
REFRESH TABLE cache_name
SHOW CACHES
(DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name
AI 代码解读

EMR Spark还提供了session级别的参数控制是否开启基于Relational Cache的执行计划优化,用户可以通过spark.sql.cache.queryRewrite参数开启或者关闭执行计划优化。

使用Relational Cache优化查询

下面通过一个简单的示例展示Relational Cache是如何优化Spark查询的。原始的查询SQL为:

SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name
AI 代码解读

对应的物理执行计划包括两次Join以及Aggregate操作,执行时间为16.9s, 如下所示:

== Physical Plan ==
*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
   +- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
      +- *(6) Project [o_totalprice#10, n_name#36]
         +- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
            :- *(6) Project [o_totalprice#10, c_nationkey#30L]
            :  +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
            :     :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(o_custkey#8L, 200)
            :     :     +- *(1) Project [o_custkey#8L, o_totalprice#10]
            :     :        +- *(1) Filter isnotnull(o_custkey#8L)
            :     :           +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
            :     +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
            :        +- Exchange hashpartitioning(c_custkey#27L, 200)
            :           +- *(3) Project [c_custkey#27L, c_nationkey#30L]
            :              +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
            :                 +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               +- *(5) Project [n_nationkey#35L, n_name#36]
                  +- *(5) Filter isnotnull(n_nationkey#35L)
                     +- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>
AI 代码解读

image_1

创建Relational cache有两种方式,可以先创建视图,然后通过Cache语法cache 视图的数据,如下所示:

CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet;
AI 代码解读

或者也可以直接创建视图并cache数据。

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet
AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
AI 代码解读

Cache数据完成后,我们重新执行用户查询SQL,执行计划如下:

== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
   +- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
      +- *(1) Project [o_totalprice#20, n_name#35]
         +- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
            +- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...
AI 代码解读

image_2

可以看到基于cache优化后的执行计划直接从cache中读取数据,省去了两次join的计算时间,整体的执行时间也从16.9s下降到了1.9s。

总结

Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。在特定的应用场景中,比如针对星型模型多维度数据的聚合分析,可以实现PB级数据的亚秒级响应。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
打赏
0
0
0
0
3055
分享
相关文章
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
138 15
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
266 2
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
241 3
阿里云 EMR Serverless Spark 版正式开启商业化
Spark cache()与unpersist()使用位置
Spark在执行过程中是懒加载模式,RDD转换仅仅是构建DAG描述而不执行,只有遇到action算子才会真正的运行
75 9
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
315 8
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
272 0
深度解析阿里云EMR Serverless StarRocks在OLAP数据分析中的应用场景
阿里云EMR Serverless StarRocks作为一款高性能、全场景覆盖、全托管免运维的OLAP分析引擎,在企业数据分析领域展现出了强大的竞争力和广泛的应用前景。通过其卓越的技术特点、丰富的应用场景以及完善的生态体系支持,EMR Serverless StarRocks正逐步成为企业数字化转型和智能化升级的重要推手。未来随着技术的不断进步和应用场景的不断拓展我们有理由相信EMR Serverless StarRocks将在更多领域发挥重要作用为企业创造更大的价值。
钉钉群直播【Spark Relational Cache 原理和实践】
主要介绍Relational Cache/物化视图的历史和背景,以及EMR Spark基于Relational Cache加速Spark查询的技术方案,及如何通过基于Relational Cache的数据预计算和预组织,使用Spark支持亚秒级响应的交互式分析使用场景。
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
266 2
ClickHouse与大数据生态集成:Spark & Flink 实战
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等