【译】Delta Lake 0.4.0 新特性演示:使用 Python API 就地转换与处理 Delta Lake 表

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文以案例演示在最新的 Delta Lake 0.4.0 中,如何转换 Delta Lake 表,使用全新的 Python API 执行 upsert 与删除数据,用时间旅行 (time travel) 查询数据的旧版本,以及 vacuum 语句清理旧版本。

编译:陈强,花名无咎,阿里巴巴技术专家,目前专注于EMR产品的管控与数据治理的研发工作。


我们激动地宣布 Delta Lake 0.4.0 发布,本次发布包含操纵与管理 Delta Lake 表的 Python API。关键特性包括:

  • Python APIs for DML and utility operations (#89) - 现在,您可以使用 Python API 更新、删除、合并 Delta Lake 表,并对表执行实用操作(即:vacuum,history)。这个特性对于使用Python进行复杂工作非常有用,例如:渐变维度(SCD)操作,合并重复变化数据流式查询中执行upserts。查看文档了解更多详情。
  • Convert-to-Delta (#78) – 您可以就地转换 Parquet 表到 Delta Lake 表,而无需重写任何数据。这个特性对于转换非常大的 Parquet 表非常有用,如果重写为 Delta 表,将产生巨大开销。并且,这个过程是可逆的 —— 您可以转换 Parquet 为 Delta Lake 表,执行操作(例如删除和合并),然后轻松转换回 Parquet 表。查看文档了解更多详情。
  • SQL for utility operations – 现在,您可以使用 SQL 来执行实用操作 vacuum 和 history。关于如何配置 Spark 以执行这些 Delta 特定的 SQL 命令,查看文档以了解更多详情。

要了解更多信息,请查看 Delta Lake 0.4.0 发布说明,以及 Delta Lake Documentation > Table Deletes, Updates, and Merges

delta-lake-0.4.0-merge-python.gif

在本文中,我们将基于 Apache Spark™ 2.4.3,演示一个准时航班情况业务场景中,如何使用全新的 Delta Lake 0.4.0 Python API。我们将展示如何 upsert 与删除数据,用时间旅行 (time travel) 查询数据的旧版本,以及用 vacuum 清理旧版本。

如何上手Delta Lake

使用--option选项使用 Delta Lake 包。在本文的例子中,我们也会演示在 Spark 中执行文件的VACUUM操作,以及执行 Delta Lake SQL 命令。为了完成这个简短的演示,我们要做以下设置:

  • spark.databricks.delta.retentionDurationCheck.enabled=false 允许对默认保留时长(7天)之内的文件执行 VACUUM 。注意,只有 SQL 命令 VACUUM 才需要这个配置。
  • spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension 允许在 Apache Spark 中执行 Delta Lake SQL 命令;Python or Scala API 调用不需要这个配置。
# 使用 Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

加载并保存 Delta Lake 数据

业务场景使用“航班准时延误情况数据集”,由美国交通运输局的 航班出发统计 生成。使用后者的案例有 2014 Flight Departure Performance via d3.js Crossfilter和 On-Time Flight Performance with GraphFrames for Apache Spark™。这份数据集可以从这个github地址下载到您本地。在 pyspark 中开始读取数据集。

# 路径变量
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"

# 读取航班延误数据
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)

接下来,让我们将出发延误数据集保存到 Delta Lake 表。通过将表存储到 Delta Lake,我们可以利用这些特性:ACID 事务,统一批流处理,以及时间旅行 (time travel) 。

# 将航班延误数据保存为 Delta Lake 格式
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")

注意,此做法与一般的保存Parquet数据类似,不过您现在要指定format("delta")而不是format("parquet")。如果您查看下层的文件系统,可以发现 Delta Lake 表 departureDelays 创建了四个文件。

/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet

注意:_delta_log 是包含 Delta Lake 事务日志的文件夹。您可以在这个文档 Diving Into Delta Lake: Unpacking The Transaction Log 查看更多信息。

现在,让我们重新加载数据,不过现在我们的 DataFrame 将由 Delta Lake 支持。

# 将航班延误数据加载为 Delta Lake 格式
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")

# 创建临时视图
delays_delta.createOrReplaceTempView("delays_delta")

# 西雅图与旧金山之间的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

delta-lake-0.4.0-initial.png

最后,让我们查明从西雅图到旧金山的航班次数。在本数据集中有1698次。

就地转换到 Delta Lake

如果您已有 Parquet 表,您可以将表就地转换为 Delta Lake 表,因此不需要重写表。您可以运行以下命令来转换:

from delta.tables import *

# 转换位于路径 /path/to/table 下的 Parquet 无分区表
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# 转换位于路径 /path/to/table 下的 Parquet 分区表,分区列是名为 part 的 integer 列
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

想了解如何更多信息,包括如何在 Scala 和 SQL 中转换,参考 Convert to Delta Lake

删除航班数据

从传统数据湖表中删除数据,您需要:

  1. select所有数据,排除您要删的那些行
  2. 基于上述查询创建一个新表
  3. 删除原表
  4. 新表重命名为原表,以满足下游依赖

在 Delta Lake 中,我们可以简单运行一个 DELETE 语句来完成删除,而不需要以上的步骤。为了演示,我们来删除所有提前和准时的航班(即 dalay < 0)。

from delta.tables import *
from pyspark.sql.functions import *

# 访问 Delta Lake 表
deltaTable = DeltaTable.forPath(spark, pathToEventsTable
)
# 删除所有准时和提前的航班
deltaTable.delete("delay < 0") 

# 西雅图出发到旧金山的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

delta-lake-0.4.0-after-delete.png

我们删除所有准时和提前的航班之后,在查询中可以看到,西雅图出发到旧金山的延误航班有837次。如果您查看文件系统,可以注意到,删除数据之后文件反而变多了。

/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet

在传统的数据湖中,删除是以重写被删除数据之外的整张表来实现。在 Delta Lake 中,删除操作则是选择性地写入文件的新版本,其中包括被删除的数据,而原先的文件只被置为删除。这是因为 Delta Lake 使用了多版本并发控制 (MVCC) 来完成表的原子操作:例如,当一名用户正在删除数据,另一名用户可能在查询表的先前版本。这种多版本模型让我们可以进行时间旅行(time travel),查询到先前的版本。稍后我们将看到这个例子。

更新航班数据

在传统数据湖中更新表,您需要:

  1. select出所有数据,排除您要更新的那些行
  2. 修改需要更新/变化的行
  3. 将这两张表合并为一张新表
  4. 删除原表
  5. 新表重命名为原表,以满足下游依赖

在 Delta Lake 中,我们可以简单运行一个 UPDATE 语句来完成删除,而不需要以上的步骤。为了演示,我们来更新所有底特律出发到西雅图的航班。

# 所有底特律出发到西雅图的航班,出发地修改为西雅图
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) 

