深入理解 Apache Spark Delta Lake 的事务日志

简介: 深入理解 Apache Spark Delta Lake 的事务日志 事务日志是理解 Delta Lake 的关键,因为它是贯穿许多最重要功能的通用模块,包括 ACID 事务、可扩展的元数据处理、时间旅行(time travel)等。

深入理解 Apache Spark Delta Lake 的事务日志

事务日志是理解 Delta Lake 的关键,因为它是贯穿许多最重要功能的通用模块,包括 ACID 事务、可扩展的元数据处理、时间旅行(time travel)等。本文我们将探讨事务日志(Transaction Log)是什么,它在文件级别是如何工作的,以及它如何为多个并发读取和写入问题提供优雅的解决方案。

事务日志(Transaction Log)是什么

Delta Lake 事务日志(也称为 DeltaLog)是 Delta Lake 表上执行每次事务的有序记录。具体形式如下:

yangping.wyp@yangping.wyp:/tmp/delta-table/_delta_log|
⇒  ll
total 280
-rw-r--r--  1 yangping.wyp  wheel   1.5K  8 21 14:19 00000000000000000000.json
-rw-r--r--  1 yangping.wyp  wheel   1.1K  8 21 14:31 00000000000000000001.json
-rw-r--r--  1 yangping.wyp  wheel   791B  8 21 14:31 00000000000000000002.json
-rw-r--r--  1 yangping.wyp  wheel   3.9K  8 21 14:31 00000000000000000003.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:38 00000000000000000004.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:39 00000000000000000005.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000006.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000007.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000008.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000009.json
-rw-r--r--  1 yangping.wyp  wheel    15K  8 21 19:40 00000000000000000010.checkpoint.parquet
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000010.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000011.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000012.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000013.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000014.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000015.json
-rw-r--r--  1 yangping.wyp  wheel   1.2K  8 21 19:40 00000000000000000016.json

事务日志主要用途是什么?

单一事实来源

Delta Lake 构建于 Apache Spark™ 之上,允许多个写和读操作同时对给定表进行操作。为了始终向用户显示正确的数据视图,事务日志可作为单一事实来源(single source of truth) - 中央存储库,用于跟踪用户对表所做的所有更改。

当用户第一次读取 Delta Lake 表或在打开的表上运行一个新查询,该表自上次读取以来已被修改, Spark 会检查事务日志来查看已向表写入的新事务,然后使用这些新更改更新最终用户的表。这可确保用户表的版本始终与最新查询中的主记录同步,并且用户无法对表进行不同的,冲突的更改。

Delta Lake 上的原子性实现

原子性是 ACID 事务的四个属性之一,它可以保证在 Delta Lake 上执行的操作(如 INSERT 或 UPDATE )要么全部成功要么全部不成功。 如果没有此属性,硬件故障或软件错误很容易导致数据仅部分写入表中,从而导致数据混乱或损坏。

事务日志是 Delta Lake 能够提供原子性保证的机制。无论如何,如果它没有记录在事务日志中,它就不会发生。通过只记录完全执行的事务,并使用该记录作为唯一的真相来源,事务日志允许用户对其数据进行推理;并且即使数据在 PB 级别上,我们也可以对这些数据的准确性高枕无忧。

事务日志是如何工作的

将事务分解为原子提交

每当用户执行修改表的操作(例如插入、更新或删除)时,Delta Lake 将该操作分解为一系列由以下一个或多个操作组成的离散步骤:

  • Add file:添加一个数据文件;
  • Remove file:删除一个数据文件;
  • Update metadata:更新表的元数据(例如更改表的名称,模式或分区);
  • Set transaction:Structured Streaming 作业已经提交的具有给定 ID 的微批次记录;
  • Change protocol:通过将事务日志切换到最新的软件协议来启用新特性;
  • Commit info:包含有关提交的信息以及该操作是在何时何地进行的。

然后这些操作将按照有序的原子单位记录在事务日志中,称为提交。
例如,假设用户创建一个事务以向表中添加新列,并向其中添加更多数据。Delta Lake 会将该事务分解为多个部分,一旦事务完成,就将它们添加到事务日志中,如下所示:

  • Update metadata:更改模式以包含新列;
  • Add file:每个添加的新文件。

文件级别的事务日志

当用户创建 Delta Lake 表时,将在 _delta_log 子目录中自动创建该表的事务日志。 当他或她对该表进行更改时,这些更改将作为有序的原子提交记录在事务日志中。 每个提交都以 JSON 文件的形式写出,从 000000.json 开始。对表的其他更改按升序数字顺序生成后续 JSON 文件,所以下一次提交被写入到 000001.json 文件,下下次修改写入到 000002.json 文件,依此类推。

