深度解析Delta Transaction Log

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 深度解析Delta Transaction Log

简要:

一、What is Delta

二、The Delta Log(Transaction Log)

三、Time Travel

四、Batch/Streaming Queries on Delta Tables

五、Demo

 

 

 

一、What is Delta

1Data Lake场景

国内热度很高的数据场景:过来系列流失批量数据,期望可以流失处理的同时达到批处理办事效果,真正对应AI &Reporting,那么就可以用SparkStreaming进行流式数据的接收处理,打通整个在线流。

image.png

Lambda架构离线和在线分支两个作业,不同情况部署会存在某一个或多个时刻节点,尤其具体需要详细对齐两边数据的时候,就涉及在线离线数据的校验问题。 也许可以用spark解决这样的问题,周期性跑校验任务,把 validation这一过程补齐,周期性补数据。

image.png

解决了数据校验的问题,离线数据还需要继续看数据修复问题,一段时间之后,发现过去某天的数据有缺失或者错误,需要重新回跑数据,进行过去数据的校验。

对于在线查询,也许修复的那天数据此刻属于未确定状态,也不确定当前是不是需要让用户查修复状态时保留哪个版本,需要进行不停的校验

image.png

修复以及数据更新的过程显而易见,假设已经确定这份数据有问题,那么线上查询以及对于用户,尤其是public或者AI场景的一些脏数据的清洗,包括在线即时查询的一些场景,需要停下来,等把完整数据更新之后再向用户开放。

理想中的Data Lake

点要求

1.  用连续处理的模式处理数据

2.  不再从批量和流式中做出选择

3.  增量处理新到达的数据

4.  低维护成本

image.png

Databricks在若干实践中,对理想模型的答案就是Data Lack,对应问题及解决方案

第一,具备同时读写并且有数据一致性保证的能力,通过数据格式,解决reader和writer通过快照进行隔离,Reader和Writer以及多Reader和多Rriter互相之间没有影响。

第二,具备高吞吐从大表读数据的能力,使用Spark来并发处理大表元数据。

第三脏数据回滚,提供Time Travel,可以回馈到任何一个之前处理过的快照时间点,发现若干次或者说某几次的写入操作有脏数据,可以做成退

第四,具备在线处理历史数据的能力,在不停留的情况下,流式作业是面向未来的,实时处理当下的数据处理历史数据的时候,原则上不应该影响到流式作业,流式作业可以继续跑所以通过同一作业处理历史数据

第五,不阻塞下游作业前提下处理迟到数据迟到数据可以直接入库,当冲突发生的时候做出对应的仲裁。

image.png

二、The Delta Log(Transaction Log)

1Deltaon Disk

基于Delta介绍,最关键的Transaction log的支持设计。一个Delta表,存磁盘上有一关键目录,_Delta_log目录里存的Delta Transaction Log

image.png

目录里有数字开头的Json文件,每个文件对一次Table Version一系列操作存在当前版本,版本不断递增所有Transaction Log可以数据共存

Delta Table当中最终数据文件存在 park里,如果从Delta搬去任何一个系统,也是兼容和支持的,只不过放了一个隐藏的添加,这是整个在磁盘上的文件格式。image.png

2Table = result of a set of actions

Delta当中对table的设计理念是把table抽象成对数据的操作,一系列的action最终凑成了一张表一个常见的表Metadata包括name、Schema及partitioning信息,这些信息在改动过程中,会触发一个Metadata的action。

action就最常见的是add a fild,把表里新增数据以file形式加到表里,Remove File不是简单delete的操作。

Delta中对Remove Fild的更多依赖是在于compassion,例如输写一段话,发现因为不同的写者对于 file的大小无法控制,如何处理让整个文件当中的大小文件均匀,在Delta中有一个optimized, SQL命令可以直接将若干文件进行compassion后把原始文件删掉。

Action对于streaming最为关键,当中有一个最小的原子的 batch,需要有Transaction的支持,前 streaming的小batch写入失败,需要回退到之前,需要一开始写入的时候set Transaction中间有了问题会得到原来的点,这是对于streaming当中的一个最好的支持。

Delta这个项目从历史来讲,它诞生于spark的StructuredStreaming,想在流式场景下处理复杂的并发问题,以及处理复杂的并发写表场景下,需要有更好的一种格式支持,最后诞生了Delta因为国外更高的数据标准导致Delta变成一个独立项目,并且对它的需求越来越多。

最后一个Action是Change Protocol对于一个table,假设将它定义是一个一系列操作的结果,table必然是要跨版本,或者跨多个Deltaversion进行一个长期兼容,尤其对于商业版场景,Change Protocol关键点就在于假Deltafeature或者说Delta的解析方式有变动,里面必须要打入应的版本信息,支持向前向后兼容。

 

3Implementing Atomicity

image.png

首先我们看一下原子性,例如Delta有两个版本,第一个version 0,Delta给原始的大table中想写入两个数据文件,需要把这两个文件delete新增一个数据文件。

