EMR Spark Relational Cache如何支持雪花模型中的关联匹配

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在Spark中,Join通常是代价比较大,尤其是shuffle join。Relational Cache将反范式化表(即关联后的大表)保存为relational cache,便可以使用cache重写执行计划,提高查询效率。

Relational Cache相关文章链接:

使用Relational Cache加速EMR Spark数据分析
使用EMR Spark Relational Cache跨集群同步数据
EMR Spark Relational Cache的执行计划重写

背景

Join是Spark SQL中非常常见的操作,数据表按照业务语义的范式化表定义,便于用户理解与使用,而且可以消除冗余数据。用户通过join操作将相关的数据关联后进行进一步的过滤,聚合等操作。在Spark中,Join通常是代价比较大,尤其是当join的两个表的数据都比较大,无法优化为map join时,需要通过网络shuffle两个表的数据,对数据按照jion字段进行重新组织。Relational Cache是EMR Spark支持的重要特性,类似于数据仓库的物化视图,将反范式化表(即关联后的大表)保存为relational cache,便可以使用cache重写执行计划,提高查询效率。但是A ⋈ B ⋈ C作为relational cache只能用来优化包含A ⋈ B ⋈ C的查询,理论上是不能用来优化只包含A ⋈ B或A ⋈ C的查询。如果表的数量很多,假设有n个表,则总共可能有2^n个关联结果(当然在业务上并不会出现任意两个表都可以关联的情况,但是可能的关联数量依然会非常大),将每个关联结果都作为relational cache构建代价太大,并不现实,我们需要找到一种方式可以通过单个Relational Cache支持优化多个关联查询的方式,从而在加速用户查询的同时,减少创建和更新relational cache的代价。Record Preserve Join是支持这种优化的非常有效的方式。

什么是Record Preserved Join

定义:对于表A和表B,如果A的每一条记录都出现在A ⋈ B的结果中,并且A ⋈ B的结果并没有其他额外的记录,那么我们称A ⋈ B record preserved on A。

下面我们看一下一个Record preserved join的简单示例。

tables

如上图所示,activity_log为事实表,product,user,city为维度表,activity_log表通过user_id字段和user表关联,通过product_id字段与product表关联,user表通过city_id字段与city表关联,关联的结果如下:

cache

如果我们把上述的activity_log,product,user以及city四表的关联结果作为Relational Cache,理论上只有后续的查询包含这四个表的关联时,才能够使用cache优化SQL执行计划,如果查询只包含部分表的关联,比如只是activity_log和user关联,是没有办法使用之前的cache优化的。但是我们仔细观察可以发现,每一行activity_log表中的记录,其user_id都和user表中的其中一条且仅一条记录相关联,对于product_id和city_id的关联也是同样如此,可以看到关联后的结果,仅仅是为activity_log表中的每条记录增加更多的字段,activity_log中的每条记录和关联结果中的每条记录是一一对应的关系,这种join结果就是record preserved join。由于activity_log和user关联的数据每一条都和cache中的数据一一对应,不多也不少,所以实际上我们应该允许使用该cache优化包含activity_log和user关联的查询。比如对于上面的示例,我们把图2中四表关联的结果保存为Relational Cache activity_flat_cache,那么对于query

SELECT activity_text, user_name FROM activity_log, user WHERE activity.user_id = user.user_id AND user_name = 'jack'

由于所有的activity_log记录都在activity_flat_cache中,我们可以将该查询改写为

SELECT activity_text, user_name FROM activity_flat_cache WHERE user_name = 'jack'

同理,我们可以使用activity_flat_cache优化任意包含activity_log与其他单个或多个维度表的关联查询,使用同一个cache优化多种关联场景,大大降低relational cache维护和更新所需的存储和计算成本。

为了能够在Relational Cache中基于Record Preserved Join支持更丰富的优化场景,我们需要首先解决两个问题:

  1. Record preserved Join对于关联两表的数据有非常严格的约束,Relational Cache如何知道两个表关联的结果是否为Record preserved Join。
  2. 已知1的信息,如何推导Relational Cache是否可用于Join查询的执行计划优化。

Record Preserved Join声明

一个常见的Record Preserved Join是Left Outer Join,对于任意的表A和B,A left outer join B record preserved on A。根据表A和B中join key的数据分布,最常见的也可能满足record preserved join的条件。在EMR Spark中,用户可以定义表的主外键和NOT NULL约束,通过表的主外键和NOT NULL约束,Relational Cache可以推断出两表关联是否满足Record Preserved Join。

根据外键的定义,外键的值必须存在其引用的主键中,或者为空,其引用的主键又必须不能重复,所以如果存在表A和表B,而且表A中的外键字段关联表B的主键,且外键字段有NOT NULL约束,则我们可以确定A INNER JOIN B的结果record preserved on A

