深入理解 Apache Spark Delta Lake 的事务日志

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 深入理解 Apache Spark Delta Lake 的事务日志 事务日志是理解 Delta Lake 的关键,因为它是贯穿许多最重要功能的通用模块,包括 ACID 事务、可扩展的元数据处理、时间旅行(time travel)等。

深入理解 Apache Spark Delta Lake 的事务日志

事务日志是理解 Delta Lake 的关键,因为它是贯穿许多最重要功能的通用模块,包括 ACID 事务、可扩展的元数据处理、时间旅行(time travel)等。本文我们将探讨事务日志(Transaction Log)是什么,它在文件级别是如何工作的,以及它如何为多个并发读取和写入问题提供优雅的解决方案。

事务日志(Transaction Log)是什么

Delta Lake 事务日志(也称为 DeltaLog)是 Delta Lake 表上执行每次事务的有序记录。具体形式如下:

yangping.wyp@yangping.wyp:/tmp/delta-table/_delta_log|
⇒  ll
total 280
-rw-r--r--  1 yangping.wyp  wheel   1.5K  8 21 14:19 00000000000000000000.json
-rw-r--r--  1 yangping.wyp  wheel   1.1K  8 21 14:31 00000000000000000001.json
-rw-r--r--  1 yangping.wyp  wheel   791B  8 21 14:31 00000000000000000002.json
-rw-r--r--  1 yangping.wyp  wheel   3.9K  8 21 14:31 00000000000000000003.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:38 00000000000000000004.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:39 00000000000000000005.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000006.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000007.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000008.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000009.json
-rw-r--r--  1 yangping.wyp  wheel    15K  8 21 19:40 00000000000000000010.checkpoint.parquet
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000010.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000011.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000012.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000013.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000014.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000015.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000016.json

事务日志主要用途是什么?

单一事实来源

Delta Lake 构建于 Apache Spark™ 之上,允许多个写和读操作同时对给定表进行操作。为了始终向用户显示正确的数据视图,事务日志可作为单一事实来源(single source of truth) - 中央存储库,用于跟踪用户对表所做的所有更改。

当用户第一次读取 Delta Lake 表或在打开的表上运行一个新查询,该表自上次读取以来已被修改, Spark 会检查事务日志来查看已向表写入的新事务,然后使用这些新更改更新最终用户的表。这可确保用户表的版本始终与最新查询中的主记录同步,并且用户无法对表进行不同的,冲突的更改。

Delta Lake 上的原子性实现

原子性是 ACID 事务的四个属性之一,它可以保证在 Delta Lake 上执行的操作(如 INSERT 或 UPDATE )要么全部成功要么全部不成功。 如果没有此属性,硬件故障或软件错误很容易导致数据仅部分写入表中,从而导致数据混乱或损坏。

事务日志是 Delta Lake 能够提供原子性保证的机制。无论如何,如果它没有记录在事务日志中,它就不会发生。通过只记录完全执行的事务,并使用该记录作为唯一的真相来源,事务日志允许用户对其数据进行推理;并且即使数据在 PB 级别上,我们也可以对这些数据的准确性高枕无忧。

事务日志是如何工作的

将事务分解为原子提交

每当用户执行修改表的操作(例如插入、更新或删除)时,Delta Lake 将该操作分解为一系列由以下一个或多个操作组成的离散步骤:

  • Add file:添加一个数据文件;
  • Remove file:删除一个数据文件;
  • Update metadata:更新表的元数据(例如更改表的名称,模式或分区);
  • Set transaction:Structured Streaming 作业已经提交的具有给定 ID 的微批次记录;
  • Change protocol:通过将事务日志切换到最新的软件协议来启用新特性;
  • Commit info:包含有关提交的信息以及该操作是在何时何地进行的。

然后这些操作将按照有序的原子单位记录在事务日志中,称为提交。
例如,假设用户创建一个事务以向表中添加新列,并向其中添加更多数据。Delta Lake 会将该事务分解为多个部分,一旦事务完成,就将它们添加到事务日志中,如下所示:

  • Update metadata:更改模式以包含新列;
  • Add file:每个添加的新文件。

文件级别的事务日志

当用户创建 Delta Lake 表时,将在 _delta_log 子目录中自动创建该表的事务日志。 当他或她对该表进行更改时,这些更改将作为有序的原子提交记录在事务日志中。 每个提交都以 JSON 文件的形式写出,从 000000.json 开始。对表的其他更改按升序数字顺序生成后续 JSON 文件,所以下一次提交被写入到 000001.json 文件,下下次修改写入到 000002.json 文件,依此类推。

