EMR Spark Relational Cache如何支持雪花模型中的关联匹配

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 我们需要找到一种方式可以通过单个Relational Cache支持优化多个关联查询的方式,从而在加速用户查询的同时,减少创建和更新relational cache的代价。Record Preserve Join是支持这种优化的非常有效的方式。

作者:李呈祥,阿里巴巴计算平台事业部EMR团队高级技术专家,Apache Hive Committer, Apache Flink Committer,深度参与了Hadoop,Hive,Spark,Flink等开源项目的研发工作,对于SQL引擎,分布式系统有较为深入的了解和实践,目前主要专注于EMR产品中开源计算引擎的优化工作。

Relational Cache相关文章链接:
使用Relational Cache加速EMR Spark数据分析
使用EMR Spark Relational Cache跨集群同步数据
EMR Spark Relational Cache的执行计划重写

背景

Join是Spark SQL中非常常见的操作,数据表按照业务语义的范式化表定义,便于用户理解与使用,而且可以消除冗余数据。用户通过join操作将相关的数据关联后进行进一步的过滤,聚合等操作。在Spark中,Join通常是代价比较大,尤其是当join的两个表的数据都比较大,无法优化为map join时,需要通过网络shuffle两个表的数据,对数据按照jion字段进行重新组织。Relational Cache是EMR Spark支持的重要特性,类似于数据仓库的物化视图,将反范式化表(即关联后的大表)保存为relational cache,便可以使用cache重写执行计划,提高查询效率。但是A ⋈ B ⋈ C作为relational cache只能用来优化包含A ⋈ B ⋈ C的查询,理论上是不能用来优化只包含A ⋈ B或A ⋈ C的查询。如果表的数量很多,假设有n个表,则总共可能有2^n个关联结果(当然在业务上并不会出现任意两个表都可以关联的情况,但是可能的关联数量依然会非常大),将每个关联结果都作为relational cache构建代价太大,并不现实,我们需要找到一种方式可以通过单个Relational Cache支持优化多个关联查询的方式,从而在加速用户查询的同时,减少创建和更新relational cache的代价。Record Preserve Join是支持这种优化的非常有效的方式。

什么是Record Preserved Join

定义:对于表A和表B,如果A的每一条记录都出现在A ⋈ B的结果中,并且A ⋈ B的结果并没有其他额外的记录,那么我们称A ⋈ B record preserved on A。

下面我们看一下一个Record preserved join的简单示例。
3

如上图所示,activity_log为事实表,product,user,city为维度表,activity_log表通过user_id字段和user表关联,通过product_id字段与product表关联,user表通过city_id字段与city表关联,关联的结果如下:

4

如果我们把上述的activity_log,product,user以及city四表的关联结果作为Relational Cache,理论上只有后续的查询包含这四个表的关联时,才能够使用cache优化SQL执行计划,如果查询只包含部分表的关联,比如只是activity_log和user关联,是没有办法使用之前的cache优化的。但是我们仔细观察可以发现,每一行activity_log表中的记录,其user_id都和user表中的其中一条且仅一条记录相关联,对于product_id和city_id的关联也是同样如此,可以看到关联后的结果,仅仅是为activity_log表中的每条记录增加更多的字段,activity_log中的每条记录和关联结果中的每条记录是一一对应的关系,这种join结果就是record preserved join。由于activity_log和user关联的数据每一条都和cache中的数据一一对应,不多也不少,所以实际上我们应该允许使用该cache优化包含activity_log和user关联的查询。比如对于上面的示例,我们把图2中四表关联的结果保存为Relational Cache activity_flat_cache,那么对于query

由于所有的activity_log记录都在activity_flat_cache中,我们可以将该查询改写为

同理,我们可以使用activity_flat_cache优化任意包含activity_log与其他单个或多个维度表的关联查询,使用同一个cache优化多种关联场景,大大降低relational cache维护和更新所需的存储和计算成本。

为了能够在Relational Cache中基于Record Preserved Join支持更丰富的优化场景,我们需要首先解决两个问题:

  1. Record preserved Join对于关联两表的数据有非常严格的约束,Relational Cache如何知道两个表关联的结果是否为Record preserved Join。
  2. 已知1的信息,如何推导Relational Cache是否可用于Join查询的执行计划优化。

Record Preserved Join声明

一个常见的Record Preserved Join是Left Outer Join,对于任意的表A和B,A left outer join B record preserved on A。根据表A和B中join key的数据分布,最常见的也可能满足record preserved join的条件。在EMR Spark中,用户可以定义表的主外键和NOT NULL约束,通过表的主外键和NOT NULL约束,Relational Cache可以推断出两表关联是否满足Record Preserved Join。

