1. 介绍
Hudi
将记录写入数据 parquet
文件或日志 log
文件,而这些文件在内存中是如何进行管理呢?如之前的文章中提到过的 HoodieFileGroup
、 FileSlice
等与数据文件和日志文件是什么对应关系?本篇详细分析 Hudi
的文件管理。
2. 分析
2.1 文件组
HoodieFileGroup
表示文件组,其包含的字段信息如下
public class HoodieFileGroup implements Serializable { // 文件组ID private final HoodieFileGroupId fileGroupId; // FileSlices,按照提交时间大小排序 private final TreeMap<String, FileSlice> fileSlices; // 时间轴 private final HoodieTimeline timeline; // 上一次完成的Instant,充当水位基准. private final Option<HoodieInstant> lastInstant; }
其中, HoodieFileGroup
由 HoodieFileGroupId
唯一标识;每个 HoodieFileGroup
中会包含一个 TreeMap<CommitTime,FileSlice>
,按照 CommitTime
从大到小排序;为方便操作会保存一个 Timeline
,以及最后完成的 Instant
。
2.2 文件组ID
HoodieFileGroupId
表示文件组ID,其包含字段信息如下
public class HoodieFileGroupId implements Serializable { // 分区路径 private final String partitionPath; // 文件ID private final String fileId; }
每个文件组ID由分区路径和文件ID唯一标识,不同的分区或不同的文件ID均属于不同的 HoodieFileGroup
。
2.3 文件片
FileSlice
表示文件片,其包含字段信息如下
public class FileSlice implements Serializable { // 文件组ID private HoodieFileGroupId fileGroupId; // Instant的时间 private String baseInstantTime; // 数据文件 private HoodieDataFile dataFile; // 日志文件列表,按照更低版本排序,在MOR时存在,COW时为空 private final TreeSet<HoodieLogFile> logFiles; }
一个 FileSlice
对应一个数据文件和日志文件列表,并且其包含一个基准时间(数据文件和日志文件都有相同的时间基准)。
2.4 数据文件
HoodieDataFile
表示数据文件,其包含字段信息如下
public class HoodieDataFile implements Serializable { // 文件状态 private transient FileStatus fileStatus; // 文件全路径 private final String fullPath; // 文件大小 private long fileLen; }
每个数据文件包含了一个文件状态,文件的全路径以及文件的长度。
2.5 日志文件
HoodieLogFile
包含的字段信息如下
public class HoodieLogFile implements Serializable { // 日志文件扩展名 public static final String DELTA_EXTENSION = ".log"; // 日志文件基准版本 public static final Integer LOGFILE_BASE_VERSION = 1; // 文件状态 private transient FileStatus fileStatus; // 文件路径 private final String pathStr; // 文件大小 private long fileLen; }
日志文件与数据文件包含信息类似,日志文件的初始化版本为1。
2.6 生成文件组
下面以 AbstractTableFileSystemView#buildFileGroups
为例,分析 HoodieFileGroup
的生成逻辑,其核心代码如下
protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile> dataFileStream, Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) { // 获取所有数据文件对应的分区路径、文件ID(相同的分区路径、文件ID会对应数据文件列表) Map<Pair<String, String>, List<HoodieDataFile>> dataFiles = dataFileStream.collect(Collectors.groupingBy((dataFile) -> { String partitionPathStr = getPartitionPathFromFilePath(dataFile.getPath()); return Pair.of(partitionPathStr, dataFile.getFileId()); })); // 获取所有日志文件对应的分区路径、文件ID(相同的分区路径、文件ID会对应日志文件列表) Map<Pair<String, String>, List<HoodieLogFile>> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> { String partitionPathStr = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), logFile.getPath().getParent()); return Pair.of(partitionPathStr, logFile.getFileId()); })); // 初始化所有的数据文件和日志文件(过滤掉相同的) Set<Pair<String, String>> fileIdSet = new HashSet<>(dataFiles.keySet()); fileIdSet.addAll(logFiles.keySet()); List<HoodieFileGroup> fileGroups = new ArrayList<>(); fileIdSet.forEach(pair -> { // 获取文件ID String fileId = pair.getValue(); // 生成新的文件组 HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline); if (dataFiles.containsKey(pair)) { // 包含在数据文件集合中 // 添加该数据文件 dataFiles.get(pair).forEach(group::addDataFile); } if (logFiles.containsKey(pair)) { // 包含在日志文件集合中 // 添加该日志文件 logFiles.get(pair).forEach(group::addLogFile); } if (addPendingCompactionFileSlice) { // 添加pending的compaction的FileSlice Option<Pair<String, CompactionOperation>> pendingCompaction = getPendingCompactionOperationWithInstant(group.getFileGroupId()); if (pendingCompaction.isPresent()) { // 存在pending的compaction // 添加至文件组 group.addNewFileSliceAtInstant(pendingCompaction.get().getKey()); } } fileGroups.add(group); }); return fileGroups; }
可以看到,对于文件组的构建,首先会对指定分区的所有数据文件和日志文件进行一次排序(按照分区路径和文件ID),然后对每个 <分区路径,文件ID>
生成一个文件组,并将具有相同 <分支路径,文件ID>
的日志文件和数据文件放入该文件组。
下面简要介绍数据文件和日志文件的文件名的生成。
2.7 数据文件名生成
public static String makeDataFileName(String commitTime, String writeToken, String fileId) { return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime); }
使用文件ID、writeToken、提交时间组成完整的文件名,其中writeToken的生成方法如下
public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) { return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId); }
2.8 日志文件名生成
public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version, String writeToken) { String suffix = (writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version) : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken); return LOG_FILE_PREFIX + suffix; }
使用文件ID、提交时间、日志文件扩展名、版本号、writeToken生成完整的文件名。
而对于文件ID、提交时间等可直接按照 _
进行分割或正则表达式来获取。
3. 总结
Hudi
中对文件的管理的核心是 HoodieFileGroup
,由 <分区路径,文件ID>
唯一标识,并且会保存不同的 FileSlice
,每个 FileSlice
包含最多一个数据文件和一个日志文件列表,对于有相同文件ID但不同提交时间的数据文件会保存在同一个 HoodieFileGroup
,而不同文件ID会保存在不同 HoodieFileGroup
中;而对于有相同文件ID和提交时间的数据文件和日志文件会被放入同一个 FileSlice
,对于具有相同文件ID,但不同提交时间的日志文件和数据文件会被放入不同的 FileSlice
。