使用Relational Cache加速EMR Spark数据分析-阿里云开发者社区

开发者社区> Apache Spark中国社区> 正文

使用Relational Cache加速EMR Spark数据分析

简介: Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。

使用Relational Cache加速Spark数据分析

背景

Cache被广泛应用于数据处理的各个领域和方向上,在目前,计算速度远远大于IO访问速度依然是计算设备上最突出的矛盾,计算设备上的存储从HDD -> SSD -> NVMe -> Mem -> L3-L2-L1 Cache -> 寄存器 -> CPU,存储设备距离CPU越近,计算和IO访问速度的差距越小,数据处理的速度越快,但同时存储从下到上,价格越来越贵,容量越来越小。Cache以更多的资源消耗为代价,将待处理数据预先推到离计算更近的位置,从而加速数据处理的速度,填补计算和IO访问速度的差距。对于Spark来说,HDFS cache,Alluxio等文件系统都提供了文件级别的Cache服务,通过将文件cache到内存中,加速数据处理的速度,并且对Spark这样的计算框架完全透明。

除此之外,还有另外一种Cache的思路,如果需要多次对同一数据进行处理,且处理逻辑有相通之处,我们可以把中间结果cache起来,这样每次进行数据处理时从中间结果进行处理,节省了从原始数据到中间结果之间的计算。Cache的数据离计算结果更近,相比原始数据,经过更少的计算就能得到结果,同样也会加速处理速度。数据仓库中的物化视图是这种cache类型的典型应用。

在Spark中,也提供了Dataset级别的Cache,用户可以通过SQL DDL或是Dataset API将带有schema信息的关系型数据(而非文件)cache到内存。基于Dataset后续的数据处理都可以通过直接读取cache在内存中的数据而节省计算Dataset的时间。不同于数据仓库中的物化视图,Spark目前的Dataset cache还存在很多的不足之处:

  1. Spark Cached Dataset只能在同一个Spark Context中重用,跨Spark Context无法共享,且当Spark Context退出后,cache数据也会被删除。
  2. Dataset Cache,只支持执行计划精确匹配重用,即只有后续查询的执行计划能够精确匹配cached dataset的执行计划,才能使用cache优化查询,这大大降低了cache的优化范围。
  3. Cache的Dataset数据只能保存在内存或本地磁盘,数据量较大时对内存需求较大,而持久化的数据是序列化二进制数据,没有数据schema信息,反序列化代价较大,而且无法支持project filter pushdown等SQL优化处理。

Relational Cache的设计

基于上面提到的缺点,Spark Dataset cache在实际应用中的使用并不广泛,也无法满足一些典型的交互式分析场景,比如基于星型模型多维数据的分析,一般是通过提前构建Cube,通过SQL执行计划重写,满足亚秒级的交互式分析需求。Relational Cache希望能够兼顾Spark Dataset Cache的易用性和物化视图的优化效率,主要的目标包括三个:

  1. 用户可以cache任意关系型数据,包括Table,View或是Dataset。对于任意关系型数据的cache支持可以大大扩展了Relational Cache的使用范围,任何包含重复计算或是可预先确定计算逻辑的使用场景都可能从Relational Cache获益,例如多维数据分析,报表,Dashboard,ETL等。
  2. cache数据支持存放在内存,本地磁盘或是任意Spark支持的Datasource中。存放在内存的临时cache数据访问速度非常快,但是不支持跨Spark Context共享。对于数据量比较大的cache,例如很多企业构建的物化视图或是Cube可能达到PB量级,显然在这种情况下Relational Cache更适合存储在类似HDFS,OSS这样的持久化分布式文件系统中。
  3. cache数据可用于优化后续任意可优化的用户查询。

