数据工程师眼中的 Delta lake(Delta by example)

简介: SPARK+AI SUMMIT 2020中文精华版线上峰会带领大家一起回顾2020年的SPARK又产生了怎样的最佳实践,技术上取得了哪些突破,以及周边的生态发展。本文中Databricks开源组技术主管范文臣从数据工程师的角度出发向大家介绍Delta Lake。以下是视频内容精华整理。

一、Delta Lake的诞生

相信作为一个数据工程师,心中都有这么一个理想的工具:

  • 可以持续不断地对各种各样的数据源进行增量处理;
  • 批流合一;
  • 处理速率高效,智能化生成报表;
  • ······

image.png

想要实现上面的工具,一个最简单的办法就是先用一个Spark Streaming Job把各种各样的数据源写到一个表中,如下图,然后再根据业务需求选择是用流作业还是批作业去进行相应的查询工作。但是,这种方式会存在一些问题,比如因为是流式写入,会产生大量的小文件,对后续的性能产生很大的影响。

image.png

面对上面遇到的小文件问题,一个改进的方法如下图所示,是在上述方法中创建的表之后加一个批作业定时的将小文件合并起来,但是这个改进方法仍然有明显的缺点,那就是存在着小时级别的延迟,这种级别的延迟对于很多业务来讲是无法满足要求的。

image.png

为了解决上述延迟问题,Lambda架构畅行一时。其架构思路如下图所示,简单说就是分别用流和批的方式对数据源处理两次,然后将批和流的视角合起来提供给后续业务。Lambda架构虽然解决了上述的问题,但是也存在自身的缺点:

  • 因为业务逻辑在要用批和流的方式处理两次,而批和流的处理方式不一致,可能会导致某些问题;
  • 如果处理逻辑中加入了数据校验的工作,就需要在批和流上分别校验两次,一旦需要回滚等操作,数据修正也需要进行两次,费时费力;
  • 如果涉及到Merge、Update等操作,也需要进行两次修改,使得整个事务变得复杂;
  • ······

image.png

上面的几种方案都有自己的缺点,Lambda架构虽然看似有效但是架构过于复杂。那么,有没有一种方案可以将Lambda架构进行简化呢?其实,我们的目标很简单,就是让流作业处理我们的源数据,并且后续作业可以批流统一的处理,具体来说有:

  • 保证数据的一致性;
  • 保证每次是增量的读取;
  • 能够做回滚;
  • 能够访问历史记录;
  • 能够在不影响下游作业的同时合并小文件。

结合以上几点目标,有了目前的解决方案:Delta Lake + Structured Streaming = The Delta Architecture。这套方案的优点很明显,首先是批流合一的,其次Delta Lake可以很方便的做时间旅行类似的操作,且Delta Lake是单纯的储存层,与计算层分离,符合当前云数据计算的大方向,方便用户灵活的进行扩容。

二、Delta Lake的工作原理

Delta Lake的核心是其事务日志,它的表跟普通的表没有大的区别,但是在表下会建立一个隐藏文件,其中的JSON存储了一些关于事务的记录,如下图所示:

image.png

因此,在Delta Lake中,读取一张表也会重放这张表的历史记录,比如表的重命名、修改Schema等等操作。

更细节地来说,在Delta Lake中的每个JSON文件都是一次commit,这个commit是原子性的,保存了事务相关的详细记录。另外,Delta Lake还可以保证多个用户同时commit而不会产生冲突,它用的是一种基于乐观锁处理的方式,其逻辑如下图所示。这种解决冲突的方案适用于写比较少,读取比较多的场景,大家在使用的时候要注意场景是否适用。

image.png

假设我们要处理一个非常大的表,有百万级别的文件,那么如何高效的处理元数据呢?Delta Lake的处理方案如下图所示,用Spark来读取事务日志,然后Delta Lake隔一段时间对commit做一次合并,之后可以从Checkpoint开始应用后续的commit。

image.png

