深度解析Delta Transaction Log-阿里云开发者社区

开发者社区> 大数据> 正文
登录阅读全文

深度解析Delta Transaction Log

简介: 深度解析Delta Transaction Log

简要:

一、What is Delta

二、The Delta Log(Transaction Log)

三、Time Travel

四、Batch/Streaming Queries on Delta Tables

五、Demo

 

 

 

一、What is Delta

1)Data Lake场景

国内热度很高的数据湖场景:如过来系列流失或批量数据,期望可以流失处理的同时又达到批处理的办事效果,真正对应AI & Reporting,那么就可以用SparkStreaming进行流式数据的接收处理,打通整个在线流。

image.png

Lambda架构离线和在线分支两个作业,不同情况部署会存在某一个或多个时刻节点,尤其具体需要详细对齐两边数据的时候,就涉及在线离线数据的校验问题。 也许可以用spark解决这样的问题,周期性跑校验任务,把 validation这一过程补齐,周期性补数据。

image.png

解决了数据校验的问题,离线数据还需要继续看数据修复问题,一段时间之后,发现过去某天的数据有缺失或者错误,需要重新回跑数据,进行过去数据的校验。

对于在线查询,也许修复的那天数据此刻属于未确定状态,也不确定当前是不是需要让用户查修复状态时保留哪个版本,需要进行不停的校验。

image.png

修复以及数据更新的过程显而易见,假设已经确定这份数据有问题,那么线上查询以及对于用户,尤其是public或者AI场景的一些脏数据的清洗,包括在线即时查询的一些场景,需要停下来,等把完整数据更新之后再向用户开放。

理想中的Data Lake

四点要求:

1.  用连续处理的模式处理数据

2.  不再从批量和流式中做出选择

3.  增量处理新到达的数据

4.  低维护成本

image.png

Databricks在若干实践中,对理想模型的答案就是Data Lack,对应问题及解决方案:

第一,具备同时读写并且有数据一致性保证的能力,通过数据格式,解决reader和writer通过快照进行隔离,且Reader和Writer以及多Reader和多Rriter互相之间没有影响。

第二,具备高吞吐从大表读数据的能力,使用Spark来并发处理大表元数据。

第三,脏数据回滚,提供Time Travel,可以回馈到任何一个之前处理过的快照时间点,发现若干次或者说某几次的写入操作有脏数据,可以做成回退。

第四,具备在线处理历史数据的能力,在不停留的情况下,流式作业是面向未来的,实时处理当下的数据,处理历史数据的时候,原则上不应该影响到流式作业,流式作业可以继续跑,所以通过同一作业处理历史数据。

第五,不阻塞下游作业前提下处理迟到数据,迟到数据可以直接入库,当冲突发生的时候做出对应的仲裁。

image.png

二、The Delta Log(Transaction Log)

1Deltaon Disk

基于Delta介绍,最关键的是Transaction log的支持和设计。一个Delta表,存磁盘上有一个关键目录,_Delta_log,目录里存的是Delta Transaction Log

image.png

目录里有数字开头的Json文件,每个文件对一次Table Version,一系列操作存在当前版本,版本不断递增,所有Transaction Log可以与数据共存。

Delta Table当中最终数据文件存在 park里,如果从Delta搬去任何一个系统,也是兼容和支持的,只不过放了一个隐藏的添加,这是整个它在磁盘上的文件格式。image.png

2Table = result of a set of actions

Delta当中对table的设计理念是把table抽象成对数据的操作,一系列的action最终凑成了一张表,一个常见的表的Metadata包括 name、Schema及partitioning等信息,这些信息在改动过程中,会触发一个Metadata的action。

action就最常见的是add a fild,把表里新增数据以file形式加到表里,Remove File,不是简单delete的操作。

Delta中对Remove Fild的更多依赖是在于compassion,例如输写一段话,发现因为不同的写者对于 file的大小无法控制,如何处理让整个文件当中的大小文件均匀,在Delta中有一个optimized, SQL命令可以直接将若干文件进行compassion后把原始文件删掉。

Action对于streaming最为关键,当中有一个最小的原子的 batch,需要有Transaction的支持,当前 streaming的小batch写入失败,需要回退到之前,需要一开始写入的时候set Transaction中间有了问题会得到原来的点,这是对于streaming当中的一个最好的支持。

