深度解析Delta Transaction Log

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云解析DNS,个人版 1个月
全局流量管理 GTM,标准版 1个月
简介: 深度解析Delta Transaction Log

简要:

一、What is Delta

二、The Delta Log(Transaction Log)

三、Time Travel

四、Batch/Streaming Queries on Delta Tables

五、Demo

 

 

 

一、What is Delta

1Data Lake场景

国内热度很高的数据场景:过来系列流失批量数据,期望可以流失处理的同时达到批处理办事效果,真正对应AI &Reporting,那么就可以用SparkStreaming进行流式数据的接收处理,打通整个在线流。

image.png

Lambda架构离线和在线分支两个作业,不同情况部署会存在某一个或多个时刻节点,尤其具体需要详细对齐两边数据的时候,就涉及在线离线数据的校验问题。 也许可以用spark解决这样的问题,周期性跑校验任务,把 validation这一过程补齐,周期性补数据。

image.png

解决了数据校验的问题,离线数据还需要继续看数据修复问题,一段时间之后,发现过去某天的数据有缺失或者错误,需要重新回跑数据,进行过去数据的校验。

对于在线查询,也许修复的那天数据此刻属于未确定状态,也不确定当前是不是需要让用户查修复状态时保留哪个版本,需要进行不停的校验

image.png

修复以及数据更新的过程显而易见,假设已经确定这份数据有问题,那么线上查询以及对于用户,尤其是public或者AI场景的一些脏数据的清洗,包括在线即时查询的一些场景,需要停下来,等把完整数据更新之后再向用户开放。

理想中的Data Lake

点要求

1.  用连续处理的模式处理数据

2.  不再从批量和流式中做出选择

3.  增量处理新到达的数据

4.  低维护成本

image.png

Databricks在若干实践中,对理想模型的答案就是Data Lack,对应问题及解决方案

第一,具备同时读写并且有数据一致性保证的能力,通过数据格式,解决reader和writer通过快照进行隔离,Reader和Writer以及多Reader和多Rriter互相之间没有影响。

第二,具备高吞吐从大表读数据的能力,使用Spark来并发处理大表元数据。

第三脏数据回滚,提供Time Travel,可以回馈到任何一个之前处理过的快照时间点,发现若干次或者说某几次的写入操作有脏数据,可以做成退

第四,具备在线处理历史数据的能力,在不停留的情况下,流式作业是面向未来的,实时处理当下的数据处理历史数据的时候,原则上不应该影响到流式作业,流式作业可以继续跑所以通过同一作业处理历史数据

第五,不阻塞下游作业前提下处理迟到数据迟到数据可以直接入库,当冲突发生的时候做出对应的仲裁。

image.png

二、The Delta Log(Transaction Log)

1Deltaon Disk

基于Delta介绍,最关键的Transaction log的支持设计。一个Delta表,存磁盘上有一关键目录,_Delta_log目录里存的Delta Transaction Log

image.png

目录里有数字开头的Json文件,每个文件对一次Table Version一系列操作存在当前版本,版本不断递增所有Transaction Log可以数据共存

Delta Table当中最终数据文件存在 park里,如果从Delta搬去任何一个系统,也是兼容和支持的,只不过放了一个隐藏的添加,这是整个在磁盘上的文件格式。image.png

2Table = result of a set of actions

Delta当中对table的设计理念是把table抽象成对数据的操作,一系列的action最终凑成了一张表一个常见的表Metadata包括name、Schema及partitioning信息,这些信息在改动过程中,会触发一个Metadata的action。

action就最常见的是add a fild,把表里新增数据以file形式加到表里,Remove File不是简单delete的操作。

Delta中对Remove Fild的更多依赖是在于compassion,例如输写一段话,发现因为不同的写者对于 file的大小无法控制,如何处理让整个文件当中的大小文件均匀,在Delta中有一个optimized, SQL命令可以直接将若干文件进行compassion后把原始文件删掉。

Action对于streaming最为关键,当中有一个最小的原子的 batch,需要有Transaction的支持,前 streaming的小batch写入失败,需要回退到之前,需要一开始写入的时候set Transaction中间有了问题会得到原来的点,这是对于streaming当中的一个最好的支持。

Delta这个项目从历史来讲,它诞生于spark的StructuredStreaming,想在流式场景下处理复杂的并发问题,以及处理复杂的并发写表场景下,需要有更好的一种格式支持,最后诞生了Delta因为国外更高的数据标准导致Delta变成一个独立项目,并且对它的需求越来越多。

最后一个Action是Change Protocol对于一个table,假设将它定义是一个一系列操作的结果,table必然是要跨版本,或者跨多个Deltaversion进行一个长期兼容,尤其对于商业版场景,Change Protocol关键点就在于假Deltafeature或者说Delta的解析方式有变动,里面必须要打入应的版本信息,支持向前向后兼容。

 

3Implementing Atomicity

image.png

首先我们看一下原子性,例如Delta有两个版本,第一个version 0,Delta给原始的大table中想写入两个数据文件,需要把这两个文件delete新增一个数据文件。

