1. 介绍
Timline
(时间轴)由很多 instant
构成,按照时间由小到大排列。当不断写入Hudi数据集时,Timeline上的 Instant
会不断增加,为减小 Timeline
的操作压力,会在 commit
时按照配置对 instant
进行归档,并从 Timeline
上将已归档的 instant
删除。
2. 分析
在每次 commit
时会调用 HoodieCommitArchiveLog#archiveIfRequired
来判断是否需要进行归档,其核心代码如下
public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException { try { List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList()); boolean success = true; if (!instantsToArchive.isEmpty()) { this.writer = openWriter(); archive(instantsToArchive); success = deleteArchivedInstants(instantsToArchive); } return success; } finally { close(); } }
其中需要通过 getInstantsToArchive
获取需要归档的 Instant
,然后在进行归档,接着再将 Instant
删除。
2.1 获取Instant
通过 getInstantsToArchive
来获取待归档的所有 Instant
,其核心代码如下
private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) { // 根据配置获取最大/小保留数 int maxCommitsToKeep = config.getMaxCommitsToKeep(); int minCommitsToKeep = config.getMinCommitsToKeep(); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); // 获取完成的CLEAN类型的timeline HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline() .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants(); // 按照action进行一次排序,并根据最大保留数过滤出需要处理的instant Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants() .collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> { if (i.getValue().size() > maxCommitsToKeep) { // 待处理的instant return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep); } else { return new ArrayList<HoodieInstant>(); } }).flatMap(i -> i.stream()); // 获取所有已完成的instant的timeline HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); // 最早的处于pending的compaction Option<HoodieInstant> oldestPendingCompactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant(); // 获取第一个(最早)的savepoint Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant(); // 已完成的timeline不为空,并且instant大于最大保留数,进一步处理 if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) { // 将已完成clean类型的instant与其他instant连接在一起 instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> { // 如果savepoint不存在,则不用过滤,若存在,那么需要过滤出小于savepoint的所有instant return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); }).filter(s -> { // 再进行一次过滤,过滤出小于最早的pending的compaction,即大于的instant会被保留,不会被archive return oldestPendingCompactionInstant.map(instant -> { return HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER); }).orElse(true); }).limit(commitTimeline.countInstants() - minCommitsToKeep)); } // 所有instant组成的timeline(包括各种中间状态的instant) HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); // 按照时间和action进行分类 Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants() .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(), HoodieInstant.getComparableAction(i.getAction())))); // 所有待处理的instant return instants.flatMap(hoodieInstant -> groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(), HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream()); }
在获取待处理的 instant
时,会先根据配置项读取最大/最小需要保留的 commit
,接着单独处理 clean
类型的 instant
,之后根据最早的 savepoint
和处于 pending
的 compaction
来过滤出需要被处理的 instant
(时间小于最早的 savepoint
并且也小于最早的 compaction
),然后与 clean
类型的 instant
合并,最后返回待处理的所有 instant
。
2.2 归档instant
在获取所有待归档的 Instant
后,便调用 HoodieCommitArchiveLog#archive
开始归档,其核心代码如下
public void archive(List<HoodieInstant> instants) throws HoodieCommitException { try { // 获取完成的instant的timeline HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); List<IndexedRecord> records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { // 将instant转化为IndexedRecord records.add(convertToAvroRecord(commitTimeline, hoodieInstant)); if (records.size() >= this.config.getCommitArchivalBatchSize()) { // 写入文件 writeToFile(wrapperSchema, records); } } catch (Exception e) { if (this.config.isFailOnTimelineArchivingEnabled()) { // 抛出异常 throw e; } } } // 写入剩余的records writeToFile(wrapperSchema, records); } catch (Exception e) { throw new HoodieCommitException("Failed to archive commits", e); } }
可以看到,首先会调用 convertToAvroRecord
将 instant
根据不同类型转化为 IndexedRecord
并放入集合中,然后调用 writeToFile
将集合的记录写入文件,其中 writeToFile
方法核心代码如下
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception { if (records.size() > 0) // Block头信息 Map<HeaderMetadataType, String> header = Maps.newHashMap(); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString()); // 生成Avro Block块 HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header); // 添加block块写入文件 this.writer = writer.appendBlock(block); // 清空集合 records.clear(); } }
可以看到,写入文件流程非常简单,即将集合放入 Block
块中,然后再写入文档文件中(与写入日志文件类似)。
2.3 删除Instant
在归档完后,需要调用 HoodieCommitArchiveLog#deleteArchivedInstants
方法来删除Timeline中的已归档的 instant
,其核心代码如下
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException { boolean success = true; for (HoodieInstant archivedInstant : archivedInstants) { Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName()); try { if (metaClient.getFs().exists(commitFile)) { // 文件存在 // 删除 success &= metaClient.getFs().delete(commitFile, false); } } catch (IOException e) { throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e); } } // 获取最后一个instant Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> { return i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))); }).max(Comparator.comparing(HoodieInstant::getTimestamp))); if (latestCommitted.isPresent()) { // 存在 // 删除aux目录下的文件 success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); } return success; }
可以看到删除 instant
的主要逻辑就是删除其对应的文件(包括元数据目录和aux目录)。
3. 总结
对于 archive
而言,在每次 commit
时都会判断是否需要进行 archive
。Hudi分为三个步骤处理 archive
,即找出待归档的instant(根据配置找出,必须小于最早的 savepoint
和 pending
状态的 compaction
)、归档 instant
(将instant对应的文件的内容按照日志文件的写入方式写入归档文件)、删除已归档 instant
(删除对应的文件)。