【译】Delta Lake 0.5.0介绍

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文主要对Delta Lake最新发布的0.5.0版本进行了介绍,介绍了如何使用Presto读取Delta表以及Delta Lake 0.5.0在并发性上的提升。

原文链接:https://databricks.com/blog/2020/01/29/query-delta-lake-tables-presto-athena-improved-operations-concurrency-merge-performance.html

最近,Delta Lake发布了0.5.0版本,该版本加入了对Presto和Athena的支持,以及提升了操作的并发性,本文将对Delta Lake 0.5.0版本的变化进行一个简单的介绍。

Delta Lake 0.5.0发布的几个最重要的特性如下:

  • 通过使用Manifest文件能够,支持其他数据处理引擎,现在能够使用Scala、Java、Python和SQL的API生成Manifest文件,并使用该文件通过Presto和Amazon Athena访问Delta Lake中的表数据,详细的使用方式见 https://docs.delta.io/0.5.0/presto-integration.html
  • 提升了Delta Lake所有操作的并发性,现在可以并发执行更多的Delta Lake操作。通过使用更加精细的冲突检测策略,Delta Lake的乐观并发控制得到了有效的改善,这使得我们能够更加容易地在Delta表上执行复杂的工作流。举例来说:

    1. 我们可以在给新分区添加内容的同时,在旧分区上执行delete操作;
    2. 在不相交的一组分区上同时执行update和merge操作;
    3. 让文件合并和增加文件内容同时执行

想要获取更多的信息,可以参考开源Delta Lake 0.5.0的release notes。在此博客文章中,我们将详细介绍如何使用Presto读取Delta Lake表、操作并发性的提升以及使用insert-only merge操作来更方便快速地去除重复数据。

使用Presto读取Delta Lake表

正如这篇文章Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs 所说,Delta Lake 的一些修改数据的操作,如delete操作,是通过给包含删除数据的文件写一个新版本,并只是将旧版本文件标记为已删除来实现的。Delta Lake采用这种方法的优势在于能够让我们查询旧版本的数据。如果我们想要了解哪些数据(或行)包含最新的数据,默认情况下我们可以去查询事务日志。其他数据处理系统,如Presto和Athena想要获取这些信息,可以通过读取Delta Lake生成的一种清单文本文件——Manifest,该文件中包含查询Delta Lake表需要读取的数据文件列表。为了实现Presto和Athena读取Delta Lake表,我们可以通过执行一些Python命令来实现,详细的内容可以参考 Set up the Presto or Athena to Delta Lake integration and query Delta tables

生成Delta Lake的Manifest文件

首先,使用以下代码片段创建Delta Lake的Manifest文件:

deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")

正如代码字面意思所示,以上操作将在表根目录中生成Manifest文件。如果你根据 Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs 这篇文章介绍的内容创建了departureDelays表,将会在表根目录中产生一个新的文件夹:

$/departureDelays.delta/_symlink_format_manifest

该文件夹中会有一个名为manifest的文本文件。如果你查看manifest文件的内容(例如使用cat命令),你将能看到类似以下的文本内容,它们指示了包含最新快照的文件。

file:$/departureDelays.delta/part-00003-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00006-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00002-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00007-...-c000.snappy.parquet

创建Presto表以读取生成的Manifest文件

接下来的步骤是在Hive Metastore中创建一个外部表,以便Presto(或Athena)可以读取上一步生成的Manifest文件,来获得需要读取的Parquet文件,以读取Delta Lake表的最新快照。需要说明的是,对于Presto,你可以使用Apache Spark或Hive CLI来运行以下命令:

1. CREATE EXTERNAL TABLE departureDelaysExternal ( ... )
2. ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
3. STORED AS INPUTFORMAT
4. OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
5. LOCATION '$/departureDelays.delta/_symlink_format_manifest'

一些重要的说明:

  • 第一行所定义的schema必须和Delta Lake中的schema相同
  • 第五行需要指向Manifest文件的位置——_symlink_format_manifest

Presto(或Athena)需要配置SymlinkTextInputFormat才能从Manifest文件中获取Parquet数据文件的列表,而不是使用目录列表中的文件。需要说明的是,如果想要使用分区表,需要按照Configure Presto to read the generated manifests这篇文章进行一些额外的步骤。

更新Manifest文件

需要注意的是,如果Delata Lake的数据有更新,都需要重新生成Manifest文件,以便Presto能够获取到最新的数据。

操作并发性的提升

在Delta Lake 0.5.0版本,我们能够同时执行更多的操作。通过更细粒度的冲突检测,这些最新的更新让Delta Lake能够更容易地在Delta Lake表上执行复杂的工作流,例如:

  • 可以在给新分区添加内容的同时,在旧分区上执行delete操作;
  • 追加文件内容的同时执行文件合并操作;
  • 在不相交的一组分区上同时执行update和merge操作。

并发追加文件内容的用例

举个例子,当我们在执行merge操作的同时,如果有并发的事务向同一个分区写入记录,Delta Lake往往会抛出ConcurrentAppendException异常。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

上面的代码段就有可能会引发冲突,因为即使表已经按照date和country进行了分区,条件仍然不够明确。问题在于,这个查询将扫描整个表,从而可能与update任何其他分区的并发操作发生冲突。通过指定specificDatespecificCountry,以便可以在特定的date和country进行merge操作,现在我们就可以安全地在不同的date和contry同时执行此操作。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND d.date = '" + specificDate + "' AND d.country = '" + specificCountry + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