因此,如果我们通过从数据文件 1.parquet 和 2.parquet 向表中添加记录。该事务将自动添加到事务日志中,并以 000000.json 的形式保存到磁盘。然后,我们改变主意并决定删除这些文件并添加一个新文件(3.parquet)。 这些操作将记录为事务日志中的下一个提交,也就是 000001.json,如下所示。

尽管 1.parquet 和 2.parquet 不再是我们 Delta Lake 表的一部分,但它们的添加和删除仍记录在事务日志中,因为这些操作是在我们的表上执行的 - 尽管它们最终相互抵消了。 Delta Lake 仍然保留这样的原子提交,以确保在需要审计表或使用“时间旅行”来查看表在给定时间点的样子时,我们能够准确地做到这一点。

此外,即使我们从表中删除了基础数据文件,Spark 也不会立刻从磁盘中删除文件。用户可以使用 VACUUM 命令删除不再需要的文件。

使用检查点文件(Checkpoint Files)快速重新计算状态

一旦我们提交了10次事务日志,Delta Lake 就会在相同的 _delta_log 子目录中以 Parquet 格式保存一个检查点文件(如上面的 00000000000000000010.checkpoint.parquet 文件)。每 10 次提交 Delta Lake 会自动生成检查点文件,这个是通过参数 checkpointInterval 参数设置。

这些检查点文件在某个时间点保存表的整个状态 - 以原生的 Parquet 格式保存,Spark 可以快速轻松地读取。换句话说,它们为 Spark reader 提供了一种“快捷方式”来完全复制表的状态,从而允许 Spark 避免重新处理可能存在的数千个低效的小 JSON 文件。

为了提高速度,Spark可以运行一个 listFrom 操作来查看事务日志中的所有文件,快速跳转到最新的检查点文件,并且只处理自保存了最新的检查点文件以来提交的JSON。

为了演示这是如何工作的,假设我们已经创建了提交,并且事务日志已经记录到 000007.json。Spark 加快了提交的速度,并在内存中自动缓存了表的最新版本。与此同时,其他一些写入者(可能是您过于热心的队友)已经向表中写入了新数据,并事务日志已经记录到 0000012.json 了。

为了合并这些新事务并更新表的状态,Spark 将运行 listFrom 方法来查看版本7之后对表的新更改。

Spark可以直接跳到最近的检查点文件(上图中的 0000010.checkpoint.parquet 文件),而不需要处理所有中间 JSON 文件,因为这个检查点文件包含 commit #10 中表的整个状态。现在,Spark 只需执行 0000011.json 和 0000012.json 的增量处理即可获得表的当前状态。然后 Spark 将表的版本12的状态缓存到内存中。通过遵循此工作流程,Delta Lake 能够使用 Spark 以高效的方式始终更新表的状态。

处理多个并发的读取和写入

现在我们已经在高层次上了解了事务日志如何工作的,让我们来谈谈并发性。到目前为止,我们的示例主要涵盖了用户线性提交事务或至少没有冲突的情况。 但是当 Delta Lake 处理多个并发读写时会发生什么?
答案很简单,由于 Delta Lake 由 Apache Spark 提供支持,因此不仅可以让多个用户同时修改表 - 这是预期的。 为了处理这些情况,Delta Lake 采用了乐观的并发控制。

什么是乐观并发控制?

乐观并发控制是一种处理并发事务的方法,它假定不同用户对表所做的事务(更改)可以在不相互冲突的情况下完成。它的速度快得令人难以置信,因为当处理 PB 级的数据时,用户很可能同时处理数据的不同部分,从而允许他们同时完成不冲突的事务。

例如,假设你和我正在一起玩拼图游戏。只要我们都在做拼图的不同部分——比如你在角落里,我在边缘上——我们没有理由不能同时做更大拼图的那一部分,并且以两倍的速度完成拼图。只有当我们同时需要相同的部件时,才会产生冲突。这就是乐观并发控制。

相反,一些数据库系统使用悲观锁定的概念,这是假设最坏的情况——即使我们有10,000块拼图,在某个时候我们肯定需要相同的拼图——这导致了太多的冲突。为了解决这个问题,它的理由是,应该只允许一个人同时做拼图,并把其他人都锁在房间外面。这不是一个快速(或友好)解决难题的方法!

当然,即使使用乐观并发控制,有时用户也会尝试同时修改数据的相同部分。幸运的是,Delta Lake 有相应的协议处理它。