根据外键的定义,外键的值必须存在其引用的主键中,或者为空,其引用的主键又必须不能重复,所以如果存在表A和表B,而且表A中的外键字段关联表B的主键,且外键字段有NOT NULL约束,则我们可以确定A INNER JOIN B的结果record preserved on A

以上面四个表的关系为例,在Spark SQL定义主外键的DDL示例如下:

ALTER TABLE product ADD CONSTRAINT prd_pk PRIMARY KEY (product_id);
ALTER TABLE user ADD CONSTRAINT user_pk PRIMARY KEY (user_id);
ALTER TABLE city ADD CONSTRAINT city_pk PRIMARY KEY (city_id);
ALTER TABLE activity_log ADD CONSTRAINT act_prd_fk FOREIGN KEY (product_id) REFRENCES product (product_id);
ALTER TABLE activity_log ADD CONSTRAINT act_user_fk FOREIGN KEY (user_id) REFRENCES user (user_id);
ALTER TABLE user ADD CONSTRAINT user_city_fk FOREIGN KEY (city_id) REFRENCES city (city_id);

结合各外键字段的NOT NULL约束,我们可以推断出如下Record Preserved Join:

  • activity_log inner join product record preserved on activity_log
  • activity_log inner join user record preserved on activity_log
  • user inner join city record preserved on user

使用Record Preserved Join优化优化执行计划

EMR Spark支持通过任意的SQL查询创建Relational Cache,可能包含关联,聚合,过滤,投影等各种操作,其中关联也包括record preserved join和其他join,如何利用到其中的record preserved join特性对更多的查询优化其执行计划,决定了我们对于Relational Cache的利用效率。Relational Cache通过比较用户查询和cache视图的执行计划来决定是否可以使用cache代替查询执行计划或其一部分,在匹配Join时判断的主要步骤如下:

  1. 收集用户查询中的join相关信息,与Relational Cache中join相关信息,找到两者并集,且并集中所有表都是关联的。
  2. 对于Relational Cache中的除1中并集外的其他关联操作,根据用户定义的约束推断出来的record preserved join信息,判断Relational Cache其他关联操作的结果是否是record preserved on 并集结果。
  3. 使用cache替换并集,并和用户查询中剩余的其他表重新拼接join。
  4. 继续适配执行计划其他部分。

例如我们创建了relational cache (A ⋈ B) ⋈ C,且 A ⋈ B record preserved on A & A ⋈ C record preserved on A, 用户查询为A ⋈ C,

在判断过程中,直接从约束的得到的Record Preserved Join信息可能并不足够,我们还需要通过一些定律进一步推理,从而充分利用Record Preserved Join信息优化更多的查询。

Record Preserved Join推理

根据record preserved join的定义和关系代数的基本原理,我们可以推导出如下定理。

1.等价

then A full outer join B = A left outer join B
if A inner join B record preserved on A
then A inner join B = A left outer join B

已知A left outer join B record preserved on A,如果同时A full outer join B record preserved on A的话,那么我们可以确定A left outer join B和A full outer join B的结果一致,可以互相替换。对于Inner Join同样如此。

1.交换

  then B reverse(⋈) A record preserved on A
  def reverse(join) join match {
    case INNER => INNER
    case LEFT OUTER => RIGHT OUTER
    case FULL OUTER => FULL OUTER
  }

根据关系代数的基本定义可以得到reverse函数,加上record preserved join定义,可以很方便的推导出此定理。实际的查询中,join的顺序可能和Relational Cache中并不一致,可能需要变换join顺序进行比较。

1.结合

  and B ⋈ C record preserved on B
then A ⋈ B join C record preserved on A

由于B ⋈ C record preserved on B,可以认为B join C的结果是在B表中新增更多的维度列,所以A ⋈ B ⋈ C的结果和A ⋈ B的结果记录数一致,A ⋈ B record preserved on A,所以A ⋈ B ⋈ C record preserved on A。

同理,也可推导出:

     and A ⋈ C record preserved on A
   then (A ⋈ B) ⋈ C record preserved on A and (A ⋈ B),
     (A ⋈ C) ⋈ B record preserved on A and (A ⋈ C),

1.传导

  and B ⋈ C record preserved on B
then A ⋈ C record preserved on A
// same join type, same join key