EMR Spark通过扩展Spark实现Relational Cache,我们的工作主要包括如下几个部分:

  1. Spark SQL DDL扩展,扩展已有的CACHE语法,支持对任意Table/View的cache的增删改查。
  2. Metastore对cache meta信息的支持。通过metastore支持持久型的cache元数据管理。
  3. 扩展Spark Catalyst,支持Cache Based Optimizer,可以通过in-memory或是持久化的cache优化后续查询的执行计划。
  4. 基于CBO的cache选择,可能有多个cache满足执行计划重写,选择合适的cache用于最终的执行计划重写。

Relational Cache的使用

创建Relational Cache

CACHE [LAZY] TABLE table_name
  [REFRESH ON (DEMAND | COMMIT)]
  [(ENABLE | DISABLE) REWRITE]
  [USING datasource
  [OPTIONS (key1=val1, key2=val2, ...)]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [ZORDER BY (col_name3, col_name4, ...)]
  [CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1=val1, key2=val2, ...)]]
  [AS select_statement]

创建cache的语法规范如上,我们可以通过该语法可以cache任意Spark表或视图,支持json,parquet,orc等数据格式,HDFS,OSS等数据源,以及partition, bucket和z-order等cache数据的组织方式。

REFRESH ON (DEMAND || COMMIT) 指定cache的更新方式,是在基表数据发生更新(COMMIT模式)时自动更新,还是用户通过更新DDL(DEMAND模式)手工触发更新。

(ENABLE | DISABLE) REWRITE 指定是否允许该cache被用于后续的执行计划优化。

此外,EMR Spark还提供和扩展了了更多的Relational Cache相关的DDL用于cache的增删改查。

UNCACHE TABLE [IF EXISTS] table_name
ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
REFRESH TABLE cache_name
SHOW CACHES
(DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name

EMR Spark还提供了session级别的参数控制是否开启基于Relational Cache的执行计划优化,用户可以通过spark.sql.cache.queryRewrite参数开启或者关闭执行计划优化。

使用Relational Cache优化查询

下面通过一个简单的示例展示Relational Cache是如何优化Spark查询的。原始的查询SQL为:

SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name

对应的物理执行计划包括两次Join以及Aggregate操作,执行时间为16.9s, 如下所示:

== Physical Plan ==
*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
   +- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
      +- *(6) Project [o_totalprice#10, n_name#36]
         +- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
            :- *(6) Project [o_totalprice#10, c_nationkey#30L]
            :  +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
            :     :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(o_custkey#8L, 200)
            :     :     +- *(1) Project [o_custkey#8L, o_totalprice#10]
            :     :        +- *(1) Filter isnotnull(o_custkey#8L)
            :     :           +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
            :     +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
            :        +- Exchange hashpartitioning(c_custkey#27L, 200)
            :           +- *(3) Project [c_custkey#27L, c_nationkey#30L]
            :              +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
            :                 +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               +- *(5) Project [n_nationkey#35L, n_name#36]
                  +- *(5) Filter isnotnull(n_nationkey#35L)
                     +- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>

image_1

创建Relational cache有两种方式,可以先创建视图,然后通过Cache语法cache 视图的数据,如下所示:

CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet;

或者也可以直接创建视图并cache数据。

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet
AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

Cache数据完成后,我们重新执行用户查询SQL,执行计划如下:

== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
   +- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
      +- *(1) Project [o_totalprice#20, n_name#35]
         +- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
            +- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...

image_2

可以看到基于cache优化后的执行计划直接从cache中读取数据,省去了两次join的计算时间,整体的执行时间也从16.9s下降到了1.9s。

总结

Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。在特定的应用场景中,比如针对星型模型多维度数据的聚合分析,可以实现PB级数据的亚秒级响应。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:

阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎加入!邀请你加入钉钉群聊Apache Spark中国技术交流社区,点击进入查看详情 https://qr.dingtalk.com/action/joingroup?code=v1,k1,X7S/0/QcrLMkK7QZ5sw2oTvoYW49u0g5dvGu7PW+sm4=&_dt_no_comment=1&origin=11

官方博客
官网链接