乐观地解决冲突

为了提供ACID事务,Delta Lake 有一个协议,用于确定提交应该如何排序(在数据库中称为 serializability),并确定在同时执行两个或多个提交时应该做什么。Delta Lake通过实现互斥(mutual exclusion)规则来处理这些情况,然后尝试乐观地解决任何冲突。该协议允许Delta Lake遵循ACID隔离原则,该原则确保多个并发写操作之后的表的结果状态与那些连续发生的写操作相同,并且是彼此隔离的。

一般来说,这个过程是这样进行的

  • 记录起始表的版本;
  • 记录读和写操作;
  • 尝试提交;
  • 如果有人已经提交了,检查一下你读到的内容是否有变化;
  • 重复上面的步骤。

为了了解这一切是如何实时进行的,让我们看一下下面的图表,看看 Delta Lake 在冲突突然出现时是如何管理冲突的。假设两个用户从同一个表中读取数据,然后每个用户都尝试向表中添加一些数据。

  • Delta Lake 记录在进行任何更改之前读取的表的起始表版本(版本0);
  • 用户1和2都试图同时向表添加一些数据。在这里,我们遇到了一个冲突,因为接下来只有一个提交可以被记录为 000001.json;
  • Delta Lake使用“互斥”概念处理这种冲突,这意味着只有一个用户能够成功提交 000001.json。用户1的提交被接受,而用户2的提交被拒绝;
  • Delta Lake 更倾向于乐观地处理这种冲突,而不是为用户2抛出错误。 它检查是否对表进行了任何新的提交,并悄悄地更新表以反映这些更改,然后在新更新的表上重试用户2的提交(不进行任何数据处理),最后成功提交 000002.json。

在绝大多数情况下,这种和解是悄无声息地、天衣无缝地、成功地进行的。但是,如果 Delta Lake 无法乐观地解决不可调和的问题(例如,如果用户1删除了用户2也删除的文件),那么惟一的选择就是抛出一个错误。
最后要注意的是,由于在 Delta Lake 表上进行的所有事务都直接存储到磁盘中,因此这个过程满足 ACID 持久性的特性,这意味着即使在系统发生故障时,它也会保持。

其他用户案例

时间旅行(Time Travel)

每个表都是事务日志中记录的所有提交的总和的结果—不多也不少。事务日志提供了一步一步的指导,详细描述了如何从表的原始状态转换到当前状态。

因此,我们可以通过从原始表开始重新创建表在任何时间点的状态,并且只处理在该点之前提交的数据。这种强大的功能被称为“时间旅行”,或数据版本控制,在任何情况下都是救星。

数据血统(Data Lineage)和调试

作为对 Delta Lake 表所做的每个更改的最终记录,事务日志为用户提供了可验证的数据血统,这对于治理、审计和合规性目的非常有用。它还可以用于跟踪一个意外更改或管道中的一个 bug 的起源,以追溯到导致该更改的确切操作。用户可以运行 DESCRIBE HISTORY 来查看所做更改的元数据。

总结

在本博客中,我们深入研究 Delta Lake 事务日志的工作原理。我们讨论了:

  • 事务日志是什么,它是如何构造的,以及提交如何作为文件存储在磁盘上;
  • 事务日志如何作为一个单一的事实来源,允许 Delta Lake 实现原子性原则;
  • Delta Lake 如何计算每个表的状态——包括它如何使用事务日志来跟踪最近的检查点,以及它如何解决“小文件”问题;
  • 通过使用 Apache Spark 的强大功能来大规模处理元数据;
  • 使用乐观并发控制允许多个并发读和写,即使在表发生更改时也是如此;
  • Delta Lake 如何使用互斥来确保正确地线性(serialized)提交,以及在发生冲突时如何默默地重试提交。

本文翻译自:https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html

写在最后

为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds

xxx

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
23天前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
31 1
|
27天前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
26天前
|
Ubuntu Linux 测试技术
在Linux中,已知 apache 服务的访问日志按天记录在服务器本地目录/app/logs 下,由于磁盘空间紧张现在要求只能保留最近7天的访问日志,请问如何解决?
在Linux中,已知 apache 服务的访问日志按天记录在服务器本地目录/app/logs 下,由于磁盘空间紧张现在要求只能保留最近7天的访问日志,请问如何解决?
|
24天前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
29天前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
34 0
|
1月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
143 0
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
110 1
Spark快速大数据分析PDF下载读书分享推荐
|
1月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
130 3
|
17天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
31 3
|
21天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
38 3

推荐镜像

更多