Delta Lake基础介绍(开源版)【Databricks 数据洞察公开课】

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 针对社区版本Delta Lake提供的几大核心特性进行讲解,并通过示例演示如何使用这些特性。

作者:
王晓龙(筱龙),阿里云开源大数据平台技术专家

一、Delta Lake背景回顾

1. 大数据平台架构演进

大数据平台经历了三种架构的演进:

1.png

a.第一代:数仓架构

  • 支持的场景有限,不适用于高阶复杂的查询分析场景,比如data science和ML场景;
  • scale-out扩展能力差。

b.第二代:数据湖+数仓架构

  • 可支持多场景应用;
  • 多轮ETL,增加了延迟和出错概率,缺乏数据可靠性;
  • 支持的workload依然有限;
  • 数据冗余带来的存储开销更大。

c.第三代:Lakehouse 架构

  • 支持所有结构的数据类型同时,也能针对各种分析场景提供支持;
  • 中间的元数据管理层尤为重要,它提供可靠的ACID事务,同时可以针对数据库操作提供性能优化。

2. Delta Lake - 运行在数据湖之上的可靠存储层

Delta Lake作为可靠的数据存储中间层,为构建Lakehouse提供了核心支撑。

2.png

3. Delta Lake核心特性

3.png

Delta Lake的核心特性是对ACID事务支持,并且基于事务日志机制,实现可串行化的隔离级别,提供ACID事务,保证读写数据的一致性。

Delta Lake 围绕 ACID 底层事务日志,提供了以下能力:

  • 时间回溯;
  • 可扩展元数据处理;
  • Upserts;
  • Schema约束及演化;
  • 缓存及索引优化;
  • 数据布局优化;
  • 批流一体。

二、详解事务日志及ACID事务实现机制

1. 示例:Delta Lake表操作

首先通过一个示例,来简单了解Delta Lake的基本语法。
使用PySpark创建 Delta Lake表,并执行表读写操作。

示例版本:
PySpark 3.2.1
Delta Lake 1.1.0

a.Delta Lake Starter - 启动 PySpark

  • 启动PySpark并加载 Delta相关依赖:
# Using Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.12:1.1.0 
--conf "spark.databricks.delta.retentionDurationCheck.enabled=false" 
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

b.Delta Lake Starter - DML准备

创建表并执行若干 Update/Delete/Merge操作。
通过PySpark Datafram API创建一张Delta 表,表的名称是random_num,表中只包含一列数字:

