【译】Delta Lake 0.4.0 新特性演示:使用 Python API 就地转换与处理 Delta Lake 表

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文以案例演示在最新的 Delta Lake 0.4.0 中,如何转换 Delta Lake 表,使用全新的 Python API 执行 upsert 与删除数据,用时间旅行 (time travel) 查询数据的旧版本,以及 vacuum 语句清理旧版本。

编译:陈强,花名无咎,阿里巴巴技术专家,目前专注于EMR产品的管控与数据治理的研发工作。


我们激动地宣布 Delta Lake 0.4.0 发布,本次发布包含操纵与管理 Delta Lake 表的 Python API。关键特性包括:

  • Python APIs for DML and utility operations (#89) - 现在,您可以使用 Python API 更新、删除、合并 Delta Lake 表,并对表执行实用操作(即:vacuum,history)。这个特性对于使用Python进行复杂工作非常有用,例如:渐变维度(SCD)操作,合并重复变化数据流式查询中执行upserts。查看文档了解更多详情。
  • Convert-to-Delta (#78) – 您可以就地转换 Parquet 表到 Delta Lake 表,而无需重写任何数据。这个特性对于转换非常大的 Parquet 表非常有用,如果重写为 Delta 表,将产生巨大开销。并且,这个过程是可逆的 —— 您可以转换 Parquet 为 Delta Lake 表,执行操作(例如删除和合并),然后轻松转换回 Parquet 表。查看文档了解更多详情。
  • SQL for utility operations – 现在,您可以使用 SQL 来执行实用操作 vacuum 和 history。关于如何配置 Spark 以执行这些 Delta 特定的 SQL 命令,查看文档以了解更多详情。

要了解更多信息,请查看 Delta Lake 0.4.0 发布说明,以及 Delta Lake Documentation > Table Deletes, Updates, and Merges

delta-lake-0.4.0-merge-python.gif

在本文中,我们将基于 Apache Spark™ 2.4.3,演示一个准时航班情况业务场景中,如何使用全新的 Delta Lake 0.4.0 Python API。我们将展示如何 upsert 与删除数据,用时间旅行 (time travel) 查询数据的旧版本,以及用 vacuum 清理旧版本。

如何上手Delta Lake

使用--option选项使用 Delta Lake 包。在本文的例子中,我们也会演示在 Spark 中执行文件的VACUUM操作,以及执行 Delta Lake SQL 命令。为了完成这个简短的演示,我们要做以下设置:

  • spark.databricks.delta.retentionDurationCheck.enabled=false 允许对默认保留时长(7天)之内的文件执行 VACUUM 。注意,只有 SQL 命令 VACUUM 才需要这个配置。
  • spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension 允许在 Apache Spark 中执行 Delta Lake SQL 命令;Python or Scala API 调用不需要这个配置。
# 使用 Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

加载并保存 Delta Lake 数据

业务场景使用“航班准时延误情况数据集”,由美国交通运输局的 航班出发统计 生成。使用后者的案例有 2014 Flight Departure Performance via d3.js CrossfilterOn-Time Flight Performance with GraphFrames for Apache Spark™。这份数据集可以从这个github地址下载到您本地。在 pyspark 中开始读取数据集。

# 路径变量
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"

# 读取航班延误数据
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)

接下来,让我们将出发延误数据集保存到 Delta Lake 表。通过将表存储到 Delta Lake,我们可以利用这些特性:ACID 事务,统一批流处理,以及时间旅行 (time travel) 。

# 将航班延误数据保存为 Delta Lake 格式
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")

注意,此做法与一般的保存Parquet数据类似,不过您现在要指定format("delta")而不是format("parquet")。如果您查看下层的文件系统,可以发现 Delta Lake 表 departureDelays 创建了四个文件。

/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet

注意:_delta_log 是包含 Delta Lake 事务日志的文件夹。您可以在这个文档 Diving Into Delta Lake: Unpacking The Transaction Log 查看更多信息。

现在,让我们重新加载数据,不过现在我们的 DataFrame 将由 Delta Lake 支持。

# 将航班延误数据加载为 Delta Lake 格式
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")

# 创建临时视图
delays_delta.createOrReplaceTempView("delays_delta")
 
# 西雅图与旧金山之间的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

delta-lake-0.4.0-initial.png

最后,让我们查明从西雅图到旧金山的航班次数。在本数据集中有1698次。

就地转换到 Delta Lake

如果您已有 Parquet 表,您可以将表就地转换为 Delta Lake 表,因此不需要重写表。您可以运行以下命令来转换:

from delta.tables import *

# 转换位于路径 /path/to/table 下的 Parquet 无分区表
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# 转换位于路径 /path/to/table 下的 Parquet 分区表,分区列是名为 part 的 integer 列
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

想了解如何更多信息,包括如何在 Scala 和 SQL 中转换,参考 Convert to Delta Lake

删除航班数据

从传统数据湖表中删除数据,您需要:

  1. select所有数据,排除您要删的那些行
  2. 基于上述查询创建一个新表
  3. 删除原表
  4. 新表重命名为原表,以满足下游依赖

在 Delta Lake 中,我们可以简单运行一个 DELETE 语句来完成删除,而不需要以上的步骤。为了演示,我们来删除所有提前和准时的航班(即 dalay < 0)。

from delta.tables import *
from pyspark.sql.functions import *

# 访问 Delta Lake 表
deltaTable = DeltaTable.forPath(spark, pathToEventsTable
)
# 删除所有准时和提前的航班
deltaTable.delete("delay < 0") 

# 西雅图出发到旧金山的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

delta-lake-0.4.0-after-delete.png

我们删除所有准时和提前的航班之后,在查询中可以看到,西雅图出发到旧金山的延误航班有837次。如果您查看文件系统,可以注意到,删除数据之后文件反而变多了。

/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet

在传统的数据湖中,删除是以重写被删除数据之外的整张表来实现。在 Delta Lake 中,删除操作则是选择性地写入文件的新版本,其中包括被删除的数据,而原先的文件只被置为删除。这是因为 Delta Lake 使用了多版本并发控制 (MVCC) 来完成表的原子操作:例如,当一名用户正在删除数据,另一名用户可能在查询表的先前版本。这种多版本模型让我们可以进行时间旅行(time travel),查询到先前的版本。稍后我们将看到这个例子。

更新航班数据

在传统数据湖中更新表,您需要:

  1. select出所有数据,排除您要更新的那些行
  2. 修改需要更新/变化的行
  3. 将这两张表合并为一张新表
  4. 删除原表
  5. 新表重命名为原表,以满足下游依赖

在 Delta Lake 中,我们可以简单运行一个 UPDATE 语句来完成删除,而不需要以上的步骤。为了演示,我们来更新所有底特律出发到西雅图的航班。

# 所有底特律出发到西雅图的航班,出发地修改为西雅图
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) 

