编译:陈强,花名无咎,阿里巴巴技术专家,目前专注于EMR产品的管控与数据治理的研发工作。
我们激动地宣布 Delta Lake 0.4.0 发布,本次发布包含操纵与管理 Delta Lake 表的 Python API。关键特性包括:
- Python APIs for DML and utility operations (#89) - 现在,您可以使用 Python API 更新、删除、合并 Delta Lake 表,并对表执行实用操作(即:vacuum,history)。这个特性对于使用Python进行复杂工作非常有用,例如:渐变维度(SCD)操作,合并重复变化数据,流式查询中执行upserts。查看文档了解更多详情。
- Convert-to-Delta (#78) – 您可以就地转换 Parquet 表到 Delta Lake 表,而无需重写任何数据。这个特性对于转换非常大的 Parquet 表非常有用,如果重写为 Delta 表,将产生巨大开销。并且,这个过程是可逆的 —— 您可以转换 Parquet 为 Delta Lake 表,执行操作(例如删除和合并),然后轻松转换回 Parquet 表。查看文档了解更多详情。
- SQL for utility operations – 现在,您可以使用 SQL 来执行实用操作 vacuum 和 history。关于如何配置 Spark 以执行这些 Delta 特定的 SQL 命令,查看文档以了解更多详情。
要了解更多信息,请查看 Delta Lake 0.4.0 发布说明,以及 Delta Lake Documentation > Table Deletes, Updates, and Merges。
在本文中,我们将基于 Apache Spark™ 2.4.3,演示一个准时航班情况业务场景中,如何使用全新的 Delta Lake 0.4.0 Python API。我们将展示如何 upsert 与删除数据,用时间旅行 (time travel) 查询数据的旧版本,以及用 vacuum 清理旧版本。
如何上手Delta Lake
使用--option
选项使用 Delta Lake 包。在本文的例子中,我们也会演示在 Spark 中执行文件的VACUUM操作,以及执行 Delta Lake SQL 命令。为了完成这个简短的演示,我们要做以下设置:
-
spark.databricks.delta.retentionDurationCheck.enabled=false
允许对默认保留时长(7天)之内的文件执行 VACUUM 。注意,只有 SQL 命令 VACUUM 才需要这个配置。 -
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
允许在 Apache Spark 中执行 Delta Lake SQL 命令;Python or Scala API 调用不需要这个配置。
# 使用 Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
加载并保存 Delta Lake 数据
业务场景使用“航班准时延误情况数据集”,由美国交通运输局的 航班出发统计 生成。使用后者的案例有 2014 Flight Departure Performance via d3.js Crossfilter 和 On-Time Flight Performance with GraphFrames for Apache Spark™。这份数据集可以从这个github地址下载到您本地。在 pyspark 中开始读取数据集。
# 路径变量
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"
# 读取航班延误数据
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)
接下来,让我们将出发延误数据集保存到 Delta Lake 表。通过将表存储到 Delta Lake,我们可以利用这些特性:ACID 事务,统一批流处理,以及时间旅行 (time travel) 。
# 将航班延误数据保存为 Delta Lake 格式
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")
注意,此做法与一般的保存Parquet数据类似,不过您现在要指定format("delta")而不是format("parquet")。如果您查看下层的文件系统,可以发现 Delta Lake 表 departureDelays 创建了四个文件。
/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
注意:_delta_log 是包含 Delta Lake 事务日志的文件夹。您可以在这个文档 Diving Into Delta Lake: Unpacking The Transaction Log 查看更多信息。
现在,让我们重新加载数据,不过现在我们的 DataFrame 将由 Delta Lake 支持。
# 将航班延误数据加载为 Delta Lake 格式
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")
# 创建临时视图
delays_delta.createOrReplaceTempView("delays_delta")
# 西雅图与旧金山之间的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
最后,让我们查明从西雅图到旧金山的航班次数。在本数据集中有1698次。
就地转换到 Delta Lake
如果您已有 Parquet 表,您可以将表就地转换为 Delta Lake 表,因此不需要重写表。您可以运行以下命令来转换:
from delta.tables import *
# 转换位于路径 /path/to/table 下的 Parquet 无分区表
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")
# 转换位于路径 /path/to/table 下的 Parquet 分区表,分区列是名为 part 的 integer 列
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")
想了解如何更多信息,包括如何在 Scala 和 SQL 中转换,参考 Convert to Delta Lake
删除航班数据
从传统数据湖表中删除数据,您需要:
- select所有数据,排除您要删的那些行
- 基于上述查询创建一个新表
- 删除原表
- 新表重命名为原表,以满足下游依赖
在 Delta Lake 中,我们可以简单运行一个 DELETE 语句来完成删除,而不需要以上的步骤。为了演示,我们来删除所有提前和准时的航班(即 dalay < 0)。
from delta.tables import *
from pyspark.sql.functions import *
# 访问 Delta Lake 表
deltaTable = DeltaTable.forPath(spark, pathToEventsTable
)
# 删除所有准时和提前的航班
deltaTable.delete("delay < 0")
# 西雅图出发到旧金山的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
我们删除所有准时和提前的航班之后,在查询中可以看到,西雅图出发到旧金山的延误航班有837次。如果您查看文件系统,可以注意到,删除数据之后文件反而变多了。
/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
在传统的数据湖中,删除是以重写被删除数据之外的整张表来实现。在 Delta Lake 中,删除操作则是选择性地写入文件的新版本,其中包括被删除的数据,而原先的文件只被置为删除。这是因为 Delta Lake 使用了多版本并发控制 (MVCC) 来完成表的原子操作:例如,当一名用户正在删除数据,另一名用户可能在查询表的先前版本。这种多版本模型让我们可以进行时间旅行(time travel),查询到先前的版本。稍后我们将看到这个例子。
更新航班数据
在传统数据湖中更新表,您需要:
- select出所有数据,排除您要更新的那些行
- 修改需要更新/变化的行
- 将这两张表合并为一张新表
- 删除原表
- 新表重命名为原表,以满足下游依赖
在 Delta Lake 中,我们可以简单运行一个 UPDATE 语句来完成删除,而不需要以上的步骤。为了演示,我们来更新所有底特律出发到西雅图的航班。
# 所有底特律出发到西雅图的航班,出发地修改为西雅图
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } )
# 西雅图出发到旧金山的航班有几次
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
把底特律航班标记为西雅图航班之后,现在我们有986次从西雅图出发到旧金山的航班了。如果您在文件系统中列出 departureDelays 文件夹 ($../departureDelays/ls -l),会发现现在有11个文件(不同于删除文件后的8个,以及创建表之后的4个)。
合并航班数据
有个常见的场景,在数据湖中持续地给表追加数据。这常常导致重复数据(您不想再次插入表的行)—— 新行需要插入,有些行需要更新。在 Delta Lake 中,这些操作可以使用合并操作实现(类似 SQL 中的 MERGE 语句)。
我们以一个简单的数据集开始,您需要在其中使用下列查询来更新、插入或去重。
# 在此时间范围内,西雅图与旧金山之间的航班有哪些
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
查询结果如下表。注意,本文给数据打上了颜色,以清楚地区分哪些行是去重的(蓝色),更新的(黄色),以及插入的(绿色)。
接下来,让我们编写以下代码片段,生成自己的 merge_table,包含将要插入、更新或去重的数据。
items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()
在以上的表 (merge_table) 中有三行,各有唯一的 date 值:
- 1010521: 用新的延误值更新这些行(黄色)来 update flights 表
- 1010710: 这是重复的行(蓝色)
- 1010822: 这是要插入的新行(绿色)
在 Delta Lake 中,如下列代码片段所示,可以使用一条简单的 merge 语句来完成。
# flights 合并 merge_table
deltaTable.alias("flights") \
.merge(merge_table.alias("updates"),"flights.date = updates.date") \
.whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
.whenNotMatchedInsertAll() \
.execute()
# 在此时间范围内,西雅图与旧金山之间的航班有哪些
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
去重、更新和插入新行三个操作,使用一条语句高效地完成了。
查看表历史
如上所述,在每一个事务(删除、更新)之后,文件系统中产生了更多文件。这是因为每个事务都有不同的Delta Lake 表版本,我们可以从下面的 DeltaTable.history() 方法中查看。
deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
| 2|2019-09-29 15:41:22| null| null| UPDATE|[predicate -> (or...|null| null| null| 1| null| false|
| 1|2019-09-29 15:40:45| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| null| false|
| 0|2019-09-29 15:40:14| null| null| WRITE|[mode -> Overwrit...|null| null| null| null| null| false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
注意,你也可以使用 SQL 来实现:
spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()
可以看到,三行展现了不同的表版本,对应各自的表操作(创建表、删除表、更新表)。(下面给出缩略版以便阅读)
用表历史回溯到过去
使用时间旅行(Time Travel),您可以以版本或者时间戳来查看 Delta Lake 表。可以参考 Delta Lake 文档 Read older versions of data using Time Travel 了解更多。为了查看历史数据,指定 version
或 Timestamp
选项;在以下代码片段中,我们指定 version
选项。
# 使用各自版本加载 DataFrame
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")
# 计算每个历史版本中,西雅图出发到旧金山的航班数
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()
# 把值打印出来
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))
## 输出
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
不论是企业治理、风险管理和合规 (GRC),还是回滚错误,Delta Lake 都包含了元数据(例如记录了一个产生删除的操作),以及数据(例如实际被删的行)。不过,因为合规或者数据大小原因删除数据文件,应该怎么做呢?
使用 vacuum 清理旧的表版本
Delta Lake 的 vacuum 方法,可以默认删除7天以前的所有行(与文件),参考:Delta Lake Vacuum。
如果您查看文件系统,可以看到表的11个文件。
/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
为了删除当前快照之外的所有文件,您需要给 vaccum 方法指定一个很小的值(默认的保留时长是7天)。
# 删除所有0小时以上的文件
deltaTable.vacuum(0)
注意:您可以使用SQL完成相同任务:
# 删除所有0小时以上的文件
spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)
一旦 vacuum 完成,您再查看文件系统,将看到文件变少,因为历史数据已被删除。
/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
注意,执行vacuum 之后,不能对早于预留时间的版本进行时间旅行。
下一步
为了尝试 Delta Lake,您可以在您的 Apache Spark 2.4.3 (或更高版本) 实例尝试以上代码片段。通过 Delta Lake,您的数据湖将更加可靠(不论是创建新的 Delta Lake 或从已有的数据湖迁移)。想学习更多,请查看 https://delta.io/ 并加入 Slack 和 Google Group 上的 Delta Lake 社区。您可以在 github milestones 中跟踪所有后续发布和计划中的特性。
致谢
我们要感谢以下为 Delta Lake 0.4.0 提供更新、文档改动和contributions的贡献者:Andreas Neumann, Burak Yavuz, Jose Torres, Jules Damji, Jungtaek Lim, Liwen Sun, Michael Armbrust, Mukul Murthy, Pranav Anand, Rahul Mahadev, Shixiong Zhu, Tathagata Das, Terry Kim, Wenchen Fan, Wesley Hoffman, Yishuang Lu, Yucai Yu, lys0716
相关文章推荐阅读:
相关直播链接:
云上大数据的一种高性能数据湖存储方案
钉钉群直播【Delta Lake:一种新型的数据湖方案】
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!