EMR Spark Relational Cache 利用数据预组织加速查询

简介: 本文介绍了EMR Spark的Relational Cache如何从数据量较大的Cube中快速提取出所需数据加速查询的原理。通过列式存储、文件索引、Z-Order等技术,我们可以快速过滤数据,大大减少实际发生的IO数据量,避免IO瓶颈的出现,从而优化整体查询性能。

作者:
王道远,花名健身,阿里云EMR技术专家,Apache Spark活跃贡献者,主要关注大数据计算优化相关工作。


Relational Cache相关文章链接:
使用Relational Cache加速EMR Spark数据分析
使用EMR Spark Relational Cache跨集群同步数据
EMR Spark Relational Cache的执行计划重写
EMR Spark Relational Cache如何支持雪花模型中的关联匹配

背景

在利用Relational Cache进行查询优化时,我们需要通过预计算,存储大量数据。而在查询时,我们真正需要读取的数据量也许并不大。为了能让查询实现秒级响应,这就涉及到优化从大量数据中快速定位所需数据的场景。本文介绍在EMR Spark Relational Cache中,我们如何针对这种场景进行了优化。

存储格式

在数据存储格式上,我们默认选择Spark社区支持最好的Parquet格式。Parquet是一种列式存储格式,我们可以很方便地利用列式存储格式进行字段裁剪。另外,Parquet的每个数据文件由多个Row Group组成,同时在每个数据文件的footer中记录了各个Row Group的统计信息,如最大值、最小值等。这些统计信息可以在读取数据时减少实际的IO开销。事实上,在现在的Spark版本中,我们可以看到Catalyst优化器已经把可以下推的一些过滤条件下推到了Parquet reader,利用Parquet文件的统计信息过滤真正需要读取的Row Group,从而实现减少IO量,加速查询时间的效果。这也是列存格式基本都支持的功能。

对于Relational Cache而言,有很多过滤条件时确定已知的。我们直接利用这一特性,将确定的查询条件下推到Parquet reader里,由Parquet reader完成对Row Group的选择。由于实际要读取的数据量占总数据量的比重往往很小,这种过滤的实际效果还是比较好的。

image

数据分区

对于Spark Relational Cache来说,由于构建Cube时会使用到Expand算子,我们需要引入Grouping ID来区分不同的grouping set。在大部分后续的查询中,我们往往只需要其中一个Grouping ID所对应的数据。因此,Grouping ID成了一个天然的数据分区选择。在Hive/Spark等大数据分析引擎中,数据分区是对于结构化数据,将其中一个或多个字段的具体值作为目录,分目录存放文件的一种常见做法。当我们确定要选择某Grouping ID对应的数据时,我们只需读取对应目录中的数据即可。这种做法可以直接忽略Grouping ID不匹配的文件,从而大大减少启动的总task数量,减少Spark的任务调度开销。

文件索引

当总数据量较大时,存储的文件数也会比较多。此时即使我们通过Parquet的footer可以获得较好的过滤效果,我们还是要启动一些task去读取这些footer。在Spark实际的实现中,往往需要与文件数量的量级相当的task去进行footer读取。在集群计算资源有限时,调度这些任务就显得比较浪费时间。为了能进一步减少Spark作业的调度开销,进一步提高执行效率,我们实现了文件索引来优化这种场景。

文件索引就类似于独立的footer。我们提前收集每个文件中各字段数据的最大最小值,并存储在一个隐藏的数据表中。这样,我们只需要读取一个单独的表就可以从文件层面对需要处理的文件做一个初步的过滤。这是一个单独的stage,由于一个文件只对应这个隐藏表中的一条记录,因此读取隐藏表所需的task数量要远远小于读取所有数据文件footer的开销。后续stage的任务数量也因此可以大大减少。在Relational Cache的访问场景下,整体加速效果非常明显。

数据排序

为了能实现高效的数据准备过程,不论是在Parquet文件的footer还是在我们实现的文件索引中,都是主要依靠最大值和最小值的信息来过滤数据。那么在极端场景下,光靠这些统计信息可能会完全没有过滤的效果。举个例子,如果某个key的所有数据文件、所有Row Group的最大值和最小值都等于全局最大值和最小值时,对这个key的过滤就完全无效了。这样,我们会自然而然的想到对数据进行排序。

但是,传统的数据排序还有一个问题。在数据库中,当我们对多个字段进行排序时,往往字段之间具有主次关系,这就导致排序字段序列中,排在最前面的字段有很好的过滤效果,而排得靠后的字段因为数据分散,往往过滤效果越来越差。这就需要我们找到更好的排序方法,能够兼顾到多个字段的数据过滤效果。

这里涉及到一个空间填充曲线的概念。我们可以把数据想像成一个有限空间,如何将数据进行排序和分块,能够使得每一块的最值都只是在一个不大的范围内,从而让文件索引获得较好的过滤效果呢?我们选择了Z-Order曲线对多维数据空间进行排序,这样可以保证每列都有较为均衡的过滤效果。下图是二维空间中Z-Order曲线的示意图。

image
不过我们也要注意到,随着排序列的增加,单列的过滤效果将会越来越差。因此在实际运用中,我们也要对排序列进行取舍,才能获得最佳的整体效果。

小结

本文介绍了EMR Spark的Relational Cache如何从数据量较大的Cube中快速提取出所需数据加速查询的原理。通过列式存储、文件索引、Z-Order等技术,我们可以快速过滤数据,大大减少实际发生的IO数据量,避免IO瓶颈的出现,从而优化整体查询性能。

参考
Apache Parquet
Processing Petabytes of Data in Seconds with Databricks Delta
Z-Order Curve


_

相关文章
|
6月前
|
存储 分布式计算 Serverless
|
3月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
194 2
|
4月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
183 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
4月前
|
存储 缓存 分布式计算
Spark cache()与unpersist()使用位置
Spark在执行过程中是懒加载模式,RDD转换仅仅是构建DAG描述而不执行,只有遇到action算子才会真正的运行
63 9
|
5月前
|
SQL 存储 分布式计算
|
5月前
|
存储 安全 API
阿里云EMR数据湖文件系统问题之JindoFS元数据查询和修改请求的问题如何解决
阿里云EMR数据湖文件系统问题之JindoFS元数据查询和修改请求的问题如何解决
|
6月前
|
弹性计算 分布式计算 运维
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
265 8
|
6月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
DataWorks 安全 API
DataWorks产品使用合集之是否可以不使用DataWorks进行EMR的调度和DataX数据导入
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
242 0