介绍
Hudi维护着一条对Hudi数据集所有操作的不同 Instant
组成的 Timeline
(时间轴),通过时间轴,用户可以轻易的进行增量查询或基于某个历史时间点的查询,这也是Hudi对外提供基于时间点查询的核心能力之一,本篇将详细介绍 Timeline
。
Timeline
Timeline
在Hudi中被定义为 HoodieTimeline
接口,该接口定义了针对 Instant
的不同操作,包括 commit
、 deltacommit
、 clean
、 rollback
、 savepoint
、 compaction
、 restore
,以及对这些操作进行过滤的方法以及针对 Instant
的状态和操作类型生成不同的文件名的方法,这些操作的含义如下
commit
:将记录原子写入数据集。deltacommit
:将一批记录原子写入到MergeOnRead
存储类型的数据集(写入增量日志log文件中)。clean
:删除数据集中不再需要的旧版本文件。rollback
:表示当commit/deltacommit
不成功时进行回滚,其会删除在写入过程中产生的部分文件。savepoint
:将某些文件组标记为已保存,以便其不会被删除。在发生灾难需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点。compaction
:将基于行的log日志文件转变成列式parquet数据文件。compaction
在时间轴上表现为特殊提交。restore
:将从某个savepoint
恢复。
Timeline
与 Instant
密切相关,每条 Timeline
必须包含零或多个 Instant
。所有 Instant
构成了 Timeline
, Instant
在Hudi中被定义为 HoodieInstant
,其主要包含三个字段
State state = State.COMPLETED; String action; String timestamp;
state
:状态,如 requested
、 inflight
、 completed
等状态,状态会转变,如当提交完成时会从 inflight
状态转变为 completed
状态。
action
:操作,对数据集执行的操作类型,如 commit
、 deltacommit
等。
tmiestamp
:时间戳,发生的时间戳,Hudi会保证单调递增。
HoodieTimeline
接口中也定义了一些基于 Instant
的方法,如过滤及生成文件名,其子类继承结构如下
其中 HoodieDefaultTimeline
为缺省实现,实现了在 Timeline
上对 Instant
操作的具体逻辑,如过滤所有处理 inflight
、 completed
状态的 Instant
组成的 HoodieTimeline
、及指定某提交时间后的所有 Instant
组成的 HoodieTimeline
、及是否包含某个 Instant
等方法。
而 HoodieDefaultTimeline
有 HoodieActiveTimeline
、 HoodieArchivedTimeline
两个实现,其中, HoodieActiveTimeline
表示活动的 Timeline
,其包含过去12小时(可配置大小)之内所有的 Instant
,也是最重要的实现。而对应12小时(可配置大小)之前的 Instant
则构成了归档的 Timeline
,即 HoodieArchivedTimeline
。
HoodieActiveTimeline
中包含了操作(创建/删除) inflight
、 completed
、 requested
状态的 Instant
对应的文件(所有文件均在 .hoodie
元数据目录下)。
当生成 HoodieActiveTimeline
实例时,会借助 HoodieTableMetaClient#scanHoodieInstantsFromFileSystem
来读取 .hoodie
元数据目录下的所有有效提交,然后生成对应状态的 Instant
,其核心代码如下
public static List<HoodieInstant> scanHoodieInstantsFromFileSystem(FileSystem fs, Path metaPath, Set<String> includedExtensions) throws IOException { return Arrays.stream(HoodieTableMetaClient.scanFiles(fs, metaPath, path -> { // Include only the meta files with extensions that needs to be included String extension = FSUtils.getFileExtension(path.getName()); return includedExtensions.contains(extension); })).sorted(Comparator.comparing( // Sort the meta-data by the instant time (first part of the file name) fileStatus -> FSUtils.getInstantTime(fileStatus.getPath().getName()))) // create HoodieInstantMarkers from FileStatus, which extracts properties .map(HoodieInstant::new).collect(Collectors.toList()); }
该方法会扫描所有的文件,然后判断文件的扩展名是不是包含在指定的提交类型(包括 .commit
、 .inflight
、 .deltacommit
、 .savepoint
等)集合中,并且会按照提交时间大小进行排序后返回。文件名的生成在 HoodieTimline
接口中定义,一般由 提交时间+操作类型
组成,如 20150315123625.commit
。
HoodieActiveTimeline
还提供了生成下一次提交时间的方法,核心代码如下
public static String createNewCommitTime() { lastInstantTime.updateAndGet((oldVal) -> { String newCommitTime = null; do { newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL)); return newCommitTime; }); return lastInstantTime.get(); }
可以看到, Timeline
会维护上一个 Instant
的提交时间,然后在根据当前时间生成下一个提交时间时会判断其是否大于上一次提交时间,若不大于,则会一直循环调用生成,这样便保证了提交时间的单调递增,现在提交时间只精确到秒,粒度相对较粗,后续社区有计划将其支持到毫秒。
HoodieActiveTimeline
的子类 RollbackTimeline
表示所有的 Instant
均为 rollback
类型,即有效的提交类型集合中只包含 rollback
类型,其他均与父类相同。
HoodieArchivedTimeline
表示12小时(可配置大小)之前的所有 Instant
。在构造该实例时会从归档目录下的 commits
文件中读取已归档的数据并生成对应的 Instant
。
总结
Timeline
(时间轴)是Hudi中非常重要的概念,基于历史时间点的查询及增量查询均需由 Timeline
提供支持,因此了解 Timeline
对于理解Hudi支持何种查询非常有用。