由于A ⋈ B record preserved on A和B ⋈ C record preserved on B可以得知A ⋈ B ⋈ C record perserved on A,如果A ⋈ B中的join字段和A ⋈ C中A的join字段一致,且B ⋈ C中的join字段和A ⋈ C中C的join字段一致,将A ⋈ B ⋈ C结果中的B相关字段去掉,即为A join C,其结果依然record preserved on A。

雪花数据模型

Relational Cache一个重要的使用场景是决策支持系统,通过BI,报表或多维数据分析快速支持用户的商业决策。在这种场景中,数据模型通常包括一个事实表(Fact Table)和多个维度表(Dimension Table),对于事实表和维度表的关联关系,可以大体分为三种类型:

  1. Star Schema:所有的维度表都是反范式化(denormalized)的,即维度表只有一层,事实表可以和任意维度表直接关联。
  2. Snowflake Schema:所有的维度表都是范式化(normalized)的,即维度表有多层,事实表需要通过多次关联才能关联到全部维度数据。
  3. Starflake Schema:部分维度表是范式化的,部分维度表是反范式化的。

在Star/Snowflake/Starflake数据模型中,事实表和维度表的数据存在着业务上的关联关系,实际的数据也满足主外键/非空字段等约束条件,是验证在执行计划优化时使用Record Preserved Join的合适场景。在MOLAP引擎中,例如apache kylin,通常需要用户描述Star/Snowflake/Starflake数据模型,结合维度和统计列信息构建Cube,用于快速响应多维分析请求。用户的多维分析查询可能涉及到事实表和一个或者多个维度表的关联,实际上Star/Snowflake/Starflake数据模型的定义也隐含着事实表和维度表的Record Preserved Join约束,Relational Cache通过更基础的字段约束定义,推导出Record Preserved Join,从而支持使用relational cache构建cube,通过执行计划重写,满足交互式的多维分析查询需求。Relational Cache的Record Preserved Join推导不仅可用于基于雪花模型的多维分析场景,也可以用于其他涉及到Join的场景,拓展relational cache可优化的查询场景,减少维护的成本和代价。

使用Record Preserved Join优化雪花模型示例

我们使用第二节中的表及其约束,构建Relational Cache,假设用户需要进行多维分析,构建一个Full Cube语句如下:

USING parquet
AS 
SELECT product_name, user_name, city_name, count(1), GROUPING_ID() AS grouping_id 
FROM activity_log, user, product, city 
WHERE activity_log.product_id = product.product_id and activity_log.user_id = user.user_id and user.city_id = city.city_id
GROUP BY CUBE(product_name, user_name, city_name);

用户查询如下:

在匹配Join时判断的主要步骤如下:

  1. cache和用户查询join的并集为:activity_log ⋈ product
  2. cache中剩余的表为user和city,这一步可能重复多次,在第一轮通过activity_log ⋈ user record preserved on activity_log以及activity_log ⋈ product record preserved on activity_log使用结合律2推导出(activity_log ⋈ product) ⋈ user record preserved on (activity_log ⋈ product), 在第二轮使用结合律1和上轮的结果推导出(activity_log ⋈ product) ⋈ user ⋈ city record preserved on (activity_log ⋈ product), 从而得出结论cache可以用于替换activity_log ⋈ product。
  3. 继续其他部分执行计划的匹配和重写。

可以看到,基于Record Preserved Join及其推理,我们可以使用单个大宽表(包含事实表和所有维度表关联的结果)作为cache优化所有包含事实表activity_log的关联查询,以此为基础,我们构建的activity_cube可以用于优化基于各个维度组合的查询,结合我们在聚合层面的匹配策略,支持Starflake模型数据的交互式多维分析。

总结

Relational Cache通过Spark中表的各种字段约束信息,推导出Record Preserved Join,结合更进一步的推理规则,使得relational cache可以通过一个宽表的cache优化多种关联查询的场景。在star/snowflake/starflake数据模型下,通过将事实表和所有维度表关联并根据维度聚合后的结果(即Cube)保存为relational cache后,通过Record Preserved Join的推导,relational cache在执行计划优化时可以使用cube数据重写各种维度组合的多维分析查询的执行计划,从而满足亚秒级响应的交互式分析需求。

_

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
36 4
|
19天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
24天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
118 2
|
2月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
149 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
2月前
|
存储 缓存 分布式计算
Spark cache()与unpersist()使用位置
Spark在执行过程中是懒加载模式,RDD转换仅仅是构建DAG描述而不执行,只有遇到action算子才会真正的运行
49 9
|
4月前
|
弹性计算 分布式计算 运维
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
236 8
|
3月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
205 0
|
19天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
54 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0