相当对表的每一次操作都会蜕变成若干个原子的action,修改表的动作结束之后,会打成一个commit,变成对应的Metadata file放在 Transaction Log里面,这次行为可能包括add remote、 update 等。

 

4Ensuring Serializability

 image.png

所有的table的version都是以递增方式排列,如何处理并发的问题,例如有两个对应的读者,表初始有若干个version,从version0到version1,假设user1从user0开始读,usr2是从version1开始读,这个时候usr2先完成,user2可以接着往下走,user1从version0也可能走到version2这个点,就存在一个竞争关系。

Delta当中对于下一个version的竞争关系的定义,用order的形式然后去判定由哪一个order进行持续下一个 versionID的权限,通过判断,需要check关键的设计细节。

如果把Structure Streaming这种写的方式,或者日常使用Deltatablehave table的形式做一个归类,可以发现大部分场景是由append对原有的table进行少量的更新操作。

基于这样一个用户场景,将Delta的设计理念变成一个乐观持有的冲突解决并发方式。

5Solving Conflicts Optimistically

image.png

如何通过乐观的方式解决对应冲突,拆分步骤来看,

首先在每一次写之前,确认当前读取的start version是什么

第二,是记录所有read / write行为。

第三尝试去commit,确定是否有冲突

第四,前两start version判断,和所有竞争者确认,它们之间是否存在操作冲突,如果没有冲突,可以尝试直接commit, commit ID就是拿到最新的一个ID。

最后,Delta集成在spark中,当前Delta可能会集成在更多的流式或者批处理系统,可以依赖上层进行重试,处理异常

 

6Handling Massive Metadata

image.png

怎么样处理大量的Metadata,有一系列解决方案,

第一,整个设计理念是一个自循环,Delta集成在spark系统中,我们用spark自身来处理自身产生的Metadata

海量数据前提下,表达本身有可能也就变成一种海量数据源。既然持有一个大数据处理框架,自己产生的大数据问题就靠自己来解决,所以spark天然就变成处理Deltatable的处理引擎。

 

7Checkpoints

image.png

用spark处理所有Metadata,做一个check point的机制, 过一个时间点就做一次check point,这一次check point可以从下一次故障时间恢复的一个最近的时间点,每一个check on是在Delta范围中,有了 check point之后,不需要在list以前所有的commit log,直接在当前的 version打一个对应的check poin,不用再去读所有以版本号开头的 Jason file

为什么选Parquet格式

第一顺势而为, spark里面当前默认的数据格式是Parquet

第二对后续的一系列的操作可能只需要提取或改变某些字段.

 

8Computing Delta’s State

image.png

第一个场景,假设没有任何辅助check point情况下,比如有三个Jason File对应三个的Delta version的时候,算到最新的,会把当前的最新的版本用内存cash的情况放在这里,用户会从最新的开始读,那么它的复用率是比较高的,cash也是复用spark当中的cash。

image.png

然后假设有多个文通常进行一次读表操作,就是从最初的start version,如果用户没有指定start version,就是从最初开始,读到最后的一个,由spark进行计算。

image.png

假设这时中间有某checkpoint,可以跳过所有Jason file,从最近能找到的check file开始读,把check file解析出来,拿到增量的Jason  version

image.png

三、Time Travel

1Time Travelling by version

image.png

Delta中直接提供SQL语法version As of可以指定对应某一个version,指定后扫表操作相当于从 version开始,小于 version以前的所有操作可以被屏蔽。

2Time Travelling by timestamp

image.png

可支持直接在table上面加at的语法,在 DataFrame API里面也有对应参数可支持把version传进去,真正读表的操作,映射到Delta内部的实现方式,Deltalog会拿到snapshot,以对应的version作为结束。

对应 table的at形式, DataFrame的API需要注意是时间和version并不是一一对应

 

举例:

Delta的一个Metadata1071开始首先我们记录文件系统当中每一个commit file的时间,这是最常见的一个事件。

image.png

但时间有一个问题72号commit和73号commit的先后顺序是调转的时间乱序,在分布式系统当中这个时间记录到Meta当中会有这种问题,提前commit的文件,有可能在文件系统当中的记录时间反而小,处理这些问题Delta在time traveling的时候会先列出一份文件,拿到对应的文件列表之后,按照最可信的 ID进行排序,一旦发现时钟道具的时候,会把第二个时钟加上一毫秒,来解决这样的问题,然后让整个的commit按照需求的logic时间排序list。

image.png

拿到对应的时间序之后,去找到对应的时间节点,从那个点开始,后续的Metadata开始录入读取。

image.png

对于当前Time travel中get Timestamp处理方式,刚才提供的1492年10月28号,刚好是在这两个时间点中间,拿这个包含时间点最近的往上copy的ID 1071,对应翻译到Data log里,从1071的快照开始录取。

image.png

有两点限制:

Ø  需在time travel时确认 transaction log存在。

Ø  需有对应的Delta file

 

四、Batch / Streaming Queries on Delta Tables

关于Batch / Streaming Queries on Delta Tables中的模拟读取场景,首先Batch量的spark读取表的模式,开始先list,在Delta中称为update the state,每次出发,从commit log里读取当前所有file以及当前表述所存在状态与此同时进行系列的预先filter操作

image.png

根据Metadata当前的记录状态,进行评比,例如正在读的时间点有一些写入到一半的文件,有可能触发 correct的问题可以在读的时候,把文件预先提掉,最后执行query,这是最简单的批量power的执行方式。

image.png

复杂点的流失场景,首先计算系列表的状态,同时去partitionfilters流失不同的情况在于不停的list里面的file进行流失处理模式一开始读snapshot,从snapshot结束后,开始读取更新文件,snapshot的消费依据就是Spark Streaming里面对应有两个配置,第一个是max tesper trigger,另一个是maxBytesper trigger。

每次出发读多少文件,所有snapshot用这两个配置消费结束之后,尝试开始读新入文件,符合流失处理引擎的模式。

新写入的文件,开始不停的在表里面进行list,从commitID之后,逐渐一步的把新文件进行计算

其中需要注意的两个点

在 list snapshot时会读详细的Metadata,对于optimize的file有一个Metadata叫data change,生成新文件并没有带来任何新数,那么data change是force, streaming作业也不会读,这就可以在流失情况下去优化这张表,把小文件进行一个合并。

流式作业的场景下,如何不影响到线上作业运行,可以设置 economic,在流水作业触发的时候,假设依赖文件有变动,可以不停留,最就是vacuum操作,假设vacuum操作其实是Delta里面最严厉的一种删除操作,需要确认vacuum操作的影响。


image.png

做到 start Version和start Timestamp,这两个feature可以用在流式写入,不想从最初的 batch数据开始读,就可以使用 start Version或者是start Timestamp,它会综合刚才的原理,可以一定系列file change,也会对vacuum有一系列限制有一个细节需要注意, start Version以及 start Timestamp本身的点是不包含在启动当中。

 五、Demo

image.png

大家如果对 Databricks产品以及Databricks品牌原生spark系列的商业服务有兴趣,可以使用阿里云Databricks数据洞察 (DDI) 架构产品

相关文章
|
1月前
|
设计模式
二十三种设计模式全面解析-职责链模式的高级应用-日志记录系统
二十三种设计模式全面解析-职责链模式的高级应用-日志记录系统
|
8天前
|
监控 Java API
【Spring Boot】深入解密Spring Boot日志:最佳实践与策略解析
【Spring Boot】深入解密Spring Boot日志:最佳实践与策略解析
20 1
|
3天前
|
SQL Java 数据库连接
Mybatis日志SQL解析
Mybatis日志SQL解析
6 0
|
1月前
|
域名解析 缓存 监控
【域名解析 DNS 专栏】DNS 查询日志分析:洞察网络行为与优化建议
【5月更文挑战第28天】DNS查询日志分析对于理解和优化网络行为至关重要。通过日志,可洞察用户访问偏好、流量分布,进而进行缓存优化、负载均衡和安全检测。简单Python代码示例展示了如何读取和分析日志。根据分析结果,可针对性设置优化策略,提升网络性能、稳定性和安全性。不断探索新的分析方法,充分挖掘DNS查询日志的价值,以驱动网络持续优化。
|
18天前
|
存储 SQL NoSQL
ClickHouse(16)ClickHouse日志表引擎Log详细解析
ClickHouse的Log引擎系列适用于小数据量(<1M行)的表,包括StripeLog、Log和TinyLog。这些引擎将数据存储在磁盘,追加写入,不支持更新和索引,写入非原子可能导致数据损坏。Log和StripeLog支持并发访问和并行读取,Log按列存储,StripeLog将所有数据存于一个文件。TinyLog是最简单的,不支持并行读取和并发访问,每列存储在单独文件中。适用于一次性写入、多次读取的场景。
30 0
|
1月前
|
数据采集 监控 数据可视化
日志解析神器——Logstash中的Grok过滤器使用详解
日志解析神器——Logstash中的Grok过滤器使用详解
87 4
|
1月前
|
消息中间件 测试技术 Python
Python使用多线程解析超大日志文件
Python使用多线程解析超大日志文件
109 0
|
1月前
|
SQL 监控 关系型数据库
数据库日志解析:深入了解MySQL中的各类日志
数据库日志解析:深入了解MySQL中的各类日志
401 0
|
1月前
|
存储 人工智能 监控
日志服务 SLS 深度解析:拥抱云原生和 AI,基于 SLS 的可观测分析创新
阿里云日志服务 SLS 全面拥抱云原生和 AI,近一年持续进行技术创新,此次云栖大会上发布了在稳定可靠、高性能、开放易用、AI 加持、低成本等五个方面的全面升级。
102138 4
|
1月前
|
缓存 关系型数据库 MySQL
这个错误提示表明Flink CDC在解析MySQL的二进制日志时,找不到对应表的TableMap事件。
这个错误提示表明Flink CDC在解析MySQL的二进制日志时,找不到对应表的TableMap事件。
158 2

热门文章

最新文章

推荐镜像

更多