简要:
一、What is Delta
二、The Delta Log(Transaction Log)
三、Time Travel
四、Batch/Streaming Queries on Delta Tables
五、Demo
一、What is Delta
1)Data Lake场景
国内热度很高的数据湖场景:如过来系列流失或批量数据,期望可以流失处理的同时又达到批处理的办事效果,真正对应AI &Reporting,那么就可以用SparkStreaming进行流式数据的接收处理,打通整个在线流。
Lambda架构离线和在线分支两个作业,不同情况部署会存在某一个或多个时刻节点,尤其具体需要详细对齐两边数据的时候,就涉及在线离线数据的校验问题。 也许可以用spark解决这样的问题,周期性跑校验任务,把 validation这一过程补齐,周期性补数据。
解决了数据校验的问题,离线数据还需要继续看数据修复问题,一段时间之后,发现过去某天的数据有缺失或者错误,需要重新回跑数据,进行过去数据的校验。
对于在线查询,也许修复的那天数据此刻属于未确定状态,也不确定当前是不是需要让用户查修复状态时保留哪个版本,需要进行不停的校验。
修复以及数据更新的过程显而易见,假设已经确定这份数据有问题,那么线上查询以及对于用户,尤其是public或者AI场景的一些脏数据的清洗,包括在线即时查询的一些场景,需要停下来,等把完整数据更新之后再向用户开放。
理想中的Data Lake
四点要求:
1. 用连续处理的模式处理数据
2. 不再从批量和流式中做出选择
3. 增量处理新到达的数据
4. 低维护成本
Databricks在若干实践中,对理想模型的答案就是Data Lack,对应问题及解决方案:
第一,具备同时读写并且有数据一致性保证的能力,通过数据格式,解决reader和writer通过快照进行隔离,且Reader和Writer以及多Reader和多Rriter互相之间没有影响。
第二,具备高吞吐从大表读数据的能力,使用Spark来并发处理大表元数据。
第三,脏数据回滚,提供Time Travel,可以回馈到任何一个之前处理过的快照时间点,发现若干次或者说某几次的写入操作有脏数据,可以做成回退。
第四,具备在线处理历史数据的能力,在不停留的情况下,流式作业是面向未来的,实时处理当下的数据,处理历史数据的时候,原则上不应该影响到流式作业,流式作业可以继续跑,所以通过同一作业处理历史数据。
第五,不阻塞下游作业前提下处理迟到数据,迟到数据可以直接入库,当冲突发生的时候做出对应的仲裁。
二、The Delta Log(Transaction Log)
1)Deltaon Disk
基于Delta介绍,最关键的是Transaction log的支持和设计。一个Delta表,存磁盘上有一个关键目录,_Delta_log,目录里存的是Delta Transaction Log。
目录里有数字开头的Json文件,每个文件对一次Table Version,一系列操作存在当前版本,版本不断递增,所有Transaction Log可以与数据共存。
Delta Table当中最终数据文件存在 park里,如果从Delta搬去任何一个系统,也是兼容和支持的,只不过放了一个隐藏的添加,这是整个它在磁盘上的文件格式。
2)Table = result of a set of actions
Delta当中对table的设计理念是把table抽象成对数据的操作,一系列的action最终凑成了一张表,一个常见的表的Metadata包括name、Schema及partitioning等信息,这些信息在改动过程中,会触发一个Metadata的action。
action就最常见的是add a fild,把表里新增数据以file形式加到表里,Remove File,不是简单delete的操作。
Delta中对Remove Fild的更多依赖是在于compassion,例如输写一段话,发现因为不同的写者对于 file的大小无法控制,如何处理让整个文件当中的大小文件均匀,在Delta中有一个optimized, SQL命令可以直接将若干文件进行compassion后把原始文件删掉。
Action对于streaming最为关键,当中有一个最小的原子的 batch,需要有Transaction的支持,当前 streaming的小batch写入失败,需要回退到之前,需要一开始写入的时候set Transaction中间有了问题会得到原来的点,这是对于streaming当中的一个最好的支持。
Delta这个项目从历史来讲,它诞生于spark的StructuredStreaming,想在流式场景下处理复杂的并发问题,以及处理复杂的并发写表场景下,需要有更好的一种格式支持,最后诞生了Delta。因为国外更高的数据标准导致Delta变成一个独立项目,并且对它的需求越来越多。
最后一个Action是Change Protocol,对于一个table,假设将它定义是一个一系列操作的结果,table必然是要跨版本,或者跨多个Deltaversion进行一个长期兼容,尤其对于商业版场景,Change Protocol关键点就在于假如Deltafeature或者说Delta的解析方式有变动,里面必须要打入对应的版本信息,支持向前向后兼容。
3)Implementing Atomicity
首先我们看一下原子性,例如Delta当中有两个版本,第一个version 0,Delta给原始的大table中想写入两个数据文件,需要把这两个文件delete,新增一个数据文件。
相当对表的每一次操作都会蜕变成若干个原子的action,修改表的动作结束之后,会打成一个commit,变成对应的Metadata file放在 Transaction Log里面,这次行为可能包括add 、remote、 update 等。
4)Ensuring Serializability
所有的table的version都是以递增方式排列,如何处理并发的问题,例如有两个对应的读者,表初始有若干个version,从version0到version1,假设user1从user0开始读,usr2是从version1开始读,这个时候usr2先完成,user2可以接着往下走,user1从version0也可能走到version2这个点,就存在一个竞争关系。
Delta当中对于下一个version的竞争关系的定义,用order的形式然后去判定由哪一个order进行持续下一个 versionID的权限,通过判断,需要check关键的设计细节。
如果把Structure Streaming这种写的方式,或者日常使用Deltatable、have table的形式做一个归类,可以发现大部分场景是由append对原有的table进行少量的更新操作。
基于这样一个用户场景,将Delta的设计理念变成一个乐观持有的冲突解决并发方式。
5)Solving Conflicts Optimistically
如何通过乐观的方式解决对应冲突,拆分步骤来看,
首先,在每一次写之前,确认当前读取的start version是什么。
第二,是记录所有read / write行为。
第三,尝试去commit,确定是否有冲突。
第四,前两步start version判断,和所有竞争者确认,它们之间是否存在操作冲突,如果没有冲突,可以尝试直接commit, commit ID就是拿到最新的一个ID。
最后,Delta集成在spark中,当前Delta可能会集成在更多的流式或者批处理系统,可以依赖上层进行重试,处理异常。
6)Handling Massive Metadata
怎么样处理大量的Metadata,有一系列解决方案,
第一,整个设计理念是一个自循环,Delta集成在spark系统中,我们用spark自身来处理自身产生的Metadata。
海量数据前提下,表达本身有可能也就变成一种海量数据源。既然持有一个大数据处理框架,自己产生的大数据问题就靠自己来解决,所以spark天然就变成处理Deltatable中的处理引擎。
7)Checkpoints
用spark处理所有Metadata,做一个check point的机制, 过一个时间点就做一次check point,这一次check point可以从下一次故障时间恢复的一个最近的时间点,每一个check on是在Delta范围中,有了 check point之后,不需要在list以前所有的commit log,直接在当前的 version打一个对应的check poin,不用再去读所有以版本号开头的 Jason file。
为什么选Parquet格式:
第一,顺势而为, spark里面当前默认的数据格式是Parquet
第二,对后续的一系列的操作可能只需要提取或改变某些字段.
8)Computing Delta’s State
第一个场景,假设没有任何辅助check point情况下,比如有三个Jason File对应三个的Delta version的时候,算到最新的,会把当前的最新的版本用内存cash的情况放在这里,用户会从最新的开始读,那么它的复用率是比较高的,cash也是复用spark当中的cash。
然后假设有多个文件通常进行一次读表操作,就是从最初的start version,如果用户没有指定start version,就是从最初开始,读到最后的一个,由spark进行计算。
假设这时中间有某个checkpoint,可以跳过所有Jason file,从最近能找到的check file开始读,把check file解析出来,拿到增量的Jason version。
三、Time Travel
1)Time Travelling by version
Delta中直接提供SQL语法version As of可以指定对应某一个version,指定后扫表操作相当于从 version开始,小于 version以前的所有操作可以被屏蔽。
2)Time Travelling by timestamp
可支持直接在table上面加at的语法,在 DataFrame API里面也有对应参数可支持,把version传进去,真正读表的操作,映射到Delta内部的实现方式,Deltalog会拿到snapshot,以对应的version作为结束。
对应 table的at形式, DataFrame的API,需要注意的是时间和version并不是一一对应。
举例:
从Delta的一个Metadata1071开始首先我们记录文件系统当中每一个commit file的时间,这是最常见的一个事件。
但时间有一个问题,72号commit和73号commit的先后顺序是调转的,时间乱序,在分布式系统当中这个时间记录到Meta当中会有这种问题,提前commit的文件,有可能在文件系统当中的记录时间反而小,处理这些问题,Delta在time traveling的时候会先列出一份文件,拿到对应的文件列表之后,按照最可信的 ID进行排序,一旦发现时钟道具的时候,会把第二个时钟加上一毫秒,来解决这样的问题,然后让整个的commit按照需求的logic时间排序list。
拿到对应的时间序之后,去找到对应的时间节点,从那个点开始,后续的Metadata开始录入、读取。
对于当前Time travel中get Timestamp的处理方式,刚才提供的1492年10月28号,刚好是在这两个时间点中间,拿这个包含时间点最近的往上copy的ID 1071,对应翻译到Data log里,从1071的快照开始录取。
有两点限制:
Ø 需在time travel时确认 transaction log存在。
Ø 需有对应的Delta file。
四、Batch / Streaming Queries on Delta Tables
关于Batch / Streaming Queries on Delta Tables中的模拟读取场景,首先Batch批量的spark读取表的模式,开始先list,在Delta中称为update the state,每次出发,从commit log里读取当前所有file以及当前表述所存在状态,与此同时进行系列的预先filter操作。
根据Metadata当前的记录状态,进行评比,例如正在读的时间点有一些写入到一半的文件,有可能触发 correct的问题,可以在读的时候,把文件预先提掉,最后执行query,这是最简单的批量power的执行方式。
复杂点的流失场景,首先计算系列表的状态,同时去partitionfilters,流失不同的情况在于不停的list里面的file进行流失处理模式,一开始读snapshot,从snapshot结束后,开始读取更新文件,snapshot的消费依据就是Spark Streaming里面对应有两个配置,第一个是max tesper trigger,另一个是maxBytesper trigger。
每次出发读多少文件,所有snapshot用这两个配置消费结束之后,尝试开始读新入文件,符合流失处理引擎的模式。
新写入的文件,开始不停的在表里面进行list,从commitID之后,逐渐一步的把新文件进行计算。
其中需要注意的两个点:
在 list snapshot时会读详细的Metadata,对于optimize的file有一个Metadata叫data change,生成新文件并没有带来任何新数据,那么data change是force, streaming作业也不会读,这就可以在流失情况下去优化这张表,把小文件进行一个合并。
流式作业的场景下,如何不影响到线上作业运行,可以设置 economic,在流水作业触发的时候,假设依赖文件有变动,可以不停留,最后就是vacuum操作,假设vacuum操作其实是Delta里面最严厉的一种删除操作,需要确认vacuum操作的影响。
做到 start Version和start Timestamp,这两个feature可以用在流式写入,不想从最初的 batch数据开始读,就可以使用 start Version或者是start Timestamp,它会综合刚才的原理,可以一定系列file change,也会对vacuum有一系列限制,有一个细节需要注意, start Version以及 start Timestamp本身的点是不包含在启动当中。
五、Demo
大家如果对 Databricks产品以及Databricks品牌、原生spark系列的商业服务有兴趣,可以使用阿里云Databricks数据洞察 (DDI) 架构产品。