Delta这个项目从历史来讲,它诞生于spark的StructuredStreaming,想在流式场景下处理复杂的并发问题,以及处理复杂的并发写表场景下,需要有更好的一种格式支持,最后诞生了Delta。 因为国外更高的数据标准导致Delta变成一个独立项目,并且对它的需求越来越多。

最后一个Action是Change Protocol,对于一个table,假设将它定义是一个一系列操作的结果,table必然是要跨版本,或者跨多个Deltaversion进行一个长期兼容,尤其对于商业版场景,Change Protocol关键点就在于假如Deltafeature或者说Delta的解析方式有变动,里面必须要打入对应的版本信息,支持向前向后兼容。

 

3Implementing Atomicity

image.png

首先我们看一下原子性,例如Delta当中有两个版本,第一个version 0,Delta给原始的大table中想写入两个数据文件,需要把这两个文件delete,新增一个数据文件。

相当对表的每一次操作都会蜕变成若干个原子的action,修改表的动作结束之后,会打成一个commit,变成对应的Metadata file放在 Transaction Log里面,这次行为可能包括add 、remote、 update 等。

 

4Ensuring Serializability

 image.png

所有的table的version都是以递增方式排列,如何处理并发的问题,例如有两个对应的读者,表初始有若干个version,从version0到version1,假设user1从user0开始读,usr2是从version1开始读,这个时候usr2先完成,user2可以接着往下走,user1从version0也可能走到version2这个点,就存在一个竞争关系。

Delta当中对于下一个version的竞争关系的定义,用order的形式然后去判定由哪一个order进行持续下一个 versionID的权限,通过判断,需要check关键的设计细节。

如果把Structure Streaming这种写的方式,或者日常使用Deltatable、have table的形式做一个归类,可以发现大部分场景是由append对原有的table进行少量的更新操作。

基于这样一个用户场景,将Delta的设计理念变成一个乐观持有的冲突解决并发方式。

5Solving Conflicts Optimistically

image.png

如何通过乐观的方式解决对应冲突,拆分步骤来看,

首先,在每一次写之前,确认当前读取的start version是什么。

第二,是记录所有read / write行为。

第三,尝试去commit,确定是否有冲突。

第四,前两步start version判断,和所有竞争者确认,它们之间是否存在操作冲突,如果没有冲突,可以尝试直接commit, commit ID就是拿到最新的一个ID。

最后,Delta集成在spark中,当前Delta可能会集成在更多的流式或者批处理系统,可以依赖上层进行重试,处理异常。

 

6)Handling Massive Metadata

image.png

怎么样处理大量的Metadata,有一系列解决方案,

第一,整个设计理念是一个自循环,Delta集成在spark系统中,我们用spark自身来处理自身产生的Metadata。

海量数据前提下,表达本身有可能也就变成一种海量数据源。既然持有一个大数据处理框架,自己产生的大数据问题就靠自己来解决,所以spark天然就变成处理Deltatable中的处理引擎。

 

7)Checkpoints

image.png

用spark处理所有Metadata,做一个check point的机制, 过一个时间点就做一次check point,这一次check point可以从下一次故障时间恢复的一个最近的时间点,每一个check on是在Delta范围中,有了 check point之后,不需要在list以前所有的commit log,直接在当前的 version打一个对应的check poin,不用再去读所有以版本号开头的 Jason file。

为什么选Parquet格式:

第一,顺势而为, spark里面当前默认的数据格式是Parquet

第二,对后续的一系列的操作可能只需要提取或改变某些字段.

 

8)Computing Delta’s State

image.png

第一个场景,假设没有任何辅助check point情况下,比如有三个Jason File对应三个的Delta version的时候,算到最新的,会把当前的最新的版本用内存cash的情况放在这里,用户会从最新的开始读,那么它的复用率是比较高的,cash也是复用spark当中的cash。

image.png

然后假设有多个文件通常进行一次读表操作,就是从最初的start version,如果用户没有指定start version,就是从最初开始,读到最后的一个,由spark进行计算。

image.png