以上面四个表的关系为例,在Spark SQL定义主外键的DDL示例如下:

ALTER TABLE activity_log ADD CONSTRAINT act_pk PRIMARY KEY (activity_id);
ALTER TABLE product ADD CONSTRAINT prd_pk PRIMARY KEY (product_id);
ALTER TABLE user ADD CONSTRAINT user_pk PRIMARY KEY (user_id);
ALTER TABLE city ADD CONSTRAINT city_pk PRIMARY KEY (city_id);
ALTER TABLE activity_log ADD CONSTRAINT act_prd_fk FOREIGN KEY (product_id) REFRENCES product (product_id);
ALTER TABLE activity_log ADD CONSTRAINT act_user_fk FOREIGN KEY (user_id) REFRENCES user (user_id);
ALTER TABLE user ADD CONSTRAINT user_city_fk FOREIGN KEY (city_id) REFRENCES city (city_id);

结合各外键字段的NOT NULL约束,我们可以推断出如下Record Preserved Join:

  • activity_log inner join product record preserved on activity_log
  • activity_log inner join user record preserved on activity_log
  • user inner join city record preserved on user

使用Record Preserved Join优化优化执行计划

EMR Spark支持通过任意的SQL查询创建Relational Cache,可能包含关联,聚合,过滤,投影等各种操作,其中关联也包括record preserved join和其他join,如何利用到其中的record preserved join特性对更多的查询优化其执行计划,决定了我们对于Relational Cache的利用效率。Relational Cache通过比较用户查询和cache视图的执行计划来决定是否可以使用cache代替查询执行计划或其一部分,在匹配Join时判断的主要步骤如下:

  1. 收集用户查询中的join相关信息,与Relational Cache中join相关信息,找到两者并集,且并集中所有表都是关联的。
  2. 对于Relational Cache中的除1中并集外的其他关联操作,根据用户定义的约束推断出来的record preserved join信息,判断Relational Cache其他关联操作的结果是否是record preserved on 并集结果。
  3. 使用cache替换并集,并和用户查询中剩余的其他表重新拼接join。
  4. 继续适配执行计划其他部分。

例如我们创建了relational cache (A ⋈ B) ⋈ C,且 A ⋈ B record preserved on A & A ⋈ C record preserved on A, 用户查询为A ⋈ C,

在判断过程中,直接从约束的得到的Record Preserved Join信息可能并不足够,我们还需要通过一些定律进一步推理,从而充分利用Record Preserved Join信息优化更多的查询。

Record Preserved Join推理

根据record preserved join的定义和关系代数的基本原理,我们可以推导出如下定理。

  1. 等价

    if A full outer join B record preserved on A
    then A full outer join B = A left outer join B
    if A inner join B record preserved on A
    then A inner join B = A left outer join B

已知A left outer join B record preserved on A,如果同时A full outer join B record preserved on A的话,那么我们可以确定A left outer join B和A full outer join B的结果一致,可以互相替换。对于Inner Join同样如此。

  1. 交换

    if A ⋈ B record preserved on A
      then B reverse(⋈) A record preserved on A
      def reverse(join) join match {
        case INNER => INNER
        case LEFT OUTER => RIGHT OUTER
        case FULL OUTER => FULL OUTER
      }

根据关系代数的基本定义可以得到reverse函数,加上record preserved join定义,可以很方便的推导出此定理。实际的查询中,join的顺序可能和Relational Cache中并不一致,可能需要变换join顺序进行比较。

  1. 结合

    if A ⋈ B record preserved on A
      and B ⋈ C record preserved on B
    then A ⋈ B join C record preserved on A

由于B ⋈ C record preserved on B,可以认为B join C的结果是在B表中新增更多的维度列,所以A ⋈ B ⋈ C的结果和A ⋈ B的结果记录数一致,A ⋈ B record preserved on A,所以A ⋈ B ⋈ C record preserved on A。

同理,也可推导出:

if A ⋈ B record preserved on A
     and A ⋈ C record preserved on A
   then (A ⋈ B) ⋈ C record preserved on A and (A ⋈ B),
     (A ⋈ C) ⋈ B record preserved on A and (A ⋈ C),
     
  1. 传导

    if A ⋈ B record preserved on A
      and B ⋈ C record preserved on B
    then A ⋈ C record preserved on A
    // same join type, same join key

​ 由于A ⋈ B record preserved on A和B ⋈ C record preserved on B可以得知A ⋈ B ⋈ C record perserved on A,如果A ⋈ B中的join字段和A ⋈ C中A的join字段一致,且B ⋈ C中的join字段和A ⋈ C中C的join字段一致,将A ⋈ B ⋈ C结果中的B相关字段去掉,即为A join C,其结果依然record preserved on A。