因此,如果我们通过从数据文件 1.parquet 和 2.parquet 向表中添加记录。该事务将自动添加到事务日志中,并以 000000.json 的形式保存到磁盘。然后,我们改变主意并决定删除这些文件并添加一个新文件(3.parquet)。 这些操作将记录为事务日志中的下一个提交,也就是 000001.json,如下所示。

尽管 1.parquet 和 2.parquet 不再是我们 Delta Lake 表的一部分,但它们的添加和删除仍记录在事务日志中,因为这些操作是在我们的表上执行的 - 尽管它们最终相互抵消了。 Delta Lake 仍然保留这样的原子提交,以确保在需要审计表或使用“时间旅行”来查看表在给定时间点的样子时,我们能够准确地做到这一点。

此外,即使我们从表中删除了基础数据文件,Spark 也不会立刻从磁盘中删除文件。用户可以使用 VACUUM 命令删除不再需要的文件。

使用检查点文件(Checkpoint Files)快速重新计算状态

一旦我们提交了10次事务日志,Delta Lake 就会在相同的 _delta_log 子目录中以 Parquet 格式保存一个检查点文件(如上面的 00000000000000000010.checkpoint.parquet 文件)。每 10 次提交 Delta Lake 会自动生成检查点文件,这个是通过参数 checkpointInterval 参数设置。

这些检查点文件在某个时间点保存表的整个状态 - 以原生的 Parquet 格式保存,Spark 可以快速轻松地读取。换句话说,它们为 Spark reader 提供了一种“快捷方式”来完全复制表的状态,从而允许 Spark 避免重新处理可能存在的数千个低效的小 JSON 文件。

为了提高速度,Spark可以运行一个 listFrom 操作来查看事务日志中的所有文件,快速跳转到最新的检查点文件,并且只处理自保存了最新的检查点文件以来提交的JSON。

为了演示这是如何工作的,假设我们已经创建了提交,并且事务日志已经记录到 000007.json。Spark 加快了提交的速度,并在内存中自动缓存了表的最新版本。与此同时,其他一些写入者(可能是您过于热心的队友)已经向表中写入了新数据,并事务日志已经记录到 0000012.json 了。

为了合并这些新事务并更新表的状态,Spark 将运行 listFrom 方法来查看版本7之后对表的新更改。

Spark可以直接跳到最近的检查点文件(上图中的 0000010.checkpoint.parquet 文件),而不需要处理所有中间 JSON 文件,因为这个检查点文件包含 commit #10 中表的整个状态。现在,Spark 只需执行 0000011.json 和 0000012.json 的增量处理即可获得表的当前状态。然后 Spark 将表的版本12的状态缓存到内存中。通过遵循此工作流程,Delta Lake 能够使用 Spark 以高效的方式始终更新表的状态。

处理多个并发的读取和写入

现在我们已经在高层次上了解了事务日志如何工作的,让我们来谈谈并发性。到目前为止,我们的示例主要涵盖了用户线性提交事务或至少没有冲突的情况。 但是当 Delta Lake 处理多个并发读写时会发生什么?
答案很简单,由于 Delta Lake 由 Apache Spark 提供支持,因此不仅可以让多个用户同时修改表 - 这是预期的。 为了处理这些情况,Delta Lake 采用了乐观的并发控制。

什么是乐观并发控制?

乐观并发控制是一种处理并发事务的方法,它假定不同用户对表所做的事务(更改)可以在不相互冲突的情况下完成。它的速度快得令人难以置信,因为当处理 PB 级的数据时,用户很可能同时处理数据的不同部分,从而允许他们同时完成不冲突的事务。

例如,假设你和我正在一起玩拼图游戏。只要我们都在做拼图的不同部分——比如你在角落里,我在边缘上——我们没有理由不能同时做更大拼图的那一部分,并且以两倍的速度完成拼图。只有当我们同时需要相同的部件时,才会产生冲突。这就是乐观并发控制。

相反,一些数据库系统使用悲观锁定的概念,这是假设最坏的情况——即使我们有10,000块拼图,在某个时候我们肯定需要相同的拼图——这导致了太多的冲突。为了解决这个问题,它的理由是,应该只允许一个人同时做拼图,并把其他人都锁在房间外面。这不是一个快速(或友好)解决难题的方法!

当然,即使使用乐观并发控制,有时用户也会尝试同时修改数据的相同部分。幸运的是,Delta Lake 有相应的协议处理它。

乐观地解决冲突