# 西雅图出发到旧金山的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

delta-lake-0.4.0-after-update.png

把底特律航班标记为西雅图航班之后,现在我们有986次从西雅图出发到旧金山的航班了。如果您在文件系统中列出 departureDelays 文件夹 ($../departureDelays/ls -l),会发现现在有11个文件(不同于删除文件后的8个,以及创建表之后的4个)。

合并航班数据

有个常见的场景,在数据湖中持续地给表追加数据。这常常导致重复数据(您不想再次插入表的行)—— 新行需要插入,有些行需要更新。在 Delta Lake 中,这些操作可以使用合并操作实现(类似 SQL 中的 MERGE 语句)。

我们以一个简单的数据集开始,您需要在其中使用下列查询来更新、插入或去重。

# 在此时间范围内,西雅图与旧金山之间的航班有哪些
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

查询结果如下表。注意,本文给数据打上了颜色,以清楚地区分哪些行是去重的(蓝色),更新的(黄色),以及插入的(绿色)。

delta-lake-0.4.0-merge-source-table.png

接下来,让我们编写以下代码片段,生成自己的 merge_table,包含将要插入、更新或去重的数据。

items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()

delta-lake-0.4.0-merge-merge-table.png

在以上的表 (merge_table) 中有三行,各有唯一的 date 值:

  1. 1010521: 用新的延误值更新这些行(黄色)来 update flights 表
  2. 1010710: 这是重复的行(蓝色)
  3. 1010822: 这是要插入的新行(绿色)

在 Delta Lake 中,如下列代码片段所示,可以使用一条简单的 merge 语句来完成。

# flights 合并 merge_table
deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"),"flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()

# 在此时间范围内,西雅图与旧金山之间的航班有哪些
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

去重、更新和插入新行三个操作,使用一条语句高效地完成了。

delta-lake-0.4.0-merge-after-merge.png

