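1. Introduction
Hudi provides a savepoint mechanism that backs up an instant, so that when a later commit goes wrong the table can be rolled back to a chosen savepoint. This matters a great deal for production systems. Savepoints are triggered manually through hudi-CLI. The sections below analyze how savepoints are implemented.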
2. Analysis
2.1 Creating a savepoint
The entry point for creating a savepoint is HoodieWriteClient#savepoint; its core code is as follows:
public boolean savepoint(String commitTime, String user, String comment) {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
    // Savepoints are not supported for MERGE_ON_READ tables
    throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
  }
  // Last instant among the completed clean actions
  Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();

  HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
  if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
    // The instant does not exist
    throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
  }

  try {
    String lastCommitRetained;
    if (cleanInstant.isPresent()) {
      // A clean instant exists: deserialize its metadata
      HoodieCleanMetadata cleanMetadata = AvroUtils
          .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
      // Earliest commit the cleaner retains
      lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
    } else {
      // Otherwise use the earliest completed instant
      lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
    }

    // The savepoint time must be greater than or equal to the earliest retained commit time
    Preconditions.checkArgument(
        HoodieTimeline.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
        "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + lastCommitRetained);

    Map<String, List<String>> latestFilesMap = jsc
        .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
            config.shouldAssumeDatePartitioning()))
        .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
          ReadOptimizedView view = table.getROFileSystemView();
          // Latest data files in this partition written at or before commitTime
          List<String> latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
              .map(HoodieDataFile::getFileName).collect(Collectors.toList());
          return new Tuple2<>(partitionPath, latestFiles);
        }).collectAsMap();

    // Convert to savepoint metadata
    HoodieSavepointMetadata metadata = AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
    // Create the savepoint instant; this creates an inflight file in the meta directory
    table.getActiveTimeline().createNewInstant(
        new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime));
    // Mark it complete and write the savepoint metadata into the file
    table.getActiveTimeline()
        .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
            AvroUtils.serializeSavepointMetadata(metadata));
    return true;
  } catch (IOException e) {
    throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
  }
}
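As shown, the method first checks whether a completed clean instant exists. If one does, it deserializes the corresponding HoodieCleanMetadata and reads the earliest commit to retain; otherwise it falls back to the first completed commit. The savepoint time must not be earlier than that commit. The method then collects, for every partition path, the latest data files written at or before the savepoint time, converts the result into a HoodieSavepointMetadata, and saves it to a file under the metadata directory. The end result of creating a savepoint is a *.savepoint file in the metadata directory.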
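The two checks at the heart of savepoint creation can be sketched in plain Java without any Hudi dependency. This is only an illustration under assumptions: the class SavepointSketch, the DataFile type, and both method names are hypothetical, timestamps are assumed to be equal-length strings that order correctly under String.compareTo, and "latest" is assumed to mean the newest version of each file group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in types; none of this is Hudi's API.
class SavepointSketch {

    /** One version of a file group, written by the commit at commitTime. */
    static final class DataFile {
        final String partition;
        final String fileId;
        final String commitTime;
        final String fileName;

        DataFile(String partition, String fileId, String commitTime, String fileName) {
            this.partition = partition;
            this.fileId = fileId;
            this.commitTime = commitTime;
            this.fileName = fileName;
        }
    }

    /** Rejects a savepoint on a commit older than the earliest retained commit. */
    static void checkWithinLookupWindow(String commitTime, String earliestRetained) {
        // Assumption: equal-length timestamps compare correctly as plain strings.
        if (commitTime.compareTo(earliestRetained) < 0) {
            throw new IllegalArgumentException(
                "Could not savepoint commit " + commitTime + ", earliest retained is " + earliestRetained);
        }
    }

    /** Per partition, keeps the newest file of each file group written at or before commitTime. */
    static Map<String, List<String>> latestFilesBeforeOrOn(List<DataFile> files, String commitTime) {
        // partition -> (fileId -> newest qualifying file)
        Map<String, Map<String, DataFile>> newest = new TreeMap<>();
        for (DataFile f : files) {
            if (f.commitTime.compareTo(commitTime) > 0) {
                continue; // written after the savepoint, so not part of it
            }
            Map<String, DataFile> byId = newest.computeIfAbsent(f.partition, p -> new TreeMap<>());
            DataFile current = byId.get(f.fileId);
            if (current == null || current.commitTime.compareTo(f.commitTime) < 0) {
                byId.put(f.fileId, f);
            }
        }
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, DataFile>> e : newest.entrySet()) {
            List<String> names = new ArrayList<>();
            for (DataFile f : e.getValue().values()) {
                names.add(f.fileName);
            }
            result.put(e.getKey(), names);
        }
        return result;
    }
}
```

The resulting map has the same shape as latestFilesMap in the excerpt above (partition path to file names), which is what gets serialized into the savepoint metadata.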
2.2 Rolling back to a savepoint
Once a savepoint has been created, the table can be rolled back to it. The entry point is HoodieWriteClient#rollbackToSavepoint; its core code is as follows:
public boolean rollbackToSavepoint(String savepointTime) {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  HoodieActiveTimeline activeTimeline = table.getActiveTimeline();

  // Rollback is a manual operation and does not support concurrent writes or compaction;
  // it removes any compaction that is still pending after the savepoint
  HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();

  HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
  boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
  if (!isSavepointPresent) {
    // The savepoint instant does not exist
    throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime);
  }

  // Restore to the instant of the given savepoint
  restoreToInstant(savepointTime);

  Option<HoodieInstant> lastInstant =
      activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
  Preconditions.checkArgument(lastInstant.isPresent());
  // Note: commitsToRollback is not defined in this excerpt
  Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
      savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
          + lastInstant.get().getTimestamp());
  return true;
}
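As shown, rolling back to a savepoint must be run manually. The method checks that the savepoint instant exists and then calls restoreToInstant to restore the table to that instant. After the restore it verifies that the last instant on the reloaded timeline is the savepoint itself. The core code of restoreToInstant is as follows: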
public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  // Select the instants later than the given commit time, newest first
  List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
      .getReverseOrderedInstants()
      .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
      .collect(Collectors.toList());
  // Create a new instant time for the restore
  String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
  // Note: `context` (a Timer.Context, see finishRestore) is not defined in this excerpt
  ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
  // Create the restore instant; this creates an inflight file in the metadata directory
  table.getActiveTimeline().createNewInstant(
      new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
  instantsToRollback.stream().forEach(instant -> {
    try {
      switch (instant.getAction()) {
        case HoodieTimeline.COMMIT_ACTION:
        case HoodieTimeline.DELTA_COMMIT_ACTION:
          // Roll back, newest to oldest
          List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant);
          instantsToStats.put(instant.getTimestamp(), statsForInstant);
          break;
        case HoodieTimeline.COMPACTION_ACTION:
          // Roll back, newest to oldest
          List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
          instantsToStats.put(instant.getTimestamp(), statsForCompaction);
          break;
        default:
          throw new IllegalArgumentException("invalid action name " + instant.getAction());
      }
    } catch (IOException io) {
      throw new HoodieRollbackException("unable to rollback instant " + instant, io);
    }
  });
  try {
    // Finish the restore
    finishRestore(context, instantsToStats.build(),
        instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
        startRollbackInstant, instantTime);
  } catch (IOException io) {
    throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
  }
}
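The selection step at the top of restoreToInstant can be illustrated with a small standalone sketch. It assumes a timeline reduced to a list of timestamp strings that order correctly as strings; RestoreSelectionSketch and instantsToRollback are hypothetical names, not part of Hudi.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; a timeline is modeled as a plain list of timestamp strings.
class RestoreSelectionSketch {

    /** Returns the timestamps strictly greater than target, newest first. */
    static List<String> instantsToRollback(List<String> timeline, String target) {
        return timeline.stream()
            .filter(ts -> ts.compareTo(target) > 0)   // keep only instants after the target
            .sorted(Comparator.reverseOrder())        // roll back from newest to oldest
            .collect(Collectors.toList());
    }
}
```

With a timeline of 001, 002, 003, 004 and a target of 002, the sketch yields 004 followed by 003, and the target itself is left in place.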
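As shown, the method selects every instant later than the given instant (the commit, delta commit, and compaction instants to roll back) and walks through them from newest to oldest, calling doRollbackAndGetStats on each. The core code of doRollbackAndGetStats is as follows: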
private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws IOException {
  // Commit time of the instant to roll back
  final String commitToRollback = instantToRollback.getTimestamp();
  HoodieTable<T> table = HoodieTable.getHoodieTable(
      createMetaClient(true), config, jsc);
  HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
  HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
  // Check if any of the commits is a savepoint - do not allow rollback on those commits
  // Timestamps of all completed savepoints
  List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
      .collect(Collectors.toList());
  savepoints.stream().forEach(s -> {
    if (s.contains(commitToRollback)) {
      // The commit is savepointed: rolling it back is not allowed
      throw new HoodieRollbackException(
          "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
    }
  });

  String lastCommit = commitToRollback;
  // Enforce strict newest-to-oldest rollback order
  if ((lastCommit != null) && !commitTimeline.empty()
      && !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
    throw new HoodieRollbackException(
        "Found commits after time :" + lastCommit + ", please rollback greater commits first");
  }

  // Timestamps of the instants in inflight or requested state
  List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
      .collect(Collectors.toList());
  if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
    throw new HoodieRollbackException(
        "Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first");
  }

  // Perform the rollback
  List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);

  // Roll back the index
  if (!index.rollbackCommit(commitToRollback)) {
    throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
  }
  return stats;
}
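As shown, rollback is strictly validated. A savepointed commit cannot be rolled back, and commits must be rolled back from the newest (largest timestamp) to the oldest (smallest timestamp). Once validation passes, rollback is called to perform the actual rollback; it deletes the corresponding instant and files. The details will be analyzed separately.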
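The validation rules can be condensed into a short sketch. It is a simplification under stated assumptions: RollbackGuardSketch and checkCanRollback are hypothetical names, savepoints are matched by exact timestamp rather than with String.contains, and the in-flight check is left out.

```java
import java.util.List;

// Hypothetical guard; models only the savepoint and ordering checks.
class RollbackGuardSketch {

    /**
     * Throws if the commit is protected by a savepoint or if a later
     * completed commit still exists on the timeline.
     */
    static void checkCanRollback(String commit, List<String> savepoints, List<String> completedCommits) {
        if (savepoints.contains(commit)) {
            throw new IllegalStateException(
                "Commit " + commit + " is savepointed; delete the savepoint before rolling back");
        }
        for (String other : completedCommits) {
            if (other.compareTo(commit) > 0) {
                throw new IllegalStateException(
                    "Found commit " + other + " after " + commit + "; roll back later commits first");
            }
        }
    }
}
```

For example, with commits 001 to 005 and savepoints on 002 and 004, rolling back 005 passes, but the next step (rolling back 004) is rejected until the savepoint on 004 is deleted.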
After the rollback, finishRestore is called to finish the restore and persist some statistics. Its core code is as follows:
private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
    List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant)
    throws IOException {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  Option<Long> durationInMs = Option.empty();
  Long numFilesDeleted = 0L;
  for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
    // Count the deleted files
    // (assigned, not accumulated, so only the last entry's count is kept)
    List<HoodieRollbackStat> stats = commitToStat.getValue();
    numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
  }
  // Convert to restore metadata
  HoodieRestoreMetadata restoreMetadata =
      AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
  // Save to the meta directory
  table.getActiveTimeline().saveAsComplete(
      new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
      AvroUtils.serializeRestoreMetadata(restoreMetadata));
  if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
    // Clean up old meta files
    FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
        table.getActiveTimeline().getRestoreTimeline().getInstants());
  }
}
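As shown, once the rollback is done the collected statistics are converted into a HoodieRestoreMetadata and stored under the metadata directory as the completed restore instant.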
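If a total across all rolled-back commits is wanted (an assumption about intent; the loop in finishRestore assigns the count on each iteration), the count can be accumulated as in the sketch below. RestoreStatsSketch and its RollbackStat type are hypothetical stand-ins, not Hudi classes.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for per-commit rollback statistics.
class RestoreStatsSketch {

    /** Minimal stat: only the list of files that were deleted successfully. */
    static final class RollbackStat {
        final List<String> successDeleteFiles;

        RollbackStat(List<String> successDeleteFiles) {
            this.successDeleteFiles = successDeleteFiles;
        }
    }

    /** Sums deleted files over every stat of every rolled-back commit. */
    static long totalFilesDeleted(Map<String, List<RollbackStat>> commitToStats) {
        long total = 0L;
        for (List<RollbackStat> stats : commitToStats.values()) {
            for (RollbackStat stat : stats) {
                total += stat.successDeleteFiles.size(); // accumulate instead of overwrite
            }
        }
        return total;
    }
}
```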
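3. Summary
Hudi's savepoint mechanism backs up a given instant, and a rollback can later return the table to that savepoint. Note that rollback has to start from the newest savepoint: because a savepointed commit cannot be rolled back, a table with several savepoints cannot be rolled back directly to an older one until the newer savepoints are deleted. Creating a savepoint records the latest data files of every partition as of that commit in the metadata directory. Rolling back to a savepoint rolls back instants starting from the newest one and finally writes the rollback statistics to the metadata directory.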