为了提供ACID事务,Delta Lake 有一个协议,用于确定提交应该如何排序(在数据库中称为 serializability),并确定在同时执行两个或多个提交时应该做什么。Delta Lake通过实现互斥(mutual exclusion)规则来处理这些情况,然后尝试乐观地解决任何冲突。该协议允许Delta Lake遵循ACID隔离原则,该原则确保多个并发写操作之后的表的结果状态与那些连续发生的写操作相同,并且是彼此隔离的。

一般来说,这个过程是这样进行的

  • 记录起始表的版本;
  • 记录读和写操作;
  • 尝试提交;
  • 如果有人已经提交了,检查一下你读到的内容是否有变化;
  • 重复上面的步骤。

为了了解这一切是如何实时进行的,让我们看一下下面的图表,看看 Delta Lake 在冲突突然出现时是如何管理冲突的。假设两个用户从同一个表中读取数据,然后每个用户都尝试向表中添加一些数据。

  • Delta Lake 记录在进行任何更改之前读取的表的起始表版本(版本0);
  • 用户1和2都试图同时向表添加一些数据。在这里,我们遇到了一个冲突,因为接下来只有一个提交可以被记录为 000001.json;
  • Delta Lake使用“互斥”概念处理这种冲突,这意味着只有一个用户能够成功提交 000001.json。用户1的提交被接受,而用户2的提交被拒绝;
  • Delta Lake 更倾向于乐观地处理这种冲突,而不是为用户2抛出错误。 它检查是否对表进行了任何新的提交,并悄悄地更新表以反映这些更改,然后在新更新的表上重试用户2的提交(不进行任何数据处理),最后成功提交 000002.json。

在绝大多数情况下,这种和解是悄无声息地、天衣无缝地、成功地进行的。但是,如果 Delta Lake 无法乐观地解决不可调和的问题(例如,如果用户1删除了用户2也删除的文件),那么惟一的选择就是抛出一个错误。
最后要注意的是,由于在 Delta Lake 表上进行的所有事务都直接存储到磁盘中,因此这个过程满足 ACID 持久性的特性,这意味着即使在系统发生故障时,它也会保持。

其他用户案例

时间旅行(Time Travel)

每个表都是事务日志中记录的所有提交的总和的结果—不多也不少。事务日志提供了一步一步的指导,详细描述了如何从表的原始状态转换到当前状态。

因此,我们可以通过从原始表开始重新创建表在任何时间点的状态,并且只处理在该点之前提交的数据。这种强大的功能被称为“时间旅行”,或数据版本控制,在任何情况下都是救星。

数据血统(Data Lineage)和调试

作为对 Delta Lake 表所做的每个更改的最终记录,事务日志为用户提供了可验证的数据血统,这对于治理、审计和合规性目的非常有用。它还可以用于跟踪一个意外更改或管道中的一个 bug 的起源,以追溯到导致该更改的确切操作。用户可以运行 DESCRIBE HISTORY 来查看所做更改的元数据。

总结

在本博客中,我们深入研究 Delta Lake 事务日志的工作原理。我们讨论了:

  • 事务日志是什么,它是如何构造的,以及提交如何作为文件存储在磁盘上;
  • 事务日志如何作为一个单一的事实来源,允许 Delta Lake 实现原子性原则;
  • Delta Lake 如何计算每个表的状态——包括它如何使用事务日志来跟踪最近的检查点,以及它如何解决“小文件”问题;
  • 通过使用 Apache Spark 的强大功能来大规模处理元数据;
  • 使用乐观并发控制允许多个并发读和写,即使在表发生更改时也是如此;
  • Delta Lake 如何使用互斥来确保正确地线性(serialized)提交,以及在发生冲突时如何默默地重试提交。

本文翻译自:https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html

写在最后

为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds

xxx

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
目录
相关文章
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
518 1
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
617 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
673 6
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
442 0
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
797 0
|
测试技术 Apache 数据安全/隐私保护
使用 Apache JMeter 事务控制器的详细指南
Apache JMeter 的事务控制器用于组合多个请求成一个事务,以便衡量整体性能。创建测试计划涉及添加线程组、事务控制器和采样器,配置参数如线程数、Ramp-Up时间和循环次数。在事务控制器内,添加HTTP请求模拟用户登录和访问主页等操作。通过勾选选项,包括计时器和处理器时间。添加监听器如汇总报告和查看结果树来分析结果,从而评估系统性能瓶颈。事务控制器对于测试复杂业务流程的性能非常有用。
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
331 0
|
8月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1376 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
624 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式

推荐镜像

更多