查看表历史

如上所述,在每一个事务(删除、更新)之后,文件系统中产生了更多文件。这是因为每个事务都有不同的Delta Lake 表版本,我们可以从下面的 DeltaTable.history() 方法中查看。

deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      2|2019-09-29 15:41:22|  null|    null|   UPDATE|[predicate -> (or...|null|    null|     null|          1|          null|        false|
|      1|2019-09-29 15:40:45|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-09-29 15:40:14|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|       null|          null|        false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

注意,你也可以使用 SQL 来实现:

spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()

可以看到,三行展现了不同的表版本,对应各自的表操作(创建表、删除表、更新表)。(下面给出缩略版以便阅读)

delta-lake-0.4.0-abridged-table-history.png

用表历史回溯到过去

使用时间旅行(Time Travel),您可以以版本或者时间戳来查看 Delta Lake 表。可以参考 Delta Lake 文档 Read older versions of data using Time Travel 了解更多。为了查看历史数据,指定 versionTimestamp 选项;在以下代码片段中,我们指定 version 选项。

# 使用各自版本加载 DataFrame
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")

# 计算每个历史版本中,西雅图出发到旧金山的航班数
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

# 把值打印出来
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

## 输出
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

不论是企业治理、风险管理和合规 (GRC),还是回滚错误,Delta Lake 都包含了元数据(例如记录了一个产生删除的操作),以及数据(例如实际被删的行)。不过,因为合规或者数据大小原因删除数据文件,应该怎么做呢?

使用 vacuum 清理旧的表版本

Delta Lake 的 vacuum 方法,可以默认删除7天以前的所有行(与文件),参考:Delta Lake Vacuum

如果您查看文件系统,可以看到表的11个文件。

/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

为了删除当前快照之外的所有文件,您需要给 vaccum 方法指定一个很小的值(默认的保留时长是7天)。

 # 删除所有0小时以上的文件
deltaTable.vacuum(0) 

注意:您可以使用SQL完成相同任务:

# 删除所有0小时以上的文件

spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)

一旦 vacuum 完成,您再查看文件系统,将看到文件变少,因为历史数据已被删除。

/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

注意,执行vacuum 之后,不能对早于预留时间的版本进行时间旅行。

下一步

为了尝试 Delta Lake,您可以在您的 Apache Spark 2.4.3 (或更高版本) 实例尝试以上代码片段。通过 Delta Lake,您的数据湖将更加可靠(不论是创建新的 Delta Lake 或从已有的数据湖迁移)。想学习更多,请查看 https://delta.io/ 并加入 SlackGoogle Group 上的 Delta Lake 社区。您可以在 github milestones 中跟踪所有后续发布和计划中的特性。

致谢

我们要感谢以下为 Delta Lake 0.4.0 提供更新、文档改动和contributions的贡献者:Andreas Neumann, Burak Yavuz, Jose Torres, Jules Damji, Jungtaek Lim, Liwen Sun, Michael Armbrust, Mukul Murthy, Pranav Anand, Rahul Mahadev, Shixiong Zhu, Tathagata Das, Terry Kim, Wenchen Fan, Wesley Hoffman, Yishuang Lu, Yucai Yu, lys0716

原文地址:https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html


相关文章推荐阅读:

深入剖析 Delta Lake:详解事务日志

【译】数据湖正在成为新的数据仓库

从数砖开源 Delta Lake 说起

相关直播链接:

云上大数据的一种高性能数据湖存储方案
钉钉群直播【Delta Lake:一种新型的数据湖方案】


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
二维码.JPG

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
Java API 数据库
构建RESTful API已经成为现代Web开发的标准做法之一。Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐。
【10月更文挑战第11天】本文介绍如何使用Spring Boot构建在线图书管理系统的RESTful API。通过创建Spring Boot项目,定义`Book`实体类、`BookRepository`接口和`BookService`服务类,最后实现`BookController`控制器来处理HTTP请求,展示了从基础环境搭建到API测试的完整过程。
42 4
|
11天前
|
JSON 安全 API
如何使用Python开发API接口?
在现代软件开发中,API(应用程序编程接口)用于不同软件组件之间的通信和数据交换,实现系统互操作性。Python因其简单易用和强大功能,成为开发API的热门选择。本文详细介绍了Python开发API的基础知识、优势、实现方式(如Flask和Django框架)、实战示例及注意事项,帮助读者掌握高效、安全的API开发技巧。
37 3
如何使用Python开发API接口?
|
3天前
|
JSON API 数据格式
如何使用Python开发1688商品详情API接口?
本文介绍了如何使用Python开发1688商品详情API接口,获取商品的标题、价格、销量和评价等详细信息。主要内容包括注册1688开放平台账号、安装必要Python模块、了解API接口、生成签名、编写Python代码、解析返回数据以及错误处理和日志记录。通过这些步骤,开发者可以轻松地集成1688商品数据到自己的应用中。
18 1
|
11天前
|
前端开发 API 开发者
Python Web开发者必看!AJAX、Fetch API实战技巧,让前后端交互如丝般顺滑!
在Web开发中,前后端的高效交互是提升用户体验的关键。本文通过一个基于Flask框架的博客系统实战案例,详细介绍了如何使用AJAX和Fetch API实现不刷新页面查看评论的功能。从后端路由设置到前端请求处理,全面展示了这两种技术的应用技巧,帮助Python Web开发者提升项目质量和开发效率。
26 1
|
17天前
|
JavaScript 前端开发 API
Vue 3新特性详解:Composition API的威力
【10月更文挑战第25天】Vue 3 引入的 Composition API 是一组用于组织和复用组件逻辑的新 API。相比 Options API,它提供了更灵活的结构,便于逻辑复用和代码组织,特别适合复杂组件。本文将探讨 Composition API 的优势,并通过示例代码展示其基本用法,帮助开发者更好地理解和应用这一强大工具。
22 1
|
18天前
|
JSON API 数据格式
如何使用Python和Flask构建一个简单的RESTful API。Flask是一个轻量级的Web框架
本文介绍了如何使用Python和Flask构建一个简单的RESTful API。Flask是一个轻量级的Web框架,适合小型项目和微服务。文章从环境准备、创建基本Flask应用、定义资源和路由、请求和响应处理、错误处理等方面进行了详细说明,并提供了示例代码。通过这些步骤,读者可以快速上手构建自己的RESTful API。
25 2
|
7天前
|
安全 API 网络架构
Python中哪个框架最适合做API?
本文介绍了Python生态系统中几个流行的API框架,包括Flask、FastAPI、Django Rest Framework(DRF)、Falcon和Tornado。每个框架都有其独特的优势和适用场景。Flask轻量灵活,适合小型项目;FastAPI高性能且自动生成文档,适合需要高吞吐量的API;DRF功能强大,适合复杂应用;Falcon高性能低延迟,适合快速API开发;Tornado异步非阻塞,适合高并发场景。文章通过示例代码和优缺点分析,帮助开发者根据项目需求选择合适的框架。
24 0
|
30天前
|
JSON API 数据格式
使用Python和Flask构建简单的RESTful API
【10月更文挑战第12天】使用Python和Flask构建简单的RESTful API
42 1
|
1月前
|
缓存 JavaScript 前端开发
深入理解 Vue 3 的 Composition API 与新特性
本文详细探讨了 Vue 3 中的 Composition API,包括 setup 函数的使用、响应式数据管理(ref、reactive、toRefs 和 toRef)、侦听器(watch 和 watchEffect)以及计算属性(computed)。我们还介绍了自定义 Hooks 的创建与使用,分析了 Vue 2 与 Vue 3 在响应式系统上的重要区别,并概述了组件生命周期钩子、Fragments、Teleport 和 Suspense 等新特性。通过这些内容,读者将能更深入地理解 Vue 3 的设计理念及其在构建现代前端应用中的优势。
31 0
深入理解 Vue 3 的 Composition API 与新特性
|
1月前
|
数据采集 人工智能 自然语言处理
Python实时查询股票API的FinanceAgent框架构建股票(美股/A股/港股)AI Agent
金融领域Finance AI Agents方面的工作,发现很多行业需求和用户输入的 query都是和查询股价/行情/指数/财报汇总/金融理财建议相关。如果需要准确的 金融实时数据就不能只依赖LLM 来生成了。常规的方案包括 RAG (包括调用API )再把对应数据和prompt 一起拼接送给大模型来做文本生成。稳定的一些商业机构的金融数据API基本都是收费的,如果是以科研和demo性质有一些开放爬虫API可以使用。这里主要介绍一下 FinanceAgent,github地址 https://github.com/AI-Hub-Admin/FinanceAgent