深入剖析 Delta Lake:Schema Enforcement & Evolution

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Schema 约束和 Schema 演变相互补益,合理地结合起来使用将能方便地管理好数据,避免脏数据侵染,保证数据的完整可靠。

编译:辰山,阿里巴巴计算平台事业部 EMR 高级开发工程师,目前从事大数据存储方面的开发和优化工作


在实践经验中,我们知道数据总是在不断演变和增长,我们对于这个世界的心智模型必须要适应新的数据,甚至要应对我们从前未知的知识维度。表的 schema 其实和这种心智模型并没什么不同,需要定义如何对新的信息进行分类和处理。

这就涉及到 schema 管理的问题,随着业务问题和需求的不断演进,数据结构也会不断发生变化。通过 Delta Lake,能够很容易包含数据变化所带来的新的维度,用户能够通过简单的语义来控制表的 schema。相关工具主要包括 Schema 约束(Schema Enforcement)和 Schema 演变(Schema Evolution),前者用以防止用户脏数据意外污染表,后者用以自动添加适当的新数据列。本文将详细剖析这两个工具。

理解表的 Schemas

Apache Spark 的每一个 DataFrame 都包含一个 schema,用来定义数据的形态,例如数据类型、列信息以及元数据。在 Delta Lake 中,表的 schema 通过 JSON 格式存储在事务日志中。

什么是 Schema 约束?

Schema 约束(Schema Enforcement),也可称作 Schema Validation,是 Delta Lake 中的一种保护机制,通过拒绝不符合表 schema 的写入请求来保证数据质量。类似于一个繁忙的餐厅前台只接受预定坐席的顾客,这个机制会检查插入表格的每一列是否符合期望的列(换句话说,就是检查每个列是否已经“预定坐席”),那些不在期望名单上的写入将被拒绝。

Schema 约束如何工作?

Delta Lake 对写入进行 schema 校验,也就是说所有表格的写入操作都会用表的 schema 做兼容性检查。如果 schema 不兼容,Delta Lake 将会撤销这次事务(没有任何数据写入),并且返回相应的异常信息告知用户。

Delta Lake 通过以下准则判断一次写入是否兼容,即对写入的 DataFrame 必须满足:

• 不能包含目标表 schema 中不存在的列。相反,如果写入的数据没有包含所有的列是被允许的,这些空缺的列将会被赋值为 null。

• 不能包含与目标表类型不同的列。如果目标表包含 String 类型的数据,但 DataFrame 中对应列的数据类型为 Integer,Schema 约束将会返回异常,防止该次写入生效。

• 不能包含只通过大小写区分的列名。这意味着不能在一张表中同时定义诸如“Foo”和“foo”的列。不同于 Spark 可以支持大小写敏感和不敏感(默认为大小写不敏感)两种不同的模式,Delta Lake 保留大小写,但在 schema 存储上大小写不敏感。Parquet 在存储和返回列信息上面是大小写敏感的,因此为了防止潜在的错误、数据污染和丢失的问题,Delta Lake 引入了这个限制。

以下代码展示了一次写入过程,当添加一次新计算的列到 Delta Lake 表中。

# Generate a DataFrame of loans that we'll append to our Delta Lake table
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Show original DataFrame's schema
original_loans.printSchema()
 
"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
"""
 
# Show new DataFrame's schema
loans.printSchema()
 
"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
"""
 
# Attempt to append new DataFrame (with new column) to existing table
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

""" Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

不同于自动添加新的列,Delta Lake 受到 schema 约束并阻止了这次写入生效。并且为了帮助定位是哪个列造成了不匹配,Spark 会在错误栈中打印出两者的 schema 作为对照。

Schema 约束有何作用?

由于 Schema 约束是一种严格的校验,因此可以用于已清洗、转化完成的数据,保证数据不受污染,可用于生产或者消费。典型的应用场景包括直接用于以下用途的表:

• 机器学习算法

• BI 仪表盘

• 数据分析和可视化工具

• 任何要求高度结构化、强类型、语义 schema 的生产系统

为了准备好最终的数据,很多用户使用简单的“多跳”架构来逐步往表中添加结构。更多相关内容可以参考 Productionizing Machine Learning With Delta Lake.

当然,Schema 约束可以用在整个工作流程的任意地方,不过需要注意的是,有可能因为诸如不经意对写入数据添加了某个列,导致写入流失败的情况。

防止数据稀释

看到这,你可能会问,到底需不需要大费周章做 Schema 约束?毕竟,有时候一个意料之外的 schema 不匹配问题反而会影响整个工作流,特别是当新手使用 Delta Lake。为什么不直接让 schema 接受改变,这样我们就能任意写入 DataFrame 了。

俗话说,防患于未然,有些时候,如果不对 schema 进行强制约束,数据类型兼容性的问题将会很容易出现,看上去同质的数据源可能包含了边缘情况、污染列、错误变换的映射以及其他可怕的情况都可能会一夜之间污染了原始的表。所以更好的做法应该从根本上阻止这样的情况发生,通过 Schema 约束就能够做到,将这类错误显式地返回进行恰当的处理,而不是让它潜伏在数据中,看似写入时非常顺利,但埋下了无法预知的隐患。

Schema 约束能够确保表 schema 不会发生改变,除非你确切地执行了更改操作。它能有效的防止“数据稀释”——当新的列频繁添加,原本简洁的表结构可能因为数据泛滥而失去原有的含义和用处。Schema 约束的设计初衷就是通过设定严格的要求来保证质量,确保表数据不受污染。

另一方面,假如经过再三确认之后,确定的确需要添加新的列,那解决方法也非常简单,也就是下文即将介绍的 Schema 演变!

什么是 Schema 演变

Schema 演变(Schema Evolution)允许用户能够方便地修改表的当前 schema,来适应不断变化的数据。最常见的用法就是在执行添加和覆盖操作时,自动地添加一个或多个列来适应 schema。

Schema 演变如何工作?

继续沿用上文的例子,对于之前由于 schema 不匹配导致请求被拒绝的情况,开发人员可以方便地使用 Schema 演变来添加新的列。Schema 演变的使用方式是在 .write 或 .writeStream 的 Spark 命令后面添加上 .option('mergeSchema', 'true')。

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

可以执行以下 Spark SQL 语句来察看图表。

# Create a plot with the new column to confirm the write was successful
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

当然,也可以选择通过添加 spark.databricks.delta.schema.autoMerge = True 到 Spark 配置文件中使得该选项对整个 Spark session 生效。不过需要注意的是,这样使用的话, Schema 约束将不再会对 schema 不匹配问题进行报警提示。

通过指定 mergeSchema 选项,所有在输入 DataFrame 中存在但在目标表中不存在的列都将被作为该事务操作的一部分添加到 schema 末尾。也允许添加嵌套字段,这些字段将被添加到对应列的末尾。

数据科学家可以利用这个选项来添加新的列(例如一个新增的跟踪指标,或是这个月的销售数据)到已有的机器学习表中,而不必废弃现有依赖于旧的列信息的模型。

以下对表的添加和覆盖操作都是合法的 Schema 演变的操作:

• 添加新列(这是最常用的场景)

• 修改数据类型,Null->其他类型,或者向上类型转换 Byte->Short->Integer

其他改动都是非法的 Schema 演变操作,需要通过添加 .option("overwriteSchema", "true") 选项来覆盖 schema 以及数据。举个例子,表原本包含一个类型为 integer 的列“Foo”,而新的 schema 需要改成 string 类型,那么所有的 Parquet 数据文件都需要覆盖重写。包括以下步骤:

• 删除这个列

• 修改列的数据类型

• 修改列名,仅用大小写区分(例如“Foo”和“foo”)
最后,在 Spark 3.0 中,支持了显式 DDL(通过 ALTER TABLE 方式),允许用户能够对 schema 执行以下操作:

• 添加列

• 修改列注释

• 设置表的属性来定义表的行为,例如设置事务日志的保留时间

Schema 演变有何作用?

Schema 演变可以用来显式地修改表的 schema(而不是意外添加了并不想要的列)。这提供了一种简单的方式来迁移 schema,因为它能自动添加上正确的列名和数据类型,而不需要进行显式的定义。

总结

Schema 约束能够拒绝与表不兼容的任何的新的列或者 schema 的改动。通过设置严格的限制,数据工程师们可以完全信任他们的数据,从而能够作出更好的商业决策。

另一方面,schema 演变则对 schema 约束进行了补充,使得一些期望的 schema 变更能够自动地生效。毕竟,添加一个新的列本就不应该是一件困难的事情。

Schema 约束和 Schema 演变相互补益,合理地结合起来使用将能方便地管理好数据,避免脏数据侵染,保证数据的完整可靠。

原文链接:https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html


相关阅读推荐:
Delta Lake,让你从复杂的Lambda架构中解放出来
【译】Databricks使用Spark Streaming和Delta Lake对流式数据进行数据质量监控介绍
【译】Delta Lake 0.5.0介绍
Delta Lake - 数据湖的数据可靠性


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!image.png
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。image.png
Apache Spark技术交流社区公众号,微信扫一扫关注image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
6月前
|
存储 SQL Apache
Apache Hudi与Delta Lake对比
Apache Hudi与Delta Lake对比
100 0
|
存储 SQL JSON
Delta Lake、Hudi与Iceberg详解
Delta Lake、Hudi与Iceberg详解
1017 0
Delta Lake、Hudi与Iceberg详解
|
6月前
|
分布式计算 测试技术 Apache
Apache Hudi vs Delta Lake:透明TPC-DS Lakehouse性能基准
Apache Hudi vs Delta Lake:透明TPC-DS Lakehouse性能基准
117 4
|
流计算
Delta Lake中CDC的实现
Delta Lake中CDC的实现
153 0
|
SQL 存储 算法
《Optimization of Common Table Expressions in MPP Database Systems》论文导读
Optimization of Common Table Expressions in MPP Database Systems
《Optimization of Common Table Expressions in MPP Database Systems》论文导读
|
SQL 存储 分布式计算
Data Lake 三剑客——Delta、Hudi、Iceberg 对比分析
定性上讲,三者均为 Data Lake 的数据存储中间层,其数据管理的功能均是基于一系列的 meta 文件。meta 文件的角色类似于数据库的 catalog/wal,起到 schema 管理、事务管理和数据管理的功能。
15841 2
Data Lake 三剑客——Delta、Hudi、Iceberg 对比分析
|
存储 SQL 分布式计算
Delta Lake,让你从复杂的Lambda架构中解放出来
Linux 基金会的 Delta Lake(Delta.io)是一个给数据湖提供可靠性的开源存储层软件。在 QCon 全球软件开发大会(上海站)2019 的演讲中,Databricks 公司的 Engineering Manager 李潇带我们了解了 Delta Lake 在实际生产中的应用与实践以及未来项目规划,本文便整理自此次演讲。
Delta Lake,让你从复杂的Lambda架构中解放出来
|
NoSQL 分布式计算 Spark
Tablestore+Delta Lake(快速开始)
本文介绍如何在E-MapReduce中通过Tablestore Spark Streaming Source将TableStore中的数据实时导入到Delta Lake中。
Tablestore+Delta Lake(快速开始)
|
SQL 消息中间件 JSON
Delta Lake在Soul的应用实践
传统离线数仓模式下,日志入库前首要阶段便是ETL,我们面临如下问题:天级ETL任务耗时久,影响下游依赖的产出时间;凌晨占用资源庞大,任务高峰期抢占大量集群资源;ETL任务稳定性不佳且出错需凌晨解决、影响范围大。为了解决天级ETL逐渐尖锐的问题,所以这次我们选择了近来逐渐进入大家视野的数据湖架构,基于阿里云EMR的Delta Lake,我们进一步打造优化实时数仓结构,提升部分业务指标实时性,满足更多更实时的业务需求。
Delta Lake在Soul的应用实践
|
SQL JSON 分布式计算
不通过 Spark 获取 Delta Lake Snapshot
Delta Lake 进行数据删除或更新操作时实际上只是对被删除数据文件做了一个 remove 标记,在进行 vacuum 前并不会进行物理删除,因此一些例如在 web 上获取元数据或进行部分数据展示的操作如果直接从表路径下获取 parquet 文件信息,读到的可能是历史已经被标记删除的数据。
不通过 Spark 获取 Delta Lake Snapshot
下一篇
无影云桌面