# 西雅图出发到旧金山的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

delta-lake-0.4.0-after-update.png

把底特律航班标记为西雅图航班之后,现在我们有986次从西雅图出发到旧金山的航班了。如果您在文件系统中列出 departureDelays 文件夹 ($../departureDelays/ls -l),会发现现在有11个文件(不同于删除文件后的8个,以及创建表之后的4个)。

合并航班数据

有个常见的场景,在数据湖中持续地给表追加数据。这常常导致重复数据(您不想再次插入表的行)—— 新行需要插入,有些行需要更新。在 Delta Lake 中,这些操作可以使用合并操作实现(类似 SQL 中的 MERGE 语句)。

我们以一个简单的数据集开始,您需要在其中使用下列查询来更新、插入或去重。

# 在此时间范围内,西雅图与旧金山之间的航班有哪些
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

查询结果如下表。注意,本文给数据打上了颜色,以清楚地区分哪些行是去重的(蓝色),更新的(黄色),以及插入的(绿色)。

delta-lake-0.4.0-merge-source-table.png

接下来,让我们编写以下代码片段,生成自己的 merge_table,包含将要插入、更新或去重的数据。

items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()

delta-lake-0.4.0-merge-merge-table.png

在以上的表 (merge_table) 中有三行,各有唯一的 date 值:

  1. 1010521: 用新的延误值更新这些行(黄色)来 update flights 表
  2. 1010710: 这是重复的行(蓝色)
  3. 1010822: 这是要插入的新行(绿色)