相当对表的每一次操作都会蜕变成若干个原子的action,修改表的动作结束之后,会打成一个commit,变成对应的Metadata file放在 Transaction Log里面,这次行为可能包括add remote、 update 等。

 

4Ensuring Serializability

 image.png

所有的table的version都是以递增方式排列,如何处理并发的问题,例如有两个对应的读者,表初始有若干个version,从version0到version1,假设user1从user0开始读,usr2是从version1开始读,这个时候usr2先完成,user2可以接着往下走,user1从version0也可能走到version2这个点,就存在一个竞争关系。

Delta当中对于下一个version的竞争关系的定义,用order的形式然后去判定由哪一个order进行持续下一个 versionID的权限,通过判断,需要check关键的设计细节。

如果把Structure Streaming这种写的方式,或者日常使用Deltatablehave table的形式做一个归类,可以发现大部分场景是由append对原有的table进行少量的更新操作。

基于这样一个用户场景,将Delta的设计理念变成一个乐观持有的冲突解决并发方式。

5Solving Conflicts Optimistically

image.png

如何通过乐观的方式解决对应冲突,拆分步骤来看,

首先在每一次写之前,确认当前读取的start version是什么

第二,是记录所有read / write行为。

第三尝试去commit,确定是否有冲突

第四,前两start version判断,和所有竞争者确认,它们之间是否存在操作冲突,如果没有冲突,可以尝试直接commit, commit ID就是拿到最新的一个ID。

最后,Delta集成在spark中,当前Delta可能会集成在更多的流式或者批处理系统,可以依赖上层进行重试,处理异常

 

6Handling Massive Metadata

image.png

怎么样处理大量的Metadata,有一系列解决方案,

第一,整个设计理念是一个自循环,Delta集成在spark系统中,我们用spark自身来处理自身产生的Metadata

海量数据前提下,表达本身有可能也就变成一种海量数据源。既然持有一个大数据处理框架,自己产生的大数据问题就靠自己来解决,所以spark天然就变成处理Deltatable的处理引擎。

 

7Checkpoints

image.png

用spark处理所有Metadata,做一个check point的机制, 过一个时间点就做一次check point,这一次check point可以从下一次故障时间恢复的一个最近的时间点,每一个check on是在Delta范围中,有了 check point之后,不需要在list以前所有的commit log,直接在当前的 version打一个对应的check poin,不用再去读所有以版本号开头的 Jason file

为什么选Parquet格式

第一顺势而为, spark里面当前默认的数据格式是Parquet

第二对后续的一系列的操作可能只需要提取或改变某些字段.

 

8Computing Delta’s State

image.png

第一个场景,假设没有任何辅助check point情况下,比如有三个Jason File对应三个的Delta version的时候,算到最新的,会把当前的最新的版本用内存cash的情况放在这里,用户会从最新的开始读,那么它的复用率是比较高的,cash也是复用spark当中的cash。

image.png

然后假设有多个文通常进行一次读表操作,就是从最初的start version,如果用户没有指定start version,就是从最初开始,读到最后的一个,由spark进行计算。

image.png

假设这时中间有某checkpoint,可以跳过所有Jason file,从最近能找到的check file开始读,把check file解析出来,拿到增量的Jason  version

image.png

三、Time Travel

1Time Travelling by version

image.png

Delta中直接提供SQL语法version As of可以指定对应某一个version,指定后扫表操作相当于从 version开始,小于 version以前的所有操作可以被屏蔽。

2Time Travelling by timestamp

image.png

可支持直接在table上面加at的语法,在 DataFrame API里面也有对应参数可支持把version传进去,真正读表的操作,映射到Delta内部的实现方式,Deltalog会拿到snapshot,以对应的version作为结束。

对应 table的at形式, DataFrame的API需要注意是时间和version并不是一一对应

 

举例:

Delta的一个Metadata1071开始首先我们记录文件系统当中每一个commit file的时间,这是最常见的一个事件。

image.png

但时间有一个问题72号commit和73号commit的先后顺序是调转的时间乱序,在分布式系统当中这个时间记录到Meta当中会有这种问题,提前commit的文件,有可能在文件系统当中的记录时间反而小,处理这些问题Delta在time traveling的时候会先列出一份文件,拿到对应的文件列表之后,按照最可信的 ID进行排序,一旦发现时钟道具的时候,会把第二个时钟加上一毫秒,来解决这样的问题,然后让整个的commit按照需求的logic时间排序list。

image.png

拿到对应的时间序之后,去找到对应的时间节点,从那个点开始,后续的Metadata开始录入读取。

image.png

对于当前Time travel中get Timestamp处理方式,刚才提供的1492年10月28号,刚好是在这两个时间点中间,拿这个包含时间点最近的往上copy的ID 1071,对应翻译到Data log里,从1071的快照开始录取。

image.png

有两点限制:

Ø  需在time travel时确认 transaction log存在。

Ø  需有对应的Delta file

 

四、Batch / Streaming Queries on Delta Tables