总结起来,Delta Lake解决数据一致性、增量读取、历史回溯等问题的方案即为下图所示:

image.png

三、Demo
从以下链接大家可以看到详细的Demo展示,还有详细的社区版本(免费)Databricks的设置方法:https://github.com/delta-io/delta/tree/master/examples/tutorials/saiseu19

Demo中提供了Python API和Scala API的实现文件,大家可以根据自己的实际情况进行尝试。上面链接的Demo中展示的主要features有:

  • Schema Enforcement:在做Pipeline的时候我们一定要保证数据质量,因此Schema Enforcement可以帮助我们做到这点。
  • Schema Evolution:随着公司业务的发展,一开始的表结构可能不适用于当前的业务,Schema Evolution可以帮助我们进行表结构的演化。
  • Delete from Delta Lake table:Delete操作可以控制表的无限制增长,并且通过事务日志来进行操作,实际上数据没有被删掉,只是在Log中进行了标记。
  • Audit Delta Lake Table History:通过此功能可以看到对表的详细历史操作。
  • Travel back in time:有了表的历史数据,我们便可以访问表在各个历史节点的数据。
  • Vacuum old versions of Delta Lake tables:Delta Lake通过标记的方式来实现删除,随着时间的增长会占用大量储存空间,Vacuum操作将删除在一定时间内从表中删除的数据文件,实现物理删除,默认会保留七天内的数据。
  • Upsert into Delta Lake table using Merge:在一个命令中同时做update和insert操作。

上述Features的具体代码可以在Github中查看。

四、Q&A

Q1:Delta Lake可以线上使用吗?支持实时增删查改吗?
A1:Delta 最新发布了0.7.0,支持Spark 3.0。Databricks已经有很多客户在使用Delta Lake,其他公司也有在用,比如eBay。实时增删查改如demo演示的那样都是支持的。

Q2:是否可以纯SQL实现?
A2:Delta Lake是一个数据储存层,如果是与Hive等引擎做整合,只支持基本的SELECT/INSERT,没有支持DELETE等SQL操作,只能用Delta Lake自己的Scala或Python API。如果使用的是Spark 3.0的话,像MERGE、DELTE等都支持SQL AQI,可以直接用SQL开发。但是某些管理操作比如VACCUM没有对应的SQL API,还是要用Delta Lake自己的Scala或Python API。


关键词:Databricks、Spark、Delta Lake、Schema Enforcement

获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<

相关文章
|
XML JSON 自然语言处理
《ANTLR 4权威指南 》一导读
ANTLR是一款强大的语法分析器生成工具,可用于读取、处理、执行和翻译结构化的文本或二进制文件。它被广泛应用于学术领域和工业生产实践,是众多语言、工具和框架的基石。
11042 2
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
912 1
|
缓存 负载均衡 前端开发
前端必会的nginx知识点
【8月更文挑战第22天】前端必会的nginx知识点
364 0
|
SQL JSON 数据格式
ES中如何实现空值和非空值的查询
ES中如何实现空值和非空值的查询
5827 0
|
JSON 前端开发 Java
Jackson使用详解
Spring MVC 默认采用Jackson解析Json,尽管还有一些其它同样优秀的json解析工具,例如Fast Json、GSON,但是出于最小依赖的考虑,也许Json解析第一选择就应该是Jackson。
1694 0
Jackson使用详解
|
弹性计算 负载均衡 前端开发
高可用之弹性伸缩
【2月更文挑战第30天】弹性伸缩旨在实现服务容量按需线性扩展,依赖于敏捷基础设施和资源池共享。它包括弹性扩容、缩容和自愈三个层面。
|
设计模式 供应链 数据可视化
DDD - 事件风暴从理论到落地
DDD - 事件风暴从理论到落地
1184 1
|
机器学习/深度学习 缓存 分布式计算
我们来看一个简单的Python代码示例,它使用`joblib`模块来并行执行一个函数:
我们来看一个简单的Python代码示例,它使用`joblib`模块来并行执行一个函数: