Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问

简介: 本文介绍了Databricks企业版Delta Lake的性能优势,借助这些特性能够大幅提升Spark SQL的查询性能,加快Delta表的查询速度。

作者:
李锦桂(锦犀) 阿里云开源大数据平台开发工程师
王晓龙(筱龙) 阿里云开源大数据平台技术专家

背景介绍

Databricks是全球领先的Data+AI企业,是Apache Spark的创始公司,也是Spark的最大代码贡献者,核心围绕Spark、Delta Lake、MLFlow等开源生态打造企业级Lakehouse产品。2020年,Databricks 和阿里云联手打造了基于Apache Spark的云上全托管大数据分析&AI平台——Databricks数据洞察(DDI,Databricks DataInsight),为用户提供数据分析、数据工程、数据科学和人工智能等方面的服务,构建一体化的Lakehouse架构。
图片 1.png

Delta Lake是Databricks从2016年开始在内部研发的一款支持事务的数据湖产品,于2019年正式开源。除了社区主导的开源版Delta Lake OSS,Databricks商业产品里也提供了企业版Spark&Detla Lake引擎,本文将介绍企业版提供的产品特性如何优化性能,助力高效访问Lakehouse。

针对小文件问题的优化解法

在Delta Lake中频繁执行merge, update, insert操作,或者在流处理场景下不断往Delta表中插入数据,会导致Delta表中产生大量的小文件。小文件数量的增加一方面会使得Spark每次串行读取的数据量变少,降低读取效率,另一方面,使得Delta表的元数据增加,元数据获取变慢,从另一个维度降低表的读取效率。

为了解决小文件问题,Databricks提供了三个优化特性,从避免小文件的产生和自动/手动合并小文件两个维度来解决Delta Lake的小文件问题。

特性1:优化Delta表的写入,避免小文件产生

在开源版Spark中,每个executor向partition中写入数据时,都会创建一个表文件进行写入,最终会导致一个partition中产生很多的小文件。Databricks对Delta表的写入过程进行了优化,对每个partition,使用一个专门的executor合并其他executor对该partition的写入,从而避免了小文件的产生。

图片 2.png

该特性由表属性delta.autoOptimize.optimizeWrite来控制:

  • 可以在创建表时指定
CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);
  • 也可以修改表属性
ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

该特性有两个优点:

  1. 通过减少被写入的表文件数量,提高写数据的吞吐量;
  2. 避免小文件的产生,提升查询性能。

其缺点也是显而易见的,由于使用了一个executor来合并表文件的写入,从而降低了表文件写入的并行度,此外,多引入的一层executor需要对写入的数据进行shuffle,带来额外的开销。因此,在使用该特性时,需要对场景进行评估:

  • 该特性适用的场景:频繁使用MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT等SQL语句的场景;
  • 该特性不适用的场景:写入TB级以上数据。

特性2:自动合并小文件

在流处理场景中,比如流式数据入湖场景下,需要持续的将到达的数据插入到Delta表中,每次插入都会创建一个新的表文件用于存储新到达的数据,假设每10s触发一次,那么这样的流处理作业一天产生的表文件数量将达到8640个,且由于流处理作业通常是long-running的,运行该流处理作业100天将产生上百万个表文件。这样的Delta表,仅元数据的维护就是一个很大的挑战,查询性能更是急剧恶化。

为了解决上述问题,Databricks提供了小文件自动合并功能,在每次向Delta表中写入数据之后,会检查Delta表中的表文件数量,如果Delta表中的小文件(size < 128MB的视为小文件)数量达到阈值,则会执行一次小文件合并,将Delta表中的小文件合并为一个新的大文件。

该特性由表属性delta.autoOptimize.autoCompact控制,和特性delta.autoOptimize.optimizeWrite相同,可以在创建表时指定,也可以对已创建的表进行修改。自动合并的阈值由spark.databricks.delta.autoCompact.minNumFiles控制,默认为50,即小文件数量达到50会执行表文件合并;合并后产生的文件最大为128MB,如果需要调整合并后的目标文件大小,可以通过调整配置spark.databricks.delta.autoCompact.maxFileSize实现。

特性3:手动合并小文件

自动小文件合并会在对Delta表进行写入,且写入后表中小文件达到阈值时被触发。除了自动合并之外,Databricks还提供了Optimize命令使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。在实现上Optimize使用bin-packing算法,该算法不但会合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我们要对Delta表student的表文件进行优化,仅需执行如下命令即可实现:

OPTIMIZE student;

Optimize命令不但支持全表小文件的合并,还支持特定的分区的表文件的合并,例如,我们可以仅对date大于2017-01-01的分区中的小文件进行合并:

OPTIMIZE student WHERE date >= '2017-01-01'

从Databricks数据洞察产品上的试验数据看,Optimize能使查询性能达到8x以上的提升。

媲美企业级数据库的查询优化技术

Databricks在数据查询方面也做了诸多优化,包括:

特性1:Data Skipping

在数据查询系统中,有两种经典的查询优化 技术:一种是以更快的速度处理数据,另一种是通过跳过不相关的数据,减少需要扫描的数据量。Data Skipping属于后一种优化技术,通过表文件的统计信息跳过不相关的表文件,从而提升查询性能。

在向Delta表中新增表文件时,Delta Lake会在Delta表的元数据中存储该表文件中的数据前32列的统计信息,包括数据列的最大最小值,以及为null的行的数量,在查询时,Databricks会利用这些统计信息提升查询性能。例如:对于一张Delta表的x列,假设该表的一个表文件x列的最小值为5,最大值为10,如果查询条件为 where x < 3,则根据表文件的统计信息,我们可以得出结论:该表文件中一定不包含我们需要的数据,因此我们可以直接跳过该表文件,减少扫描的数据量,进而提升查询性能。

Data Skipping的实现原理和布隆过滤器类似,通过查询条件判断表文件中是否可能存在需要查询的数据,从而减少需要扫描的数据量。如果表文件不可能存在查询的数据,则可以直接跳过,如果表文件可能存在被查询的数据,则需要扫描表文件。

为了能尽可能多的跳过和查询无关的表文件,我们需要缩小表文件的min-max的差距,使得相近的数据尽可能在文件中聚集。举一个简单的例子,假设一张表包含10个表文件,对于表中的x列,它的取值为[1, 10],如果每个表文件的x列的分布均为[1, 10],则对于查询条件:where x < 3,无法跳过任何一个表文件,因此,也无法实现性能提升,而如果每个表文件的min-max均为0,即在表文件1的x列分布为[1, 1],表文件2的x列分布为[2, 2]...,则对于查询条件:where x < 3,可以跳过80%的表文件。受该思想的启发,Databricks支持使用Z-Ordering来对数据进行聚集,缩小表文件的min-max差距,提升查询性能。下面我们介绍Z-Ordering优化的原理和使用。

特性2:Z-Ordering优化

如上一节所解释的,为了能尽可能多的跳过无关的表文件,表文件中作为查询条件的列应该尽可能紧凑(即min-max的差距尽可能小)。Z-Ordering就可以实现该功能,它可以在多个维度上将关联的信息存储到同一组文件中,因此确切来说,Z-Ordering实际是一种数据布局优化算法,但结合Data Skipping,它可以显著提升查询性能。

Z-Ordering的使用非常简单,对于表events,如果经常使用列eventTypegenerateTime作为查询条件,那么执行命令:

OPTIMIZE events ZORDER BY (eventType, generateTime)

Delta表会使用列eventTypegenerateTime调整数据布局,使得表文件中eventTypegenerateTime尽可能紧凑。

根据我们在Databricks DataInsight上的试验,使用Z-Ordering优化能达到40倍的性能提升,具体的试验案例参考文末Databricks数据洞察的官方文档。

特性3:布隆过滤器索引

布隆过滤器也是一项非常有用的Data-skipping技术。该技术可以快速判断表文件中是否包含要查询的数据,如果不包含就及时跳过该文件,从而减少扫描的数据量,提升查询性能。

如果在表的某列上创建了布隆过滤器索引,并且使用where col = "something"作为查询条件,那么在扫描表中文件时,我们可以使用布隆过滤器索引得出两种结论:文件中肯定不包含col = "something"的行,或者文件有可能包含col = "something"的行。

  • 当得出文件中肯定不包含col = "something"的行的结论时,就可以跳过该文件,从而减少扫描的数据量,提升查询性能。
  • 当得出文件中可能包含col = "something"的行的结论时,引擎才会去处理该文件。注意,这里仅仅是判断该文件中可能包含目标数据。布隆过滤器定义了一个指标,用于描述出现判断失误的概率,即判断文件中包含需要查询的数据,而实际上该文件并不包含目标数据的概率,并称之为FPP(False Positive Probability: 假阳性概率)。

Databricks支持文件级Bloom过滤器,如果在表的某些列创建了布隆过滤器索引,那么该表的每个表文件都会关联一个 Bloom 筛选器索引文件,索引文件存储在表文件同级目录下的 _delta_index 子目录中。在读取表文文件之前,Databricks会检查索引文件,根据上面的步骤判断表文件中是否包含需要查询的数据,如果不包含则直接跳过,否则再进行处理。

