开发者学堂课程【Databricks数据洞察公开课:如何使用 Delta Lake 构建批流一体数据仓库】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/1058/detail/15564
如何使用 Delta Lake 构建批流一体数据仓库
内容介绍
一、Data lake 的项目背景
二、理想的 Data Lake
三、构建批流一体数据湖
四、DLF + DDI 的一站式数据湖构建与分析
一、Data lake 的项目背景
下面介绍的是 Data lake 的项目背景以及想要解决的问题,从问题以及背景展开之后,介绍相关的实现原理。最后通过一个 live demo 形式串起来 Data lake当中的核心理念以及如何在生产环境中的使用。
首先可以了解其背景。目前的用户都有很多相关的经验去构建数仓,去处理数据。产业界其实耗费了大量的资源来构建相关的系统。可以发现半结构化数据、实时数据、批量数据和用户数据一系列的数据存储在各个地方,然后以各个不同的处理形式来为用户提供服务。其实用户期望的理想形式是什么?
是一种更一体化或者说更加聚焦的一个系统,可以让更专业的事情人来干更专业的事情。希望一套系统有流式和批量同时的处理能力。可以提供推荐服务,可以提供报警服务,可以提供用户分析一系列的问题。但是现实的情况是,低质量不可靠的数据,导致一体化的事情很难去进行。
另外差强人的性能不一定能达到实时的入库以及实时的查询的一个性能的要求。
如此背景下,应运而生了对应的 Data lake ,接下来用一个常见的用户场景来看一下没有 Data lake 怎么解决问题。在最常见的 data lake 场景中。例如有一系列的流失数据,不停的从 kafka 系统来流入。然后用户期望具有实时处理的能力,又可以把数据周期性放在 data lake 当中。同时需要整套系统的出口是具有 AI 能力以及 Reporting 能力的。
第一条处理流较为简单,用 APACHE Spark 直接用 Streaming 来打通实时流。
与此同时,想要离线流的时候,历史查询可以使用 λ-arch 结构对应的方式。APACHE Spark 提供了很好的抽象设计,用一种代码或者说一种 API 然后来完成流和实时的 λ-arch 架构设计。那么从历史数据的查询,进一步用 Spark 来进行 SQL 分析以及 Spark SQL 的作业的形式来产生 AI 及 reporting 的能力。需要面对的第一个问题就是关于数据的校验。
流式数据和批量数据,假设以 λ-arch架构的形式存在的时候,如何去确认在某一个时间点查出来的数据是对的,到底流式的数据和批量的数据相差多少?
批量数据什么时候该与流式数据进行同步?必然而然,还需要引入的就是 validate 的问题,尤其是报表系统面向用户的精确的数据分析系统,需要有一个 validation 培训的步骤。所以需要一支旁支,然后来解决流式和批量之间的同步问题以及对应的验证问题。假设解决了问题之后,系统 run 一段时间,会发现对应的某个 partition 的数据出了问题,当天的脏数据,在若干天之后发现需要修正。
通常来讲应该把线上的查询停掉,然后需要修复出去,修复出去以后重新恢复线上的任务。在这过程中,其实无形的给系统架构又加上了一个修复以及过去版本回复的能力。
假设解决完了 reprocess 的问题之后,比如,在 AI 和 reporting 的最终的出口端,可以看到,有新的一系列的需求。比方说有一天业务部门或者说上级部门合作部门提出是否给一些 schema change 。比方说新加一个字段,当数据越来越多人用,把 user ID 维度加进去,这个时候要怎么处理?
导致 data lake 去加 schema 停留,然后对应的数据需要重新处理。解决完一个问题,然后又会有新的问题。如果 case by case 去解决的话,会导致系统不停的往上打补丁。一个原本上比较简单或者一体化的需求,会变得越来越的冗余和复杂。所以理想当中的 data lake 应该是什么样?
二、理想的Data Lake
理想当中的 Data lake 应该是入口,出口对应的系统来干对应的事情。唯一的核心就是中间的 Data lake 层,也就是对应的数据治理以及数据入仓的整个过程。可以用连续处理的模式来处理数据。
同时增量的数据也可以增量,以增量的streaming 的方式来去处理新到达的数据。用户不需要再从批量和流式中作出选择,或者说批量和流式互相之间做出退让。而是在流式的时候需要考虑批量或者在批量的时候要考虑流式的作业,不应该 by design 。
如果可以一体化整个 data lake 架构,自然而然拥有很好的维护成本。Data lake 一系列的问题是如何在 Data lake 当中去解决的。
(1) 需要具备同时读写并且有数据一致性保证的一个能力。在 Data
lake 当中,reader 和 writer 是通过一种快照机制来进行隔离的。也就是说 reader 和 writer 可以各自写入和读出,互不影响。
(2) 就是需要具备有高吞吐的从大表中读数据的能力,其实可以想象
当一个表大了之后,其本身的源数据、快照、checkpoint 版本以及变更 C 码一系列所有的源数据操作本身就会变成一个大数据的问题。 Data lake 当中设计非常棒的一点,就是将 meta data 也视为大数据问题,通过 Spark 框架自身来并发处理大表的源数据问题。所以在 Data lake 当中是不具备,不会去单点去处理meta data 的情况。
(3) 就是历史数据以及脏数据的回滚,需要有 time travel 的能力来
回溯到某一个时间点,去进行一个数据清洗。
(4) 就是具备在线处理历史数据的能力。在历史数据回填当中,依然
可以实时的处理当前流入的新的数据,不需要停留,也不需要考虑哪些是实时,哪些是离线。
(5) 是可以在不阻断下游作业的前提下处理迟到的数据,可以直接入
表。以上五点完全地解决之后,就可以用 Data lake 来替代一系列批流分制的架构设计。
什么是基于 Data lake 的架构设计?
其架构设计一系列的语言数据,或者最低的级别就是表。将数据一层一层的分成基础数据表,中间数据表以及最终的高质量数据表。
所有的一切只需要关注的就是表的上游和下游,两者之间的依赖关系是不是变得更加的简单和干净,只需要关注业务层面的数据组织。所以 Data lake 是统一批量流式的持续数据流的模型。
三、构建批流一体数据湖
通过一个 demo 的形式来演示如何在 daybreaks 数据洞察里面实现 批流一体的操作。
假设有一个用户场景,数据工程师在处理 rot 设备的数据的时候,需要端对端的 pipeline 的方式来实时的处理数据。
如果选择社区版的 Spark struck streaming 结合PARQUENT 开源格式来完成项目,后面会带来一系列的问题,比如说随着 streaming 的进行,会产生越来越多的小文件,小文件如果过多的话,会对查询造成很大的影响,有一个很大的负担。
第二个是随着迭代,比如会增加字段或者是减少字段或一些逻辑的变化。
可能会导致停掉流失计算,然后需要对 schema 进行一个变更,还要对历史数据进行一个回灌等等一系列的麻烦的操作。所以需要用到 Data lake 架构来实现批流一体操作。
下面演示如何实现批流一体操作的。首先定义三个不同格式的路径。
PARQUENT 的格式,一个 SILVERPATH 格式,其实是可利用的数据,就是初步创建的 data 数据。还有一个 gold 数据,相当于是一个报表数据,有一定聚合后的数据,后面是添加了用来测试的一个参数,不至于造成数据资源过大。
接下来创建一些自定义的 data frame , data frame 每次会创建 1 万条数据,这 1 万条数据是通过ID 来模拟。 如果为 0 的话,那么 action 的字段会 open ,如果不为 0 ,那么会 close ,会使 expr 的 open 100 close 数据,同时 random 一个data 以及 random 一个 device ID 字段。
下面会将 date frame 写入到的 PARQUENT文件里面。以 date partition 的 overwrite 形式写进去。
当查询数据后,此数据是一半的 open ,一半的 close ,各 5000 条。
下面使用 Spark streaming 的方式往 PARQUENT 文件里面去写入数据。
同样创建了一个自定义的一个流失数据,每秒是 100 条。通过指定一个 option 为 open 的字段。同时 random 一个 date 以及random 一个device 。然后实时的往里面写,每 5 秒写一次,同时写 point 一个路径,写入 PARQUENT 的文件里面。查询 PARQUENT 数据,会有什么样的变化。当查询的时候,只能查到 update 的数据,但是却查不到历史数据。
因为前面已经添加了 5000 条 open 和 5000 条 close的 action ,但是却查不到。其实在 APACHE 社区版的 Spark 设计之初的时候,对于同一个目录的源数据管理是不具备流失和批量同时管理的能力的。所以只能查到最新的一个流式数据的变化。
以同样的方式来定义一个 Data 的数据,会有什么样的效果呢?以 data 作为 partisan by K 往 SILVERPATH 里面去写,写入的格式是Data ,然后查询,同样也是各 5000 条。
通过创建 stream date 从流的方式,然后往里面写入到Data 的文件里面。
点击查询,数据是否变化。数据是已经能够查到历史数据,所以其优势是在处理流失数据的过程中不影响批处理的查询。
接下来以 SILVERPATH 作为数据源,作为流式的数据源,进行查询。同样在流式数据里面添加一个以 action date 和 device ID 作为 partition 和 group by ,加一个 count ,将聚合数据写入到 gold path 里面。
下一步是查询 gold path ,其数据是否发生变化。经查询,数据已经同步。如果在数据处理中,例如,迭代的过程中会有一些字段的添加或删除。以字段添加为例。对于 data 要如何操作呢?
首先创建一个 data frame 的一个 new data ,增加了一个 user ID 的字段,其次是点击执行。同样以 append 的方式把数据去 append 到 SILVERPATH 里面。SILVERPATH 只有 action date 和 device ID 的,却没有 user ID 的。在添加的过程中需要查看是否变化。
此时代码会提醒 schema 文件已经发生了变更,是否需要去确认?用mode schema 来确认,用户是否允许发送变更呢?然后添加一个 option mode schema 为 true 的 option ,继续写入。
此时,数据已经写入到 SILVERPATH 数据里面。经查询,对于四月一号之后的数据是已经成功添加到了 UID 的字段。查询四月一号之前的数据, user 是为 null 的,代表没有的。如果添加字段或者是修改 schema ,只需要在定义的过程中把 schema 添加进去,然后通过 option 的方式到 mode 。更改 schema 的表里面或者 Data 的文件目录里面,代表成功了。
下面是 RC 的 transaction ,就是其事物的介绍。对于四月一号之前和之后的数据,添加了user ID 字段。如果四月一号之前的数据同样也把历史数据替换过来。那么如何操作呢?
如果传统意义上,可能需要停掉流数据,然后把历史数据添加进去以后,再开启。那么 data 架构,是可以直接 update ,把历史数据更新回来。
查询十月一号之前的数据,经查询,其优质 ID 已经更新,添加了user ID 字段。同样以 SILVERPATH 作为原数据,把数据进行处理以后,写入到 golden path 里面作为一个报表数据,然后替换路径。经查询,已完成。
下面介绍的是 time travel 概念,就是在执行批处理计算的时候,用户可能想回到历史版本。
比如,已经添加了 user ID ,那如何想回到之前没有 user ID 字段的版本呢?有一个功能,就是 describe history 可以看到数据版本。如果不指定,查到的肯定是最新的版本。
可以看到每一个 version 对应的数据是实现方式。接下来定义 version 为 10 的版本号。如果不定义,查到数据是最新版本。如果定义 version AS of 版,和之前的 10 号版本作对比。查到的数据明显看到此版本是没有 user ID 字段的。前面最新的版本是有 user ID 字段的。关于 data 有很多的案例,会发布公众号上,所以可以去翻阅和查看。
四、DLF + DDI 的一站式数据湖构建与分析
下面讲解的是 DLF + DDI 的一站式数据湖构建与分析。架构图可以看到 DIF 可以实时同步 MySQL 的数据,然后写入到 data 里面。同时 demo 里面的DDI 的 struck dreaming 通过实时的处理流数据,然后同步到 data 数据湖。
此架构使用计算存储分离。计算是在 data 架构里,存储是在 OSS ,源数据统一管理在 DIF 源数据湖,同时实现批处理的作业。