一、Delta Lake的诞生
相信作为一个数据工程师,心中都有这么一个理想的工具:
- 可以持续不断地对各种各样的数据源进行增量处理;
- 批流合一;
- 处理速率高效,智能化生成报表;
- ······
想要实现上面的工具,一个最简单的办法就是先用一个Spark Streaming Job把各种各样的数据源写到一个表中,如下图,然后再根据业务需求选择是用流作业还是批作业去进行相应的查询工作。但是,这种方式会存在一些问题,比如因为是流式写入,会产生大量的小文件,对后续的性能产生很大的影响。
面对上面遇到的小文件问题,一个改进的方法如下图所示,是在上述方法中创建的表之后加一个批作业定时的将小文件合并起来,但是这个改进方法仍然有明显的缺点,那就是存在着小时级别的延迟,这种级别的延迟对于很多业务来讲是无法满足要求的。
为了解决上述延迟问题,Lambda架构畅行一时。其架构思路如下图所示,简单说就是分别用流和批的方式对数据源处理两次,然后将批和流的视角合起来提供给后续业务。Lambda架构虽然解决了上述的问题,但是也存在自身的缺点:
- 因为业务逻辑在要用批和流的方式处理两次,而批和流的处理方式不一致,可能会导致某些问题;
- 如果处理逻辑中加入了数据校验的工作,就需要在批和流上分别校验两次,一旦需要回滚等操作,数据修正也需要进行两次,费时费力;
- 如果涉及到Merge、Update等操作,也需要进行两次修改,使得整个事务变得复杂;
- ······
上面的几种方案都有自己的缺点,Lambda架构虽然看似有效但是架构过于复杂。那么,有没有一种方案可以将Lambda架构进行简化呢?其实,我们的目标很简单,就是让流作业处理我们的源数据,并且后续作业可以批流统一的处理,具体来说有:
- 保证数据的一致性;
- 保证每次是增量的读取;
- 能够做回滚;
- 能够访问历史记录;
- 能够在不影响下游作业的同时合并小文件。
结合以上几点目标,有了目前的解决方案:Delta Lake + Structured Streaming = The Delta Architecture。这套方案的优点很明显,首先是批流合一的,其次Delta Lake可以很方便的做时间旅行类似的操作,且Delta Lake是单纯的储存层,与计算层分离,符合当前云数据计算的大方向,方便用户灵活的进行扩容。
二、Delta Lake的工作原理
Delta Lake的核心是其事务日志,它的表跟普通的表没有大的区别,但是在表下会建立一个隐藏文件,其中的JSON存储了一些关于事务的记录,如下图所示:
因此,在Delta Lake中,读取一张表也会重放这张表的历史记录,比如表的重命名、修改Schema等等操作。
更细节地来说,在Delta Lake中的每个JSON文件都是一次commit,这个commit是原子性的,保存了事务相关的详细记录。另外,Delta Lake还可以保证多个用户同时commit而不会产生冲突,它用的是一种基于乐观锁处理的方式,其逻辑如下图所示。这种解决冲突的方案适用于写比较少,读取比较多的场景,大家在使用的时候要注意场景是否适用。
假设我们要处理一个非常大的表,有百万级别的文件,那么如何高效的处理元数据呢?Delta Lake的处理方案如下图所示,用Spark来读取事务日志,然后Delta Lake隔一段时间对commit做一次合并,之后可以从Checkpoint开始应用后续的commit。
总结起来,Delta Lake解决数据一致性、增量读取、历史回溯等问题的方案即为下图所示:
三、Demo
从以下链接大家可以看到详细的Demo展示,还有详细的社区版本(免费)Databricks的设置方法:https://github.com/delta-io/delta/tree/master/examples/tutorials/saiseu19 。
Demo中提供了Python API和Scala API的实现文件,大家可以根据自己的实际情况进行尝试。上面链接的Demo中展示的主要features有:
- Schema Enforcement:在做Pipeline的时候我们一定要保证数据质量,因此Schema Enforcement可以帮助我们做到这点。
- Schema Evolution:随着公司业务的发展,一开始的表结构可能不适用于当前的业务,Schema Evolution可以帮助我们进行表结构的演化。
- Delete from Delta Lake table:Delete操作可以控制表的无限制增长,并且通过事务日志来进行操作,实际上数据没有被删掉,只是在Log中进行了标记。
- Audit Delta Lake Table History:通过此功能可以看到对表的详细历史操作。
- Travel back in time:有了表的历史数据,我们便可以访问表在各个历史节点的数据。
- Vacuum old versions of Delta Lake tables:Delta Lake通过标记的方式来实现删除,随着时间的增长会占用大量储存空间,Vacuum操作将删除在一定时间内从表中删除的数据文件,实现物理删除,默认会保留七天内的数据。
- Upsert into Delta Lake table using Merge:在一个命令中同时做update和insert操作。
上述Features的具体代码可以在Github中查看。
四、Q&A
Q1:Delta Lake可以线上使用吗?支持实时增删查改吗?
A1:Delta 最新发布了0.7.0,支持Spark 3.0。Databricks已经有很多客户在使用Delta Lake,其他公司也有在用,比如eBay。实时增删查改如demo演示的那样都是支持的。
Q2:是否可以纯SQL实现?
A2:Delta Lake是一个数据储存层,如果是与Hive等引擎做整合,只支持基本的SELECT/INSERT,没有支持DELETE等SQL操作,只能用Delta Lake自己的Scala或Python API。如果使用的是Spark 3.0的话,像MERGE、DELTE等都支持SQL AQI,可以直接用SQL开发。但是某些管理操作比如VACCUM没有对应的SQL API,还是要用Delta Lake自己的Scala或Python API。
关键词:Databricks、Spark、Delta Lake、Schema Enforcement
获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<