布隆过滤器索引的创建和传统数据库索引的创建类似,但需要指定假阳性概率和该列可能出现的值的数量:

CREATE BLOOMFILTER INDEX ON TABLE table_name
FOR COLUMNS(col_name OPTIONS (fpp=0.1, numItems=50000000))

根据我们在Databricks DataInsight上的试验,使用布隆过滤器索引能达到3倍以上的性能提升,试验案例参考文末Databricks数据洞察的官方文档。

特性4:动态文件剪枝

动态文件剪枝(Dynamic File Pruning, DFP)和动态分区剪枝(Dynamic Partition Pruning)相似,都是在维表和事实表的Join执行阶段进行剪枝,减少扫描的数据量,提升查询效率。
下面我们以一个简单的查询为例来介绍DFP的原理:

SELECT sum(ss_quantity) FROM store_sales 
JOIN item ON ss_item_sk = i_item_sk
WHERE i_item_id = 'AAAAAAAAICAAAAAA'

在该查询中,item为维表(数据量很少),store_sales为事实表(数据量非常大),where查询条件作用于维表上。如果不开启DFP,那么该查询的逻辑执行计划如下:

图片 3.png

从上图可以看出,先对store_sales进行全表扫描,然后再和过滤后的item表的行进行join,虽然结果仅有4万多条数据,但却扫描了表store_sales中的80多亿条数据。针对该查询,很直观的优化是:先查询出表item中i_item_id = 'AAAAAAAAICAAAAAA'数据行,然后将这些数据行的i_item_sk值作为表store_sales的ss_item_sk的查询条件在表store_sales的SCAN阶段进行过滤,结合我们在上面介绍的Data Skipping技术,可以大幅减少表文件的扫描。这一思路正是DFP的根本原理,启动DFP后的逻辑执行计划如下图所示:

图片 4.png

可以看到,在启用DFP之后,过滤条件被下推到SCAN操作中,仅扫描了600多万条store_sales中的数据,从结果上看,启动DFP后,该条查询实现了10倍的性能提升,此外,Databricks还针对该特性对TPC-DS测试,测试发现启用DFP后,TPC-DS的第15条查询达到了8倍的性能提升,有36条查询实现了2倍及以上的性能提升。

总结

前文概括介绍了Databricks企业版Delta Lake的性能优势,借助这些特性能够大幅提升Spark SQL的查询性能,加快Delta表的查询速度。Databricks基于企业版Lakehouse架构为众多企业提供了价值,现599元即可试用阿里云Databricks数据洞察,体验企业版Spark&Delta Lake引擎的极致性能。https://www.aliyun.com/product/bigdata/spark

加入我们

阿里云计算平台事业部开源大数据生态企业团队负责阿里云上大数据开源生态方向的商业化产品接入,合作伙伴包括Databricks、Confluent、Cloudera、Starburst等开源领域的明星级企业,目前团队在火热招聘中,期待你的加入,邮箱请联系zongze.hzz@alibaba-inc.com

参考文档和试验数据
Databricks数据洞察官方文档 https://help.aliyun.com/document_detail/190745.html


产品技术咨询
https://survey.aliyun.com/apps/zhiliao/VArMPrZOR

加入技术交流群
DDI钉群.png

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
7月前
|
SQL 分布式计算 数据安全/隐私保护
如何杜绝 spark history server ui 的未授权访问? 1
如何杜绝 spark history server ui 的未授权访问?
|
8月前
|
SQL 分布式计算 数据库连接
大数据Spark分布式SQL引擎
大数据Spark分布式SQL引擎
229 0
|
3天前
|
机器学习/深度学习 分布式计算 数据处理
Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
【5月更文挑战第2天】Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
25 3
|
3天前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
3天前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
108 2
|
3天前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
3天前
|
SQL 分布式计算 Apache
流数据湖平台Apache Paimon(五)集成 Spark 引擎
流数据湖平台Apache Paimon(五)集成 Spark 引擎
96 0
|
3天前
|
SQL 分布式计算 Java
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
51 0
|
6月前
|
机器学习/深度学习 分布式计算 大数据
Spark:大数据处理的下一代引擎
Spark:大数据处理的下一代引擎
50 0
|
7月前
|
SQL 分布式计算 Hadoop
配置Hive使用Spark执行引擎
在Hive中,可以通过配置来指定使用不同的执行引擎。Hive执行引擎包括:默认MR、tez、spark。
162 0