>>> data = spark.range(0, 5)
>>> data.write.format("delta").save("/tmp/delta_course/delta_table") 
>>> spark.sql("CREATE TABLE random_num USING DELTA location  \"{}\"”.format('/tmp/delta_course/delta_table')

接下来往表中执行几条简单的修改操作语句:

>>> spark.sql("insert into random_num values(5)").show()
>>> spark.sql("update random_num set id = '10' where id = 1").show()
>>> spark.sql("delete from random_num where id = 3 ").show()

c.Delta Lake文件系统目录结构

Delta Lake 表的物理存储目录下, 既包括自身的表数据文件,也包括记录表Schema及表变化的 Delta Logs:

  • Delta数据文件:Parquet文件;
  • Delta事务日志 _delta_log:包含 Meta Data 以及事务操作历史;

4.png

2. Transaction Log概念

Transaction Log(事务日志,也称 Delta Log)是一种有序记录集,顺序记录了Delta Lake表从初始创建以来的所有事务操作。

5.png

3. Transaction Log设计目标

a.Transaction Log的整体设计目标,是实现单一信息源(Single Source of Truth),通过跟踪记录用户所有的表操作,从而为用户提供在任意时刻准确的数据视图。
b.同时,因为Delta Lake是基于Apache Spark构建的,依托Spark来管理维护事务日志,所以相比通过Metastore使用单一的数据库管理元数据,Delta Lake具备高可扩展的元数据处理能力,能够支持上百PB的Delta表读写访问。
c.除此之外,Delta Lake的事务日志也是其它重要数据管理特性实现的基础,比如数据版本回溯(Time Travel)等。

4. Transaction Log实现机制

a.Commit
在Delta Lake中,Transaction被称为Commit。每个Commit代表一个事务操作,同时也代表了一个数据版本,对应_delta_log目录下的一个json文件。

lQLPDhs9Cr4SDknNAUvNAcawiNEltTU8axsCNK8w_8BGAA_454_331.png

  • 示例:一条Update语句关联的Commit内容
>>> spark.sql("update random_num set id = '10' where id = 1").show()

7.png

  1. 上图中的Update语句关联的事务日志中,会包含诸如remove/add这样的动作,后面包含了文件的路径,从路径看都是delta 表的parquet数据文件。
  2. 事务日志的最后一行是关于commit的详细信息,包括了时间戳、操作名等元数据。
  • 在每个Commit里都包含若干更细粒度的动作(Action)。

Delta Lake 当前定义的 Action 动作包括:涉及数据文件增加和删除(Add file/Remove file)、元数据更新语义(Update metadata)、事务及协议相关的变更操作(Set transaction、Change protocol)等。

8.1.png

  • 通过Spark获取到表的最新状态

Delta Lake定义的Commit维护的是变更操作的过程记录,当针对Delta表执行查询语句时,可以通过Spark获取到表的最新状态。Spark会对事务日志做聚合,检查事务日志经历了哪些事务操作,并基于事务日志构建出可靠准确的Delta表状态。

8.png

  • 小文件问题

在变更操作较多的场景,比如CDC,delta log下会生成大量json小文件,对处理性能会造成较大影响。

9.png

b.Checkpoint

为了解决上文提到的小文件问题,Delta Log引入Checkpoint机制。

  • Checkpoints:保存了从 version 0开始到当前时刻所有变更记录(默认每 10 次 Commit创建一个Checkpoint文件)。
  • Checkpoint文件给 Spark 提供了一种捷径来重构表状态,避免低效地处理可能上千条的json格式的小文件。

10.png

示例:查看checkpoint文件内容

>>> chkpt0 = spark.read.parquet(
"/tmp/delta_course/delta_table/_delta_log/00000000000000000010.checkpoint.parquet")
>>> chkpt0.select("*").show()

11.png

图中包括从第一版本至今所有变更的历史

借助checkpoint,Spark可以快速构建表的状态。Spark通过执行ListFrom 操作,查看所有事务日志文件,快速跳转到最新的checkpoint文件,因此只需处理checkpoint之后的commits即可。

12.png

示例:ListForm的实现

在这个示例中,假设Spark里维护了版本7时刻下表的状态。在版本7之后,Delta表又有若干次的提交。当要查看表的最新状态时,Spark 首先通过ListFrom接口获取版本7之后的所有变更文件,发现版本号10关联的checkpoint文件是最新的checkpoint , Spark只需要基于版本10及随后的11和12两次commit构建表的状态,从而大大提升了元数据操作的性能。
因此,借助事物日志及spark,Delta Lake真正实现可扩展的元数据处理支持。

c.乐观并发控制

并发控制主要解决 ACID 中多个并发事务间的隔离性(Isolation)问题, 即:多个事务同一时间触发,系统应该如何决定事务之间的顺序。
在传统数据库领域,有两种典型的实现机制:乐观并发控制和悲观并发控制。

  • 乐观并发控制 vs 悲观并发控制
  1. 悲观并发控制(Pressimistic Concurrency Control,简称PCC),即用锁串行化执行事务;
  2. 乐观并发控制(Optimistic Concurrency Control,简称OCC),即在只有当冲突发生的时候才采取措施;
  3. 传统数据库的锁机制其实都是基于悲观并发控制的观点进行实现的;
  4. 对比悲观并发控制,乐观并发控制可以提供更好的性能;
  5. 由于大数据场景本身是典型的读多写少场景,因此更适合采用乐观并发控制方式。

Delta Lake的设计者们选择了乐观并发控制,并且在发生冲突时采用排他锁实现。

  • 排他锁

Delta Lake处理并发事务场景下的冲突问题时使用排他锁,包括以下五个步骤:

  1. Record the starting table version.
  2. Record reads/writes.
  3. Attempt a commit.
  4. If someone else wins, check whether anything you read has changed.
  5. Repeat.

示例:并发写入事务示例 - 演示OCC协议实现方式
13.png

  • 示例中,用户A和用户B都拿到版本号为0的commit,排他锁(mutual exclusion)决定了只能有一个用户能够创建版本号为1的commit,假设接受了User A的commit,就要拒绝User B。
  • Delta Lake为了更好的处理User B的commit,采取了乐观并发控制处理方式,基于操作语义,在版本1基础上完成User B的写入。

5. Delta Lake ACID事务实现

ACID事务具有四个特性:原子性、一致性、隔离性和持久性。
14.png

a.原子性 Atomicity

如上文介绍,Transaction Log将事务抽象成一个个Commit(文件),Commits里可以包含不同类型的Action,但是每个 Commit 是原子的。

Martin Kleppman在DDIA书中对原子性的定义:

“ACID atomicity describes what happens if a client wants to make several writes, but a fault occurs after some of the writes have been processed. If the writes are grouped together into an atomic transaction, and the transaction cannot be completed (committed) due to a fault, then the transaction is aborted and the database must discard or undo any writes it has made so far in that transaction. ”
—— Martin Kleppmann - Designing Data-Intensive Applications

b.隔离性 Isolation

隔离性是针对并发事务的处理方式,并发事务不应该相互干扰。在Delta Lake中,隔离性是通过OCC+排他锁方式实现的,并且实现了读写的串行化。

Martin Kleppman在DDIA书中对隔离性的定义:

“Isolation in the sense of ACID means that concurrently executing transactions are isolated from each other: they cannot step on each other’s toes.”

c.持久性 Durability

Transaction Log写入分布式磁盘中,在事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。

Martin Kleppman在DDIA书中对持久性的定义

“Durability is the promise that once a transaction has committed successfully, any data it has written will not be forgotten, even if there is a hardware fault or the database crashes.”

d.一致性 Consistency

从Martin Kleppman在DDIA书中对一致性的定义可以看出,原子性、隔离性和持久性是数据库的属性,应用程序可能依赖数据库的原子性和隔离属性来实现一致性,但这并不取决于数据库本身,但一致性是由应用来决定的。

Martin Kleppman在DDIA书中对一致性的定义

“Atomicity, isolation, and durability are properties of the database, whereas consistency (in the ACID sense) is a property of the application.
The application may rely on the database’s atomicity and isolation properties in order to achieve consistency, but it’s not up to the database alone.
Thus, the letter C doesn’t really belong in ACID”

三、Delta Lake核心特征总结

如上文介绍,Delta Lake基于事务日志,具有能够实现Time Travel/Upserts以及支持可扩展的元数据处理等特性。

除此之外,像Schema约束及演化等特性,在社区版本的Delta Lake里也都做了支持。在后面的公开课中还会针对基于Delta Lake构建批流一体Lake house架构做分享。

除了社区版本的Detla Lake, Databricks商业版提供了商业版的Spark及 Delta Lake引擎,并有一些专有的企业级性能优化特性。下期Delta Lake公开课我们会介绍商业版 Delta Lake 的特性,敬请关注。
15.png

参考资料

  1. Delta Lake Introduction: https://docs.delta.io/latest/delta-intro.html
  2. Diving Into Delta Lake: DML Internals (Update, Delete, Merge) :https://databricks.com/blog/2020/09/29/diving-into-delta-lake-dml-internals-update-delete-merge.html
  3. Diving Into Delta Lake: Unpacking The Transaction Log: https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
  4. Delta Transaction Log Protocol: https://github.com/delta-io/delta/blob/master/PROTOCOL.md
  5. Delta Lake: The Definitive Guide by O’Reilly: https://databricks.com/p/ebook/delta-lake-the-definitive-guide-by-oreilly

产品技术咨询
https://survey.aliyun.com/apps/zhiliao/VArMPrZOR

加入技术交流群
DDI钉群.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
3月前
|
机器学习/深度学习 数据采集 分布式计算
【颠覆传统!】揭秘Databricks如何助力零售业需求预测——从数据到洞察,一秒钟变销售预言家!
【8月更文挑战第9天】随着大数据技术的发展,数据驱动决策日益关键,尤其在零售业中,通过分析历史销售数据预测未来趋势变得至关重要。本文探讨如何运用Databricks平台优化零售业需求预测。Databricks是一个基于Apache Spark的统一数据分析平台,能高效处理大规模数据任务。通过示例代码展示数据读取、预处理及建模过程,相较于传统方法,Databricks在数据处理能力、可扩展性、内置机器学习库以及协作版本控制方面展现出显著优势,帮助零售商优化库存管理、提升客户体验并增加销售额。
87 8
|
存储 分布式计算 数据挖掘
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake
|
存储 SQL 分布式计算
【数据湖仓】数据湖和仓库:Databricks 和 Snowflake
【数据湖仓】数据湖和仓库:Databricks 和 Snowflake
|
消息中间件 存储 SQL
使用Databricks+Confluent进行实时数据采集入湖和分析【Databricks 数据洞察公开课】
本文介绍网约车模拟数据从产生,发布到流数据服务 Confluent,通过Databricks Structured Streaming进行实时数据处理,存储到LakeHouse,并使用spark和spark sql进行分析的应用实践。
693 0
使用Databricks+Confluent进行实时数据采集入湖和分析【Databricks 数据洞察公开课】
|
机器学习/深度学习 存储 数据采集
使用 Databricks 进行营销效果归因分析的应用实践【Databricks 数据洞察公开课】
本文介绍如何使用Databricks进行广告效果归因分析,完成一站式的部署机器学习,包括数据ETL、数据校验、模型训练/评测/应用等全流程。
747 0
使用 Databricks 进行营销效果归因分析的应用实践【Databricks 数据洞察公开课】
|
SQL 分布式计算 资源调度
使用Databricks进行零售业需求预测的应用实践【Databricks 数据洞察公开课】
本文从零售业需求预测痛点、商店商品模型预测的实践演示,介绍Databricks如何助力零售商进行需求、库存预测,实现成本把控和营收增长。
536 0
使用Databricks进行零售业需求预测的应用实践【Databricks 数据洞察公开课】
|
存储 SQL 人工智能
如何使用Delta Lake构建批流一体数据仓库【Databricks 数据洞察公开课】
Delta Lake是一个开源存储层,它为数据湖带来了可靠性。Delta Lake提供了ACID事务、可扩展的元数据处理,并统一了流式处理和批处理数据处理。Delta-Lake运行在现有数据湖之上,并且与Apache Spark API完全兼容。希望本篇能让大家更深入了解Delta Lake,最终可以实践到工作当中。
470 0
如何使用Delta Lake构建批流一体数据仓库【Databricks 数据洞察公开课】
|
SQL 存储 分布式计算
Delta Lake的演进历史及现状【Databricks 数据洞察公开课】
从大数据平台架构的演进、Delta Lake关键特性、版本迭代、重要功能等多方面,介绍Delta Lake的演进和优势。
1084 4
Delta Lake的演进历史及现状【Databricks 数据洞察公开课】
|
机器学习/深度学习 存储 弹性计算
使用Databricks+Mlflow进行机器学习模型的训练和部署【Databricks 数据洞察公开课】
介绍如何使用Databricks和MLflow搭建机器学习生命周期管理平台,实现从数据准备、模型训练、参数和性能指标追踪、以及模型部署的全流程。
1064 0
使用Databricks+Mlflow进行机器学习模型的训练和部署【Databricks 数据洞察公开课】
|
存储 数据采集 机器学习/深度学习
深度解析数据湖存储方案Lakehouse架构【Databricks 数据洞察公开课】
从数据仓库、数据湖的优劣势,湖仓一体架构的应用和优势等多方面深度解析Lakehouse架构。
1250 0
深度解析数据湖存储方案Lakehouse架构【Databricks 数据洞察公开课】