在 Delta Lake 中,如下列代码片段所示,可以使用一条简单的 merge 语句来完成。

# flights 合并 merge_table
deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"),"flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()

# 在此时间范围内,西雅图与旧金山之间的航班有哪些
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

去重、更新和插入新行三个操作,使用一条语句高效地完成了。

delta-lake-0.4.0-merge-after-merge.png

查看表历史

如上所述,在每一个事务(删除、更新)之后,文件系统中产生了更多文件。这是因为每个事务都有不同的Delta Lake 表版本,我们可以从下面的 DeltaTable.history() 方法中查看。

deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      2|2019-09-29 15:41:22|  null|    null|   UPDATE|[predicate -> (or...|null|    null|     null|          1|          null|        false|
|      1|2019-09-29 15:40:45|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-09-29 15:40:14|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|       null|          null|        false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

注意,你也可以使用 SQL 来实现:

spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()

可以看到,三行展现了不同的表版本,对应各自的表操作(创建表、删除表、更新表)。(下面给出缩略版以便阅读)

delta-lake-0.4.0-abridged-table-history.png

用表历史回溯到过去

使用时间旅行(Time Travel),您可以以版本或者时间戳来查看 Delta Lake 表。可以参考 Delta Lake 文档 Read older versions of data using Time Travel 了解更多。为了查看历史数据,指定 versionTimestamp 选项;在以下代码片段中,我们指定 version 选项。

# 使用各自版本加载 DataFrame
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")

# 计算每个历史版本中,西雅图出发到旧金山的航班数
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

# 把值打印出来
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

## 输出
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

不论是企业治理、风险管理和合规 (GRC),还是回滚错误,Delta Lake 都包含了元数据(例如记录了一个产生删除的操作),以及数据(例如实际被删的行)。不过,因为合规或者数据大小原因删除数据文件,应该怎么做呢?

使用 vacuum 清理旧的表版本

Delta Lake 的 vacuum 方法,可以默认删除7天以前的所有行(与文件),参考:Delta Lake Vacuum

如果您查看文件系统,可以看到表的11个文件。

/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

为了删除当前快照之外的所有文件,您需要给 vaccum 方法指定一个很小的值(默认的保留时长是7天)。

 # 删除所有0小时以上的文件
deltaTable.vacuum(0)

注意:您可以使用SQL完成相同任务:

# 删除所有0小时以上的文件

spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)

一旦 vacuum 完成,您再查看文件系统,将看到文件变少,因为历史数据已被删除。

/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

注意,执行vacuum 之后,不能对早于预留时间的版本进行时间旅行。

下一步

为了尝试 Delta Lake,您可以在您的 Apache Spark 2.4.3 (或更高版本) 实例尝试以上代码片段。通过 Delta Lake,您的数据湖将更加可靠(不论是创建新的 Delta Lake 或从已有的数据湖迁移)。想学习更多,请查看 https://delta.io/ 并加入 SlackGoogle Group 上的 Delta Lake 社区。您可以在 github milestones 中跟踪所有后续发布和计划中的特性。

致谢

我们要感谢以下为 Delta Lake 0.4.0 提供更新、文档改动和contributions的贡献者:Andreas Neumann, Burak Yavuz, Jose Torres, Jules Damji, Jungtaek Lim, Liwen Sun, Michael Armbrust, Mukul Murthy, Pranav Anand, Rahul Mahadev, Shixiong Zhu, Tathagata Das, Terry Kim, Wenchen Fan, Wesley Hoffman, Yishuang Lu, Yucai Yu, lys0716

原文地址:https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html


相关文章推荐阅读:

深入剖析 Delta Lake:详解事务日志

【译】数据湖正在成为新的数据仓库

从数砖开源 Delta Lake 说起

相关直播链接:

云上大数据的一种高性能数据湖存储方案
钉钉群直播【Delta Lake:一种新型的数据湖方案】


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
二维码.JPG

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
9天前
|
数据采集 JSON API
如何利用Python爬虫淘宝商品详情高级版(item_get_pro)API接口及返回值解析说明
本文介绍了如何利用Python爬虫技术调用淘宝商品详情高级版API接口(item_get_pro),获取商品的详细信息,包括标题、价格、销量等。文章涵盖了环境准备、API权限申请、请求构建和返回值解析等内容,强调了数据获取的合规性和安全性。
|
1月前
|
API 开发者 Python
如何用Python调用孔夫子API?
要使用Python调用孔夫子旧书网API,需先在开发者平台注册并获取API密钥与调用密钥。示例代码展示了如何利用requests库发送请求,获取并解析搜索结果。使用时需替换密钥,并按API文档调整URL和参数。注意遵守API使用规则及法律法规。
如何用Python调用孔夫子API?
|
17天前
|
存储 API 数据库
使用Python开发获取商品销量详情API接口
本文介绍了使用Python开发获取商品销量详情的API接口方法,涵盖API接口概述、技术选型(Flask与FastAPI)、环境准备、API接口创建及调用淘宝开放平台API等内容。通过示例代码,详细说明了如何构建和调用API,以及开发过程中需要注意的事项,如数据库连接、API权限、错误处理、安全性和性能优化等。
68 5
|
23天前
|
API Python
【Azure Developer】分享一段Python代码调用Graph API创建用户的示例
分享一段Python代码调用Graph API创建用户的示例
45 11
|
24天前
|
JSON 安全 API
Python调用API接口的方法
Python调用API接口的方法
111 5
|
25天前
|
存储 Java 数据挖掘
Java 8 新特性之 Stream API:函数式编程风格的数据处理范式
Java 8 引入的 Stream API 提供了一种新的数据处理方式,支持函数式编程风格,能够高效、简洁地处理集合数据,实现过滤、映射、聚合等操作。
41 6
|
1月前
|
JSON 安全 API
如何使用Python开发API接口?
在现代软件开发中,API(应用程序编程接口)用于不同软件组件之间的通信和数据交换,实现系统互操作性。Python因其简单易用和强大功能,成为开发API的热门选择。本文详细介绍了Python开发API的基础知识、优势、实现方式(如Flask和Django框架)、实战示例及注意事项,帮助读者掌握高效、安全的API开发技巧。
194 3
如何使用Python开发API接口?
|
24天前
|
API Python
利用python淘宝/天猫获得淘宝app商品详情原数据 API
要使用Python获取淘宝/天猫商品详情原数据,需先注册开放平台账号并实名认证,创建应用获取API权限。随后,根据API文档构建请求URL和参数,使用requests库发送请求,处理返回的商品详情数据。注意遵守平台使用规则。
|
27天前
|
供应链 API 开发者
探索Python与1688商品详情API接口的协同效应
在数字化时代,1688作为中国领先的B2B平台,其商品详情API接口为市场分析、库存管理和销售策略提供了重要数据支持。本文介绍如何使用Python调用该API,包括前期准备、技术实现、数据解析及错误处理等内容,助力企业和开发者挖掘数据价值,提升商业智能水平。
|
1月前
|
JSON 前端开发 API
使用Python和Flask构建简易Web API
使用Python和Flask构建简易Web API