作者:
王晓龙(筱龙),阿里云开源大数据平台技术专家
一、Delta Lake背景回顾
1. 大数据平台架构演进
大数据平台经历了三种架构的演进:
a.第一代:数仓架构
- 支持的场景有限,不适用于高阶复杂的查询分析场景,比如data science和ML场景;
- scale-out扩展能力差。
b.第二代:数据湖+数仓架构
- 可支持多场景应用;
- 多轮ETL,增加了延迟和出错概率,缺乏数据可靠性;
- 支持的workload依然有限;
- 数据冗余带来的存储开销更大。
c.第三代:Lakehouse 架构
- 支持所有结构的数据类型同时,也能针对各种分析场景提供支持;
- 中间的元数据管理层尤为重要,它提供可靠的ACID事务,同时可以针对数据库操作提供性能优化。
2. Delta Lake - 运行在数据湖之上的可靠存储层
Delta Lake作为可靠的数据存储中间层,为构建Lakehouse提供了核心支撑。
3. Delta Lake核心特性
Delta Lake的核心特性是对ACID事务支持,并且基于事务日志机制,实现可串行化的隔离级别,提供ACID事务,保证读写数据的一致性。
Delta Lake 围绕 ACID 底层事务日志,提供了以下能力:
- 时间回溯;
- 可扩展元数据处理;
- Upserts;
- Schema约束及演化;
- 缓存及索引优化;
- 数据布局优化;
- 批流一体。
二、详解事务日志及ACID事务实现机制
1. 示例:Delta Lake表操作
首先通过一个示例,来简单了解Delta Lake的基本语法。
使用PySpark创建 Delta Lake表,并执行表读写操作。
示例版本:
PySpark 3.2.1
Delta Lake 1.1.0
a.Delta Lake Starter - 启动 PySpark
- 启动PySpark并加载 Delta相关依赖:
# Using Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.12:1.1.0
--conf "spark.databricks.delta.retentionDurationCheck.enabled=false"
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
AI 代码解读
b.Delta Lake Starter - DML准备
创建表并执行若干 Update/Delete/Merge操作。
通过PySpark Datafram API创建一张Delta 表,表的名称是random_num,表中只包含一列数字:
>>> data = spark.range(0, 5)
>>> data.write.format("delta").save("/tmp/delta_course/delta_table")
>>> spark.sql("CREATE TABLE random_num USING DELTA location \"{}\"”.format('/tmp/delta_course/delta_table')
AI 代码解读
接下来往表中执行几条简单的修改操作语句:
>>> spark.sql("insert into random_num values(5)").show()
>>> spark.sql("update random_num set id = '10' where id = 1").show()
>>> spark.sql("delete from random_num where id = 3 ").show()
AI 代码解读
c.Delta Lake文件系统目录结构
Delta Lake 表的物理存储目录下, 既包括自身的表数据文件,也包括记录表Schema及表变化的 Delta Logs:
- Delta数据文件:Parquet文件;
- Delta事务日志 _delta_log:包含 Meta Data 以及事务操作历史;
2. Transaction Log概念
Transaction Log(事务日志,也称 Delta Log)是一种有序记录集,顺序记录了Delta Lake表从初始创建以来的所有事务操作。
3. Transaction Log设计目标
a.Transaction Log的整体设计目标,是实现单一信息源(Single Source of Truth),通过跟踪记录用户所有的表操作,从而为用户提供在任意时刻准确的数据视图。
b.同时,因为Delta Lake是基于Apache Spark构建的,依托Spark来管理维护事务日志,所以相比通过Metastore使用单一的数据库管理元数据,Delta Lake具备高可扩展的元数据处理能力,能够支持上百PB的Delta表读写访问。
c.除此之外,Delta Lake的事务日志也是其它重要数据管理特性实现的基础,比如数据版本回溯(Time Travel)等。
4. Transaction Log实现机制
a.Commit
在Delta Lake中,Transaction被称为Commit。每个Commit代表一个事务操作,同时也代表了一个数据版本,对应_delta_log目录下的一个json文件。
- 示例:一条Update语句关联的Commit内容
>>> spark.sql("update random_num set id = '10' where id = 1").show()
AI 代码解读
- 上图中的Update语句关联的事务日志中,会包含诸如remove/add这样的动作,后面包含了文件的路径,从路径看都是delta 表的parquet数据文件。
- 事务日志的最后一行是关于commit的详细信息,包括了时间戳、操作名等元数据。
- 在每个Commit里都包含若干更细粒度的动作(Action)。
Delta Lake 当前定义的 Action 动作包括:涉及数据文件增加和删除(Add file/Remove file)、元数据更新语义(Update metadata)、事务及协议相关的变更操作(Set transaction、Change protocol)等。
- 通过Spark获取到表的最新状态
Delta Lake定义的Commit维护的是变更操作的过程记录,当针对Delta表执行查询语句时,可以通过Spark获取到表的最新状态。Spark会对事务日志做聚合,检查事务日志经历了哪些事务操作,并基于事务日志构建出可靠准确的Delta表状态。
- 小文件问题
在变更操作较多的场景,比如CDC,delta log下会生成大量json小文件,对处理性能会造成较大影响。
b.Checkpoint
为了解决上文提到的小文件问题,Delta Log引入Checkpoint机制。
- Checkpoints:保存了从 version 0开始到当前时刻所有变更记录(默认每 10 次 Commit创建一个Checkpoint文件)。
- Checkpoint文件给 Spark 提供了一种捷径来重构表状态,避免低效地处理可能上千条的json格式的小文件。
示例:查看checkpoint文件内容
>>> chkpt0 = spark.read.parquet(
"/tmp/delta_course/delta_table/_delta_log/00000000000000000010.checkpoint.parquet")
>>> chkpt0.select("*").show()
AI 代码解读
图中包括从第一版本至今所有变更的历史
借助checkpoint,Spark可以快速构建表的状态。Spark通过执行ListFrom 操作,查看所有事务日志文件,快速跳转到最新的checkpoint文件,因此只需处理checkpoint之后的commits即可。
示例:ListForm的实现
在这个示例中,假设Spark里维护了版本7时刻下表的状态。在版本7之后,Delta表又有若干次的提交。当要查看表的最新状态时,Spark 首先通过ListFrom接口获取版本7之后的所有变更文件,发现版本号10关联的checkpoint文件是最新的checkpoint , Spark只需要基于版本10及随后的11和12两次commit构建表的状态,从而大大提升了元数据操作的性能。
因此,借助事物日志及spark,Delta Lake真正实现可扩展的元数据处理支持。
c.乐观并发控制
并发控制主要解决 ACID 中多个并发事务间的隔离性(Isolation)问题, 即:多个事务同一时间触发,系统应该如何决定事务之间的顺序。
在传统数据库领域,有两种典型的实现机制:乐观并发控制和悲观并发控制。
- 乐观并发控制 vs 悲观并发控制
- 悲观并发控制(Pressimistic Concurrency Control,简称PCC),即用锁串行化执行事务;
- 乐观并发控制(Optimistic Concurrency Control,简称OCC),即在只有当冲突发生的时候才采取措施;
- 传统数据库的锁机制其实都是基于悲观并发控制的观点进行实现的;
- 对比悲观并发控制,乐观并发控制可以提供更好的性能;
- 由于大数据场景本身是典型的读多写少场景,因此更适合采用乐观并发控制方式。
Delta Lake的设计者们选择了乐观并发控制,并且在发生冲突时采用排他锁实现。
- 排他锁
Delta Lake处理并发事务场景下的冲突问题时使用排他锁,包括以下五个步骤:
- Record the starting table version.
- Record reads/writes.
- Attempt a commit.
- If someone else wins, check whether anything you read has changed.
- Repeat.
示例:并发写入事务示例 - 演示OCC协议实现方式
- 示例中,用户A和用户B都拿到版本号为0的commit,排他锁(mutual exclusion)决定了只能有一个用户能够创建版本号为1的commit,假设接受了User A的commit,就要拒绝User B。
- Delta Lake为了更好的处理User B的commit,采取了乐观并发控制处理方式,基于操作语义,在版本1基础上完成User B的写入。
5. Delta Lake ACID事务实现
ACID事务具有四个特性:原子性、一致性、隔离性和持久性。
a.原子性 Atomicity
如上文介绍,Transaction Log将事务抽象成一个个Commit(文件),Commits里可以包含不同类型的Action,但是每个 Commit 是原子的。
Martin Kleppman在DDIA书中对原子性的定义:
“ACID atomicity describes what happens if a client wants to make several writes, but a fault occurs after some of the writes have been processed. If the writes are grouped together into an atomic transaction, and the transaction cannot be completed (committed) due to a fault, then the transaction is aborted and the database must discard or undo any writes it has made so far in that transaction. ”
—— Martin Kleppmann - Designing Data-Intensive Applications
b.隔离性 Isolation
隔离性是针对并发事务的处理方式,并发事务不应该相互干扰。在Delta Lake中,隔离性是通过OCC+排他锁方式实现的,并且实现了读写的串行化。
Martin Kleppman在DDIA书中对隔离性的定义:
“Isolation in the sense of ACID means that concurrently executing transactions are isolated from each other: they cannot step on each other’s toes.”
c.持久性 Durability
Transaction Log写入分布式磁盘中,在事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。
Martin Kleppman在DDIA书中对持久性的定义
“Durability is the promise that once a transaction has committed successfully, any data it has written will not be forgotten, even if there is a hardware fault or the database crashes.”
d.一致性 Consistency
从Martin Kleppman在DDIA书中对一致性的定义可以看出,原子性、隔离性和持久性是数据库的属性,应用程序可能依赖数据库的原子性和隔离属性来实现一致性,但这并不取决于数据库本身,但一致性是由应用来决定的。
Martin Kleppman在DDIA书中对一致性的定义
“Atomicity, isolation, and durability are properties of the database, whereas consistency (in the ACID sense) is a property of the application.
The application may rely on the database’s atomicity and isolation properties in order to achieve consistency, but it’s not up to the database alone.
Thus, the letter C doesn’t really belong in ACID”
三、Delta Lake核心特征总结
如上文介绍,Delta Lake基于事务日志,具有能够实现Time Travel/Upserts以及支持可扩展的元数据处理等特性。
除此之外,像Schema约束及演化等特性,在社区版本的Delta Lake里也都做了支持。在后面的公开课中还会针对基于Delta Lake构建批流一体Lake house架构做分享。
除了社区版本的Detla Lake, Databricks商业版提供了商业版的Spark及 Delta Lake引擎,并有一些专有的企业级性能优化特性。下期Delta Lake公开课我们会介绍商业版 Delta Lake 的特性,敬请关注。
参考资料
- Delta Lake Introduction: https://docs.delta.io/latest/delta-intro.html
- Diving Into Delta Lake: DML Internals (Update, Delete, Merge) :https://databricks.com/blog/2020/09/29/diving-into-delta-lake-dml-internals-update-delete-merge.html
- Diving Into Delta Lake: Unpacking The Transaction Log: https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
- Delta Transaction Log Protocol: https://github.com/delta-io/delta/blob/master/PROTOCOL.md
- Delta Lake: The Definitive Guide by O’Reilly: https://databricks.com/p/ebook/delta-lake-the-definitive-guide-by-oreilly
产品技术咨询
https://survey.aliyun.com/apps/zhiliao/VArMPrZOR
加入技术交流群