假设这时中间有某个checkpoint,可以跳过所有Jason file,从最近能找到的check file开始读,把check file解析出来,拿到增量的Jason  version。

image.png

三、Time Travel

1)Time Travelling by version

image.png

Delta中直接提供SQL语法version As of可以指定对应某一个version,指定后扫表操作相当于从 version开始,小于 version以前的所有操作可以被屏蔽。

2)Time Travelling by timestamp

image.png

可支持直接在table上面加at的语法,在 DataFrame API里面也有对应参数可支持,把version传进去,真正读表的操作,映射到Delta内部的实现方式,Deltalog会拿到snapshot,以对应的version作为结束。

对应 table的at形式, DataFrame的API,需要注意的是时间和version并不是一一对应。

 

举例:

从Delta的一个Metadata1071开始首先我们记录文件系统当中每一个commit file的时间,这是最常见的一个事件。

image.png

但时间有一个问题,72号commit和73号commit的先后顺序是调转的,时间乱序,在分布式系统当中这个时间记录到Meta当中会有这种问题,提前commit的文件,有可能在文件系统当中的记录时间反而小,处理这些问题,Delta在time traveling的时候会先列出一份文件,拿到对应的文件列表之后,按照最可信的 ID进行排序,一旦发现时钟道具的时候,会把第二个时钟加上一毫秒,来解决这样的问题,然后让整个的commit按照需求的logic时间排序list。

image.png

拿到对应的时间序之后,去找到对应的时间节点,从那个点开始,后续的Metadata开始录入、读取。

image.png

对于当前Time travel中get Timestamp的处理方式 ,刚才提供的1492年10月28号,刚好是在这两个时间点中间,拿这个包含时间点最近的往上copy的ID 1071,对应翻译到Data log里,从1071的快照开始录取。

image.png

有两点限制:

Ø  需在time travel时确认 transaction log存在。

Ø  需有对应的Delta file。

 

四、Batch / Streaming Queries on Delta Tables

关于Batch / Streaming Queries on Delta Tables中的模拟读取场景,首先Batch批量的spark读取表的模式,开始先list,在Delta中称为update the state,每次出发,从commit log里读取当前所有file以及当前表述所存在状态,与此同时进行系列的预先filter操作。

image.png

根据Metadata当前的记录状态,进行评比,例如正在读的时间点有一些写入到一半的文件,有可能触发 correct的问题,可以在读的时候,把文件预先提掉,最后执行query,这是最简单的批量power的执行方式。

image.png

复杂点的流失场景,首先计算系列表的状态,同时去partitionfilters,流失不同的情况在于不停的list里面的file进行流失处理模式,一开始读snapshot,从snapshot结束后,开始读取更新文件,snapshot的消费依据就是Spark Streaming里面对应有两个配置,第一个是max tesper trigger,另一个是maxBytesper trigger。

每次出发读多少文件,所有snapshot用这两个配置消费结束之后,尝试开始读新入文件,符合流失处理引擎的模式。

新写入的文件,开始不停的在表里面进行list,从commitID之后,逐渐一步的把新文件进行计算。

其中需要注意的两个点:

在 list snapshot时会读详细的Metadata,对于optimize的file有一个Metadata叫data change,生成新文件并没有带来任何新数据,那么data change是force, streaming作业也不会读,这就可以在流失情况下去优化这张表,把小文件进行一个合并。

流式作业的场景下,如何不影响到线上作业运行,可以设置 economic,在流水作业触发的时候,假设依赖文件有变动,可以不停留,最后就是vacuum操作,假设vacuum操作其实是Delta里面最严厉的一种删除操作,需要确认vacuum操作的影响。


image.png

做到 start Version和start Timestamp,这两个feature可以用在流式写入,不想从最初的 batch数据开始读,就可以使用 start Version或者是start Timestamp,它会综合刚才的原理,可以一定系列file change,也会对vacuum有一系列限制,有一个细节需要注意, start Version以及 start Timestamp本身的点是不包含在启动当中。

 五、Demo

image.png

大家如果对 Databricks产品以及Databricks品牌、原生spark系列的商业服务有兴趣,可以使用阿里云Databricks数据洞察 (DDI) 架构产品。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

其他文章