以上方法适用于其他所有的Delta Lake操作(如delete、更改元数据等)。

并发文件合并

如果你连续不断地将数据写入Delta表,随着时间的流逝,将会累积出大量的文件。这在流式数据场景中尤为重要,因为此时是以比较小的batch写入数据的,这将会导致文件系统不断地累积小文件,随着时间的推移,小文件的数量会不断增加,会降低查询的性能。优化这种场景的一个比较重要的方式就是定期获取大批量的小文件,并将其重写为数量比较小的大文件(文件合并)。
过去,在同时进行数据查询和执行文件合并时,出现异常的可能性会非常高。但是现在,由于Delta Lake 0.5.0版本的优化改进,我们可以同时执行查询操作(包括流式查询)和文件的合并,并且不会有任何异常产生。举个例子来说,如果你的表已经进行了分区,并且你只想基于谓词对一个分区进行重新分区,则可以使用where来仅读取该分区,并使用replaceWhere回写该分区:

path = "..."
partition = "year = '2019'"
numFilesPerPartition = 16   # Compact partition of a table to no. of files

(spark.read
  .format("delta")
  .load(path)
  .where(partition)
  .repartition(numFilesPerPartition)
  .write
  .option("dataChange", "false")
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", partition)
  .save(path))

以上代码中需要注意的是,仅在没有数据更改时,才使用dataChange == false选项,否则可能会破坏底层数据。

使用Insert-only Merge操作方便快速地去除重复数据

一个场景的ETL用例是搜集日志,并将其附加到Delta Lake表当中,一个比较常见的问题是数据源会产生重复的日志记录。通过使用Delta Lake的merge,你可以避免插入这些重复的记录,例如以下涉及merge以及update航班数据的代码:

# Merge merge_table with flights
deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"),"flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()

在Delta Lake 0.5.0版本之前,不可能从Delta Lake表中将重复数据作为流进行读取,因为insert-only merge并不是纯粹地将数据追加到表中。
例如,在流查询中,你可以在foreachBatch中执行merge操作来连续不断地将流数据写入Delta Lake表当中,并将需要删除的重复数据打上标记。以下PySpark的代码展示了这个场景:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
}

# Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()

在另一个流式查询中,你可以从该Delta Lake表中连续读取需要删除的重复数据。这是可能的,因为insert-only merge操作(在Delta Lake 0.5.0版本引入)只会将新数据追加到Delta Lake表中。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
7月前
|
存储 SQL Apache
Apache Hudi与Delta Lake对比
Apache Hudi与Delta Lake对比
102 0
|
存储 SQL JSON
Delta Lake、Hudi与Iceberg详解
Delta Lake、Hudi与Iceberg详解
1022 0
Delta Lake、Hudi与Iceberg详解
|
7月前
|
存储 数据采集 大数据
Data Lake架构揭秘
Data Lake架构揭秘
83 0
|
流计算
Delta Lake中CDC的实现
Delta Lake中CDC的实现
154 0
|
SQL 存储 分布式计算
数据湖揭秘—Delta Lake
Delta Lake 是 DataBricks 公司开源的、用于构建湖仓架构的存储框架。能够支持 Spark,Flink,Hive,PrestoDB,Trino 等查询/计算引擎。作为一个开放格式的存储层,它在提供了批流一体的同时,为湖仓架构提供可靠的,安全的,高性能的保证。
4081 7
数据湖揭秘—Delta Lake
|
SQL 分布式计算 搜索推荐
《 Delta Lake 数据湖专题系列5讲》文章回顾
《Delta Lake 数据湖专题系列5讲》由阿里云 DDI 团队翻译整理自大数据技术公司 Databricks 针对数据湖 Delta Lake 系列技术文章。阅读完此系列文章可以帮助您达到入门级,对数据湖 Lakehouse 有整体上的认识和应用,掌握理论知识体系。
《 Delta Lake 数据湖专题系列5讲》文章回顾
|
存储 SQL 分布式计算
Delta Lake,让你从复杂的Lambda架构中解放出来
Linux 基金会的 Delta Lake(Delta.io)是一个给数据湖提供可靠性的开源存储层软件。在 QCon 全球软件开发大会(上海站)2019 的演讲中,Databricks 公司的 Engineering Manager 李潇带我们了解了 Delta Lake 在实际生产中的应用与实践以及未来项目规划,本文便整理自此次演讲。
Delta Lake,让你从复杂的Lambda架构中解放出来
|
SQL 存储 分布式计算
Data Lake 三剑客——Delta、Hudi、Iceberg 对比分析
定性上讲,三者均为 Data Lake 的数据存储中间层,其数据管理的功能均是基于一系列的 meta 文件。meta 文件的角色类似于数据库的 catalog/wal,起到 schema 管理、事务管理和数据管理的功能。
15896 2
Data Lake 三剑客——Delta、Hudi、Iceberg 对比分析
|
存储 数据采集 SQL
Delta Lake - 数据湖的数据可靠性
Delta Lake 是一个开源的存储层,为数据湖带来了可靠性。Delta Lake 提供了ACID事务、可伸缩的元数据处理以及统一的流和批数据处理。它运行在现有的数据湖之上,与 Apache Spark API完全兼容。
Delta Lake - 数据湖的数据可靠性