关于Batch / Streaming Queries on Delta Tables中的模拟读取场景,首先Batch量的spark读取表的模式,开始先list,在Delta中称为update the state,每次出发,从commit log里读取当前所有file以及当前表述所存在状态与此同时进行系列的预先filter操作

image.png

根据Metadata当前的记录状态,进行评比,例如正在读的时间点有一些写入到一半的文件,有可能触发 correct的问题可以在读的时候,把文件预先提掉,最后执行query,这是最简单的批量power的执行方式。

image.png

复杂点的流失场景,首先计算系列表的状态,同时去partitionfilters流失不同的情况在于不停的list里面的file进行流失处理模式一开始读snapshot,从snapshot结束后,开始读取更新文件,snapshot的消费依据就是Spark Streaming里面对应有两个配置,第一个是max tesper trigger,另一个是maxBytesper trigger。

每次出发读多少文件,所有snapshot用这两个配置消费结束之后,尝试开始读新入文件,符合流失处理引擎的模式。

新写入的文件,开始不停的在表里面进行list,从commitID之后,逐渐一步的把新文件进行计算

其中需要注意的两个点

在 list snapshot时会读详细的Metadata,对于optimize的file有一个Metadata叫data change,生成新文件并没有带来任何新数,那么data change是force, streaming作业也不会读,这就可以在流失情况下去优化这张表,把小文件进行一个合并。

流式作业的场景下,如何不影响到线上作业运行,可以设置 economic,在流水作业触发的时候,假设依赖文件有变动,可以不停留,最就是vacuum操作,假设vacuum操作其实是Delta里面最严厉的一种删除操作,需要确认vacuum操作的影响。


image.png

做到 start Version和start Timestamp,这两个feature可以用在流式写入,不想从最初的 batch数据开始读,就可以使用 start Version或者是start Timestamp,它会综合刚才的原理,可以一定系列file change,也会对vacuum有一系列限制有一个细节需要注意, start Version以及 start Timestamp本身的点是不包含在启动当中。

 五、Demo

image.png

大家如果对 Databricks产品以及Databricks品牌原生spark系列的商业服务有兴趣,可以使用阿里云Databricks数据洞察 (DDI) 架构产品

相关文章
|
3月前
|
存储 监控 安全
深入解析Sysmon日志:增强网络安全与威胁应对的关键一环
在不断演进的网络安全领域中,保持对威胁的及时了解至关重要。Sysmon日志在这方面发挥了至关重要的作用,通过提供有价值的见解,使组织能够加强其安全姿态。Windows在企业环境中是主导的操作系统,因此深入了解Windows事件日志、它们的独特特性和局限性,并通过Sysmon进行增强,变得至关重要。
|
3月前
|
存储 关系型数据库 MySQL
MySQL中的Redo Log、Undo Log和Binlog:深入解析
【10月更文挑战第21天】在数据库管理系统中,日志是保障数据一致性和完整性的关键机制。MySQL作为一种广泛使用的关系型数据库管理系统,提供了多种日志类型来满足不同的需求。本文将详细介绍MySQL中的Redo Log、Undo Log和Binlog,从背景、业务场景、功能、底层实现原理、使用措施等方面进行详细分析,并通过Java代码示例展示如何与这些日志进行交互。
402 0
|
4月前
|
存储 缓存 关系型数据库
redo log 原理解析
redo log 原理解析
68 0
redo log 原理解析
|
5月前
|
存储 运维 监控
深入Linux核心:文件系统与日志解析
【8月更文挑战第20天】
103 2
|
5月前
|
Java Shell Linux
【Linux入门技巧】新员工必看:用Shell脚本轻松解析应用服务日志
关于如何使用Shell脚本来解析Linux系统中的应用服务日志,提供了脚本实现的详细步骤和技巧,以及一些Shell编程的技能扩展。
83 0
【Linux入门技巧】新员工必看:用Shell脚本轻松解析应用服务日志
|
5月前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
72 0
|
5月前
|
SQL 安全 测试技术
【数据守护者必备】SQL数据备份与恢复策略全解析:从全量到日志备份,手把手教你确保企业信息万无一失的实战技巧!
【8月更文挑战第31天】数据库是企业核心业务数据的基石,为防止硬件故障、软件错误或人为失误导致的数据丢失,制定可靠的备份与恢复策略至关重要。本文通过一个在线购物平台的案例,详细介绍了使用 SQL Server 进行全量备份、差异备份及事务日志备份的方法,并演示了如何利用 SQL Server Agent 实现自动化备份任务。此外,还提供了数据恢复的具体步骤和测试建议,确保数据安全与业务连续性。
274 0
|
6月前
|
数据采集 分布式计算 DataWorks
DataWorks产品使用合集之任务工作流中遇到了日志信息显示参数值没有正确解析的问题,该如何处理
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
7月前
|
SQL DataWorks Oracle
DataWorks产品使用合集之datax解析oracle增量log日志该如何操作
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
75 0
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
110 2

推荐镜像

更多