1. Introduction
The previous post covered how log files are written; this one analyzes how they are read back.
2. Analysis
The main entry point for reading log files is AbstractHoodieLogRecordScanner#scan. This post walks through the handling of data blocks, delete blocks, and command blocks in turn.
2.1 Handling data blocks and delete blocks
After constructing a HoodieLogFormatReader, scan uses its hasNext and next methods to read and process each HoodieLogBlock in the log files. The core code in scan for handling data and delete blocks is as follows:
public void scan() {
  HoodieLogFormatReader logFormatReaderWrapper = null;
  try {
    logFormatReaderWrapper = new HoodieLogFormatReader(fs,
        logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
        readerSchema, readBlocksLazily, reverseReader, bufferSize);
    // Set of log files already scanned
    Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
    while (logFormatReaderWrapper.hasNext()) { // there is another block
      // Fetch the log file being read
      HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
      // Fetch the next block
      HoodieLogBlock r = logFormatReaderWrapper.next();
      if (r.getBlockType() != CORRUPT_BLOCK
          && !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), this.latestInstantTime,
              HoodieTimeline.LESSER_OR_EQUAL)) {
        // The block is not CORRUPT and its instant time is greater than the latest
        // instant time, i.e. it belongs to a different (later) commit: stop processing
        break;
      }
      switch (r.getBlockType()) {
        case AVRO_DATA_BLOCK: // data block
          if (isNewInstantBlock(r) && !readBlocksLazily) {
            // A block of a new instant, and reads are not lazy:
            // process the queued blocks of the previous batch first
            processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
          }
          // Push the current block onto the queue
          currentInstantLogBlocks.push(r);
          break;
        case DELETE_BLOCK: // delete block
          if (isNewInstantBlock(r) && !readBlocksLazily) {
            // A block of a new instant, and reads are not lazy:
            // process the queued blocks of the previous batch first
            processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
          }
          // Push the current block onto the queue
          currentInstantLogBlocks.push(r);
          break;
        ...
}
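For reference, the isNewInstantBlock check used above decides whether the incoming block starts a new batch; a sketch of the logic, roughly as it appears in the version discussed here:

private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
  // A block starts a new batch when the queue is non-empty, the last queued
  // block is not CORRUPT, and the instant times differ
  return currentInstantLogBlocks.size() > 0
      && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
      && !logBlock.getLogBlockHeader().get(INSTANT_TIME)
          .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
}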
In scan, data blocks and delete blocks are handled identically: both first check whether the block belongs to a new instant (the queue is non-empty and the block's instant time differs from that of the last block pushed onto the queue). If the block is not part of the same batch, i.e. the instant times differ, processQueuedBlocksForInstant is called to process the queued blocks. Its core code is as follows:
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
  while (!lastBlocks.isEmpty()) { // the queue is not empty
    // Take the block at the tail of the queue
    HoodieLogBlock lastBlock = lastBlocks.pollLast();
    switch (lastBlock.getBlockType()) { // dispatch on the block type
      case AVRO_DATA_BLOCK: // data block
        // Process the data block
        processAvroDataBlock((HoodieAvroDataBlock) lastBlock);
        break;
      case DELETE_BLOCK: // delete block
        // Process every key to delete
        Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
        break;
      case CORRUPT_BLOCK:
        LOG.warn("Found a corrupt block which was not rolled back");
        break;
      default:
        break;
    }
  }
}
As long as the queue is non-empty, a block is taken from its tail and then handled as a data block or a delete block according to its type.
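Note the asymmetry: scan pushes blocks onto the head of the deque, while processQueuedBlocksForInstant drains from the tail, so records are replayed in the order they were written, and a rollback command (section 2.2) can pop the newest blocks first. A self-contained illustration of that ordering:

import java.util.ArrayDeque;
import java.util.Deque;

public class DequeOrderDemo {
  public static void main(String[] args) {
    Deque<String> blocks = new ArrayDeque<>();
    blocks.push("block-1"); // oldest block, pushed first
    blocks.push("block-2");
    blocks.push("block-3"); // newest block; pop() would remove this one first
    // Draining from the tail replays blocks in write order
    while (!blocks.isEmpty()) {
      System.out.println(blocks.pollLast()); // block-1, block-2, block-3
    }
  }
}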
2.1.1 Handling data blocks
Data blocks are processed by processAvroDataBlock, whose core code is as follows:
private void processAvroDataBlock(HoodieAvroDataBlock dataBlock) throws Exception {
  // Fetch all records of the data block
  List<IndexedRecord> recs = dataBlock.getRecords();
  totalLogRecords.addAndGet(recs.size());
  for (IndexedRecord rec : recs) {
    // Build the HoodieRecord
    HoodieRecord<? extends HoodieRecordPayload> hoodieRecord =
        SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
    // Process the HoodieRecord; the implementation is subclass-specific
    processNextRecord(hoodieRecord);
  }
}
It first fetches all records of the data block, then calls the subclass-specific processNextRecord on each. Take HoodieMergedLogRecordScanner as the example (HoodieUnMergedLogRecordScanner simply invokes a callback); its core code is as follows:
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
  // Fetch the record key
  String key = hoodieRecord.getRecordKey();
  if (records.containsKey(key)) { // the (disk-backed) map cache already contains this key
    // Merge the two payloads
    HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
    // Put the merged record into the map cache
    records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
  } else { // the map cache does not contain this key
    // Put the record into the cache directly
    records.put(key, hoodieRecord);
  }
}
It first checks whether the record key already exists in the cache. If it does, the two payloads are merged; otherwise the record is put into the cache directly. The cache is a disk-backed map that bounds memory usage: when no memory is left, entries are spilled to disk.
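To make the merge step concrete, here is a minimal, self-contained sketch of the preCombine contract; Payload below is a hypothetical stand-in for a HoodieRecordPayload that keeps the value with the larger ordering field (Hudi invokes it as newPayload.preCombine(oldPayload)):

import java.util.HashMap;
import java.util.Map;

class Payload {
  final long orderingVal; // e.g. an event timestamp
  final String value;
  Payload(long orderingVal, String value) { this.orderingVal = orderingVal; this.value = value; }
  // preCombine: of two payloads for the same key, keep the later one
  Payload preCombine(Payload another) { return this.orderingVal >= another.orderingVal ? this : another; }
}

public class MergeDemo {
  public static void main(String[] args) {
    Map<String, Payload> records = new HashMap<>();
    records.put("k1", new Payload(1L, "v1"));           // first occurrence goes straight in
    Payload incoming = new Payload(2L, "v2");           // same key arrives in a later block
    records.merge("k1", incoming, Payload::preCombine); // merge decides the survivor
    System.out.println(records.get("k1").value);        // prints v2
  }
}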
2.1.2 Handling delete blocks
Delete blocks are handled by the subclass-specific processNextDeletedKey method, whose core code is as follows:
protected void processNextDeletedKey(HoodieKey hoodieKey) {
  // Generate an empty payload and put it into the cache
  records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
      hoodieKey.getPartitionPath(), getPayloadClassFQN()));
}
It generates an empty payload for the key (achieving a soft delete) and puts it into the cache.
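The effect can be sketched without Hudi's types; the hypothetical TombstonePayload below shows why a key whose payload yields no value is effectively deleted when the merged records are read back:

import java.util.Optional;

class TombstonePayload {
  // An empty payload materializes no value
  Optional<String> getValue() { return Optional.empty(); }
}

public class SoftDeleteDemo {
  public static void main(String[] args) {
    TombstonePayload tombstone = new TombstonePayload();
    // A reader only emits keys whose payload resolves to a value,
    // so this prints nothing: the key is effectively deleted
    tombstone.getValue().ifPresent(v -> System.out.println("emit " + v));
  }
}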
2.2 Handling command blocks
The core code in scan for handling command blocks is as follows:
case COMMAND_BLOCK:
  HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
  String targetInstantForCommandBlock =
      r.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
  switch (commandBlock.getType()) { // command type
    case ROLLBACK_PREVIOUS_BLOCK: // rollback
      int numBlocksRolledBack = 0;
      totalRollbacks.incrementAndGet();
      while (!currentInstantLogBlocks.isEmpty()) { // the queue is not empty
        // Peek at the most recently queued block
        HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
        if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
          // The previous block is CORRUPT: pop it
          currentInstantLogBlocks.pop();
          numBlocksRolledBack++;
        } else if (lastBlock.getBlockType() != CORRUPT_BLOCK
            && targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
          // Not CORRUPT and the instant times match: pop it
          currentInstantLogBlocks.pop();
          numBlocksRolledBack++;
        } else if (!targetInstantForCommandBlock
            .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
          // The instant times differ: stop
          break;
        }
      }
      break;
    default:
      throw new UnsupportedOperationException("Command type not yet supported.");
  }
  break;
When a command block is encountered, the blocks already queued are processed according to the command type; only rollback is currently supported. Starting from the most recently queued block, any block that is CORRUPT, or that is not CORRUPT but whose instant time equals the command's target instant, is popped from the queue; once the instant times differ, processing stops.
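The rollback path is easiest to see on a small example; the sketch below (with an illustrative Block class, not Hudi's) pops the queued blocks that belong to the failed target instant while leaving earlier instants untouched:

import java.util.ArrayDeque;
import java.util.Deque;

public class RollbackDemo {
  record Block(String instantTime, boolean corrupt) {}

  public static void main(String[] args) {
    Deque<Block> queue = new ArrayDeque<>();
    queue.push(new Block("001", false)); // an earlier, committed instant
    queue.push(new Block("002", false)); // the instant being rolled back
    queue.push(new Block("002", true));  // corrupt remnant of the same failed write

    String targetInstant = "002"; // from the command block's TARGET_INSTANT_TIME header
    while (!queue.isEmpty()) {
      Block top = queue.peek();
      if (top.corrupt() || targetInstant.equals(top.instantTime())) {
        queue.pop(); // discard blocks of the failed write
      } else {
        break; // an older instant: stop rolling back
      }
    }
    System.out.println(queue); // only the "001" block remains
  }
}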
Log blocks are thus staged in a Deque; once processed (in the HoodieMergedLogRecordScanner implementation) the records land in a disk-backed Map. That Map can be fetched directly via HoodieMergedLogRecordScanner#getRecords, or iterated via HoodieMergedLogRecordScanner#iterator. The community is currently reworking this Map structure, with performance gains of up to 6x.
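The disk-backed Map referred to here is Hudi's ExternalSpillableMap. Conceptually it behaves like the following highly simplified sketch; the real implementation serializes overflow entries to a file rather than holding them in a second in-memory map:

import java.util.HashMap;
import java.util.Map;

class SpillableMapSketch<K, V> {
  private final int maxInMemoryEntries;
  private final Map<K, V> inMemory = new HashMap<>();
  private final Map<K, V> onDisk = new HashMap<>(); // stands in for a file-backed store

  SpillableMapSketch(int maxInMemoryEntries) { this.maxInMemoryEntries = maxInMemoryEntries; }

  void put(K key, V value) {
    if (inMemory.containsKey(key) || inMemory.size() < maxInMemoryEntries) {
      inMemory.put(key, value);
      onDisk.remove(key); // keep at most one copy of each key
    } else {
      onDisk.put(key, value); // out of memory budget: spill to "disk"
    }
  }

  V get(K key) {
    V v = inMemory.get(key);
    return v != null ? v : onDisk.get(key);
  }
}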
2.3 Checking for a next block (hasNext)
Before any block can be processed, HoodieLogFormatReader#hasNext determines whether another HoodieLogBlock exists. Its core code is as follows:
public boolean hasNext() {
  if (currentReader == null) { // no reader left: false
    return false;
  } else if (currentReader.hasNext()) { // the current reader has another block
    return true;
  } else if (logFiles.size() > 0) { // more log files remain
    try {
      HoodieLogFile nextLogFile = logFiles.remove(0);
      if (!readBlocksLazily) {
        // Not reading blocks lazily: the current file is fully read, close its reader
        this.currentReader.close();
      } else {
        // Keep the reader open and remember it
        this.prevReadersInOpenState.add(currentReader);
      }
      // Create a reader for the next log file
      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
    } catch (IOException io) {
      throw new HoodieIOException("unable to initialize read with log file ", io);
    }
    // Check whether the new reader has a next block
    return this.currentReader.hasNext();
  }
  return false;
}
If the current reader (HoodieLogFileReader) is null, all log files have been read and false is returned. Otherwise, if the current reader has a next block, true is returned. Otherwise, if the log file list is non-empty, the next log file is opened with a new HoodieLogFileReader and that reader's hasNext result is returned; failing all of these, false is returned. HoodieLogFileReader#hasNext itself decides whether a next block exists by reading the magic bytes (#HUDI#); the core code is as follows:
private boolean hasNextMagic() throws IOException {
  long pos = inputStream.getPos();
  // 1. Read magic header from the start of the block
  inputStream.readFully(MAGIC_BUFFER, 0, 6);
  if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) { // compare against the magic bytes
    return false;
  }
  return true;
}
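The pattern in HoodieLogFormatReader#hasNext, i.e. finish the current file and then open a reader on the next, is plain iterator chaining; stripped of Hudi specifics it looks like this (illustrative names):

import java.util.Iterator;
import java.util.List;

class ChainedIterator<T> implements Iterator<T> {
  private final List<Iterator<T>> parts; // one iterator per "log file"
  private Iterator<T> current;
  private int idx = 0;

  ChainedIterator(List<Iterator<T>> parts) {
    this.parts = parts;
    this.current = parts.isEmpty() ? null : parts.get(idx++);
  }

  @Override
  public boolean hasNext() {
    while (current != null) {
      if (current.hasNext()) {
        return true;
      }
      // Current part exhausted: advance to the next one, if any
      current = (idx < parts.size()) ? parts.get(idx++) : null;
    }
    return false;
  }

  @Override
  public T next() { // callers must check hasNext() first, as with the real reader
    return current.next();
  }
}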
2.4 Reading the next block (next)
Once hasNext reports another block, next is called to obtain the HoodieLogBlock. The core code for reading a block is as follows:
private HoodieLogBlock readBlock() throws IOException {
  int blocksize = -1;
  int type = -1;
  HoodieLogBlockType blockType = null;
  Map<HeaderMetadataType, String> header = null;

  try {
    // 1 Read the total size of the block
    blocksize = (int) inputStream.readLong();
  } catch (EOFException | CorruptedLogFileException e) {
    // On failure, create a CORRUPT block
    return createCorruptBlock();
  }

  boolean isCorrupted = isBlockCorrupt(blocksize);
  if (isCorrupted) {
    return createCorruptBlock();
  }

  // 2. Read the version for this log format
  this.nextBlockVersion = readVersion();

  // 3. Read the block type for a log block
  if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
    type = inputStream.readInt();
    Preconditions.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
    blockType = HoodieLogBlockType.values()[type];
  }

  // 4. Read the header for a log block, if present
  if (nextBlockVersion.hasHeader()) {
    header = HoodieLogBlock.getLogMetadata(inputStream);
  }

  int contentLength = blocksize;
  // 5. Read the content length for the content
  if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
    contentLength = (int) inputStream.readLong();
  }

  // 6. Read the content or skip content based on IO vs Memory trade-off by client
  // TODO - have a max block size and reuse this buffer in the ByteBuffer
  // (hard to guess max block size for now)
  long contentPosition = inputStream.getPos();
  byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily);

  // 7. Read footer if any
  Map<HeaderMetadataType, String> footer = null;
  if (nextBlockVersion.hasFooter()) {
    footer = HoodieLogBlock.getLogMetadata(inputStream);
  }

  // 8. Read log block length, if present. This acts as a reverse pointer when traversing a
  // log file in reverse
  long logBlockLength = 0;
  if (nextBlockVersion.hasLogBlockLength()) {
    logBlockLength = inputStream.readLong();
  }

  // 9. Read the log block end position in the log file
  long blockEndPos = inputStream.getPos();

  switch (blockType) { // build the concrete block type
    case AVRO_DATA_BLOCK:
      if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
        return HoodieAvroDataBlock.getBlock(content, readerSchema);
      } else {
        return HoodieAvroDataBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
            contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
      }
    case DELETE_BLOCK:
      return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
          contentPosition, contentLength, blockEndPos, header, footer);
    case COMMAND_BLOCK:
      return HoodieCommandBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
          contentPosition, contentLength, blockEndPos, header, footer);
    default:
      throw new HoodieNotSupportedException("Unsupported Block " + blockType);
  }
}
A block is read back in the same field order in which it was written, and the appropriate Block subtype is instantiated based on the block type.
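To underline that symmetry, the toy writer below emits fields in exactly the order readBlock consumes them. It is an illustration of the layout only; the field widths and the exact spans covered by the two size fields are simplified, and it is not Hudi's actual HoodieLogFormatWriter:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class ToyLogBlockWriter {
  static byte[] frame(int version, int type, byte[] header, byte[] content, byte[] footer) throws IOException {
    // Buffer everything after the total-size field so it can be measured first
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    DataOutputStream b = new DataOutputStream(body);
    b.writeInt(version);         // 2. log format version
    b.writeInt(type);            // 3. block type ordinal
    b.write(header);             // 4. header bytes
    b.writeLong(content.length); // 5. content length
    b.write(content);            // 6. content
    b.write(footer);             // 7. footer bytes

    ByteArrayOutputStream frame = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(frame);
    out.write("#HUDI#".getBytes(StandardCharsets.US_ASCII)); // 0. magic delimiter
    out.writeLong(body.size());                              // 1. total block size
    out.write(body.toByteArray());
    out.writeLong(6 + 8 + body.size() + 8);                  // 8. full block length (reverse pointer)
    return frame.toByteArray();
  }
}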
3. Summary
Log files are read in the same order in which they were written. After being read, HoodieLogBlocks of each type are first staged in a Deque and then handled according to the scan strategy (HoodieUnMergedLogRecordScanner or HoodieMergedLogRecordScanner): the Merged strategy combines the payloads of records sharing a key (including merging deletes with real data) and puts the merged results into a cache for readers, whereas the UnMerged strategy simply invokes a callback for each HoodieRecord.