雪花数据模型

Relational Cache一个重要的使用场景是决策支持系统,通过BI,报表或多维数据分析快速支持用户的商业决策。在这种场景中,数据模型通常包括一个事实表(Fact Table)和多个维度表(Dimension Table),对于事实表和维度表的关联关系,可以大体分为三种类型:

  1. Star Schema:所有的维度表都是反范式化(denormalized)的,即维度表只有一层,事实表可以和任意维度表直接关联。
  2. Snowflake Schema:所有的维度表都是范式化(normalized)的,即维度表有多层,事实表需要通过多次关联才能关联到全部维度数据。
  3. Starflake Schema:部分维度表是范式化的,部分维度表是反范式化的。

在Star/Snowflake/Starflake数据模型中,事实表和维度表的数据存在着业务上的关联关系,实际的数据也满足主外键/非空字段等约束条件,是验证在执行计划优化时使用Record Preserved Join的合适场景。在MOLAP引擎中,例如apache kylin,通常需要用户描述Star/Snowflake/Starflake数据模型,结合维度和统计列信息构建Cube,用于快速响应多维分析请求。用户的多维分析查询可能涉及到事实表和一个或者多个维度表的关联,实际上Star/Snowflake/Starflake数据模型的定义也隐含着事实表和维度表的Record Preserved Join约束,Relational Cache通过更基础的字段约束定义,推导出Record Preserved Join,从而支持使用relational cache构建cube,通过执行计划重写,满足交互式的多维分析查询需求。Relational Cache的Record Preserved Join推导不仅可用于基于雪花模型的多维分析场景,也可以用于其他涉及到Join的场景,拓展relational cache可优化的查询场景,减少维护的成本和代价。

使用Record Preserved Join优化雪花模型示例

我们使用第二节中的表及其约束,构建Relational Cache,假设用户需要进行多维分析,构建一个Full Cube语句如下:

CACHE TABLE activity_cube
USING parquet
AS 
SELECT product_name, user_name, city_name, count(1), GROUPING_ID() AS grouping_id 
FROM activity_log, user, product, city 
WHERE activity_log.product_id = product.product_id and activity_log.user_id = user.user_id and user.city_id = city.city_id
GROUP BY CUBE(product_name, user_name, city_name);

用户查询如下:

SELECT product_name, count FROM activity_log, product WHERE activity_log.product_id = product.product_id and product_name = 'xxx';

在匹配Join时判断的主要步骤如下:

  1. cache和用户查询join的并集为:activity_log ⋈ product
  2. cache中剩余的表为user和city,这一步可能重复多次,在第一轮通过activity_log ⋈ user record preserved on activity_log以及activity_log ⋈ product record preserved on activity_log使用结合律2推导出(activity_log ⋈ product) ⋈ user record preserved on (activity_log ⋈ product), 在第二轮使用结合律1和上轮的结果推导出(activity_log ⋈ product) ⋈ user ⋈ city record preserved on (activity_log ⋈ product), 从而得出结论cache可以用于替换activity_log ⋈ product。
  3. 继续其他部分执行计划的匹配和重写。

可以看到,基于Record Preserved Join及其推理,我们可以使用单个大宽表(包含事实表和所有维度表关联的结果)作为cache优化所有包含事实表activity_log的关联查询,以此为基础,我们构建的activity_cube可以用于优化基于各个维度组合的查询,结合我们在聚合层面的匹配策略,支持Starflake模型数据的交互式多维分析。

总结

Relational Cache通过Spark中表的各种字段约束信息,推导出Record Preserved Join,结合更进一步的推理规则,使得relational cache可以通过一个宽表的cache优化多种关联查询的场景。在star/snowflake/starflake数据模型下,通过将事实表和所有维度表关联并根据维度聚合后的结果(即Cube)保存为relational cache后,通过Record Preserved Join的推导,relational cache在执行计划优化时可以使用cube数据重写各种维度组合的多维分析查询的执行计划,从而满足亚秒级响应的交互式分析需求。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
3月前
|
存储 分布式计算 Serverless
|
11天前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
56 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
23天前
|
存储 缓存 分布式计算
Spark cache()与unpersist()使用位置
Spark在执行过程中是懒加载模式,RDD转换仅仅是构建DAG描述而不执行,只有遇到action算子才会真正的运行
38 9
|
3月前
|
弹性计算 分布式计算 运维
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
212 8
|
2月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
158 0
|
3月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
128 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
165 3
|
1月前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
39 3
|
2月前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
52 3
|
2月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
下一篇
无影云桌面