1. 引入
Hudi提供了两种存储类型,即 CopyOnWrite(COW)
和 MergeOnRead(MOR)
。COW
在数据插入时会直接写入parquet数据文件,对于更新时也会直接更新并写入新的parquet数据文件;而 MOR
在数据插入时会写入parquet数据文件,对于更新时则一般会写入log增量日志文件,而后进行压缩合并。之前在Upsert在Hudi中的实现分析已经分析过在 COW
类型下Hudi是如何处理 upsert
,这篇文章主要分析在 MOR
类型下Hudi是如何处理 upsert
。
2. 分析
为 COW
类型时,对于记录的 upsert
,其步骤如下:
- 给记录打标签,即记录存在于哪些文件中,用于判断是进行更新还是插入操作。
- 创建分区器用于重新分区。会创建多个
bucket
,其对应分区总数,每个bucket
对应一个FileId
(已存在文件ID或新文件ID)和类型(INSERT
、UPDATE
)。对于INSERT
操作,在查找分区录下所有的小文件后,优先将记录插入到这些小文件中,若还剩余记录,则插入新文件。 - 重新进行分区,不同分区获取对应的
bucket
后,则可知对该分区上的记录进行何种操作(由bucket
类型决定),对于UPDATE
操作,则合并老记录后写入新的parquet文件;对于INSERT
操作,则直接写入新的parquet文件。
为 MOR
类型时,对于记录的 upsert
,总体步骤与上述类似,只是创建的分区器类型为 HoodieMergeOnReadTable.MergeOnReadUpsertPartitioner
,其为 HoodieCopyOnWriteTable.UpsertPartitioner
子类,两者在查找小文件时的表现不同。
2.1. Insert
对于记录的 insert
而言(分区对应的bucket类型为 INSERT
),最终会调用 HoodieMergeOnReadTable#handleInsert
方法来处理该操作,其核心代码如下
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception { if (index.canIndexLogFiles()) { // 支持索引日志文件,则写入log日志文件 return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx); } else { // 不支持索引日志文件,则直接插入parquet数据文件 return super.handleInsert(commitTime, idPfx, recordItr); } }
可以看到,其首先会判断所采用的索引是否支持索引日志文件,Hudi提供的三种类型的索引:HoodieBloomIndex
、 HBaseIndex
、 InMemoryHashIndex
,其中 HoodieBloomIndex
不支持索引日志文件,而其他两种均支持,支持索引表示可以对日志log文件进行插入操作,如只有log增量日志文件而无parquet数据文件(现在社区正打算对log增量日志文件支持索引,因此后续就可以直接写入log增量日志文件了)。
若支持索引日志文件,则会生成一个 MergeOnReadLazyInsertIterable
对象,其是 CopyOnWriteLazyInsertIterable
的子类,然后由其 consumeOneRecord
提供写入,其核心代码如下
protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) { final HoodieRecord insertPayload = payload.record; List<WriteStatus> statuses = new ArrayList<>(); if (handle == null) { handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix)); } if (handle.canWrite(insertPayload)) { // 还可继续写入 // 实际写入 handle.write(insertPayload, payload.insertValue, payload.exception); } else { // 已经满了 handle.close(); statuses.add(handle.getWriteStatus()); // 新生成对象继续处理写入 handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix)); // 实际写入 handle.write(insertPayload, payload.insertValue, payload.exception); } }
可以看到其会借助 HoodieAppendHandle#write
完成真正的写入,具体对于log文件格式及写入Hudi做了很多优化,后续专门分析。
若不支持索引日志文件,则会调用父类的方法处理插入,即会生成一个 CopyOnWriteLazyInsertIterable
对象来处理写入,其会写入parquet数据文件,前面文章Upsert在Hudi中的实现分析已经分析过,不再赘述。
2.2. Update
对于记录的 update
而言(分区对应的bucket类型为 UPDATE
),最终会调用 HoodieMergeOnReadTable#handleUpdate
方法来处理该操作,其核心代码如下
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException { if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { // 不支持索引日志文件并且小文件集合中包含该文件ID,则交由父类处理,会更新parquet文件 return super.handleUpdate(commitTime, fileId, recordItr); } else { // 写入日志文件 HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); appendHandle.doAppend(); appendHandle.close(); return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator(); } }
可以看到,首先判断是否支持索引日志文件并且小文件集合中是否包含了正在操作的文件。
若不支持索引日志文件并且操作的文件为小文件,则直接调用父类的 HoodieCopyOnWrite#handleUpdate
方法将记录与老记录合并后写入新的parquet数据文件。
否则,则使用 HoodieAppendHandle
将记录写入log增量日志文件。
下面分析对于 HoodieMergeOnReadTable.MergeOnReadUpsertPartitioner
查找小文件的方法,这也与 HoodieCopyOnWriteTable.UpsertPartitioner
区分器的主要不同点。方法核心代码如下
protected List<SmallFile> getSmallFiles(String partitionPath) { List<SmallFile> smallFileLocations = new ArrayList<>(); // 获取commit、deltacommit的timeline HoodieTimeline commitTimeline = getCompletedCommitsTimeline(); if (!commitTimeline.empty()) { // 获取最后一个instant HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); List<FileSlice> allSmallFileSlices = new ArrayList<>(); if (!index.canIndexLogFiles()) { // 不支持索引日志文件 // 过滤所有的FileSlice,然后进行排序后选出第一个(最小)的FileSlice Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getRTFileSystemView() .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit()) .sorted((FileSlice left, FileSlice right) -> left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() ? -1 : 1) .findFirst()); if (smallFileSlice.isPresent()) { allSmallFileSlices.add(smallFileSlice.get()); } } else { // 支持索引日志文件 // 获取最新的所有FileSlice List<FileSlice> allFileSlices = getRTFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) .collect(Collectors.toList()); for (FileSlice fileSlice : allFileSlices) { if (isSmallFile(partitionPath, fileSlice)) { // 为小文件(结合数据文件和日志文件计算) allSmallFileSlices.add(fileSlice); } } } for (FileSlice smallFileSlice : allSmallFileSlices) { SmallFile sf = new SmallFile(); if (smallFileSlice.getDataFile().isPresent()) { // 数据文件存在 String filename = smallFileSlice.getDataFile().get().getFileName(); // 基于数据文件构造属性信息 sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); smallFileLocations.add(sf); smallFiles.add(sf); } else { // 数据文件不存在 HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); // 基于第一个日志文件构造属性信息 sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), FSUtils.getFileIdFromLogPath(logFile.getPath())); sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); smallFileLocations.add(sf); smallFiles.add(sf); } } } return smallFileLocations; }
该方法首先会过滤 completed
状态的 commit
、 deltacommit
类型的 timeline
,并找到最后一次 commit
(可能是 commit
、 deltacommit
),为 MOR
类型时, timeline
中的 commit
表示已完成的 compact
。
若不支持索引日志文件,则查找最新的所有 FileSlice
(由一个数据parquet数据文件和多个log增量日志文件组成)并且其数据文件大小小于配置的大小且无日志文件,然后排序后取最小的文件,该文件即为小文件(一个)。
若支持索引日志文件,则查找最新的所有 FileSlice
进行遍历,并利用log增量日志文件信息然后生成小文件(多个)。
该方法获取的小文件用于在 handleUpdate
时判断操作的文件是否为小文件,若为小文件并且不支持日志文件索引,则可直接更新该文件,否则生成新的log增量日志文件。
总结
对于 MOR
类型存储而言,数据写入及更新流程与 COW
大致相同;但对于 MOR
类型而言,在 insert
时,会根据是否支持索引日志文件来决定将记录写入log增量日志文件还是parquet数据文件(支持则写入log增量文件,否则写入parquet数据文件);在 update
时,其也会根据是否支持直接写入日志文件和更新的文件是否为小文件来决定是否合并新老记录写入parquet数据或者将新记录写入log增量日志文件中(不支持并且为小文件,则直接更新旧的parquet文件记录并写入新的parquet数据文件,否则写入log增量文件中)。