Hudi Log日志文件读取分析(三)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Hudi Log日志文件读取分析(三)

1. 介绍

前面介绍了log日志文件的写入,接着分析log日志文件的读取。

2. 分析

读取日志文件的主要入口为 AbstractHoodieLogRecordScanner#scan,本文分为处理数据块、删除块、控制块来分别讲解其处理流程。

2.1 处理数据块/删除块

在构造 HoodieLogFormatReader后,会通过其 hasNextnext来读取日志文件中的 HoodieLogBlock并处理, scan方法中处理数据块/删除块的核心代码如下

public void scan() {
    HoodieLogFormatReader logFormatReaderWrapper = null;
    try {
      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
          readerSchema, readBlocksLazily, reverseReader, bufferSize);
      // 已扫描文件列表
      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
      while (logFormatReaderWrapper.hasNext()) { // 有下个block
        // 获取日志文件
        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
        // 获取下个block块
        HoodieLogBlock r = logFormatReaderWrapper.next();
        if (r.getBlockType() != CORRUPT_BLOCK
            && !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), this.latestInstantTime,
                HoodieTimeline.LESSER_OR_EQUAL)) {
          // 读取的文件块不为CORRUPT类型,并且文件块的时间大于最新的时间,表示非同一批的提交,退出处理
          break;
        }
        switch (r.getBlockType()) {
          case AVRO_DATA_BLOCK: // 数据块
            if (isNewInstantBlock(r) && !readBlocksLazily) { // 为新instant的block块并且为非延迟读取
              // 处理之前队列里的同一批block块
              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
            }
            // 将当前block块放入队列
            currentInstantLogBlocks.push(r);
            break;
          case DELETE_BLOCK: // 删除块
            if (isNewInstantBlock(r) && !readBlocksLazily) { // 新instant的block块并且为非延迟读取
              // 处理之前队列里的同一批block块
              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
            }
            // 将当前block块放入队列
            currentInstantLogBlocks.push(r);
            break;
          ...
  }

scan方法中,对于数据块和删除块的操作相同,即均会判断是不是新 instant对应的块(当前队列是否为空并且与上次写入队列的 block块的时间是否相同),若不是同一批,即instant的时间不同,那么调用 processQueuedBlocksForInstant开始处理,其核心代码如下

private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
    while (!lastBlocks.isEmpty()) { // 队列不为空
      // 取出队尾block
      HoodieLogBlock lastBlock = lastBlocks.pollLast();
      switch (lastBlock.getBlockType()) { // 判断block类型
        case AVRO_DATA_BLOCK: // 数据块
          // 处理数据块
          processAvroDataBlock((HoodieAvroDataBlock) lastBlock);
          break;
        case DELETE_BLOCK: // 删除块
          // 处理删除的所有key
          Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
          break;
        case CORRUPT_BLOCK:
          LOG.warn("Found a corrupt block which was not rolled back");
          break;
        default:
          break;
      }
    }
  }

可以看到只要队列不为空,就会从队尾取出 Block块,然后根据类型处理数据块或者删除块。

2.1.1 处理数据块

使用 processAvroDataBlock对数据块进行处理,其核心代码如下

private void processAvroDataBlock(HoodieAvroDataBlock dataBlock) throws Exception {
    // 获取数据块的所有记录
    List<IndexedRecord> recs = dataBlock.getRecords();
    totalLogRecords.addAndGet(recs.size());
    for (IndexedRecord rec : recs) {
      // 构造HoodieRecord
      HoodieRecord extends HoodieRecordPayload> hoodieRecord =
          SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
      // 处理HoodieRecord,其对应子类的具体实现
      processNextRecord(hoodieRecord);
    }
  }

首先获取该数据块的所有记录,然后调用具体子类的 processNextRecord进行处理,以 HoodieMergedLogRecordScanner的实现为例说明( HoodieUnMergedLogRecordScanner直接进行回调),其核心代码如下

protected void processNextRecord(HoodieRecord extends HoodieRecordPayload> hoodieRecord) throws IOException {
    // 获取key
    String key = hoodieRecord.getRecordKey();
    if (records.containsKey(key)) { // Map缓存中包含该key,该Map基于Disk实现
      // 将内容合并
      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
      // 放入Map缓存
      records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
    } else { // Map缓存中不包含该key
      // 直接放入缓存
      records.put(key, hoodieRecord);
    }
  }

可以看到首先会判断记录的key在缓存中是否存在,若存在,则将内容合并,否则放入缓存,该缓存基于磁盘实现,平衡内存占用,当无内存空间时,将数据写入磁盘。

2.1.2 处理删除块

处理删除块,会调用具体实现子类的 processNextDeletedKey方法来处理删除记录,其核心代码如下

protected void processNextDeletedKey(HoodieKey hoodieKey) {
    // 生成空的内容,然后放入缓存
    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
  }

可以看到其会为对应的 key生成空的内容(达到软删除目的),然后放入缓存。

2.2 处理控制块

scan方法中处理控制块( command)的核心代码如下

case COMMAND_BLOCK:
            HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
            String targetInstantForCommandBlock =
                r.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
            switch (commandBlock.getType()) { // 控制类型
              case ROLLBACK_PREVIOUS_BLOCK: // 回滚
                int numBlocksRolledBack = 0;
                totalRollbacks.incrementAndGet();
                while (!currentInstantLogBlocks.isEmpty()) { // 当前队列不为空
                  // 取出块
                  HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
                  // 上个block为CORRUPT
                  if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
                    // 弹出
                    currentInstantLogBlocks.pop();
                    numBlocksRolledBack++;
                  } else if (lastBlock.getBlockType() != CORRUPT_BLOCK
                      && targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) { // 不为CORRUPT并且内容相等
                    // 弹出
                    currentInstantLogBlocks.pop();
                    numBlocksRolledBack++;
                  } else if (!targetInstantForCommandBlock
                      .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) { // 时间不相等
                    // 退出
                    break;
                  } 
                }
                break;
              default:
                throw new UnsupportedOperationException("Command type not yet supported.");
            }
            break;

可以看到,处理控制块时,会根据控制块的具体类型对已放入队列的块进行处理,现只支持回滚,对于队列中上一个类型为 CORRUPT的块或者类型不为 CORRUPT但时间相等的块,均从队列中弹出;当时间不相等时,则退出处理。

对于日志文件的 Block块的处理是基于 Deque处理,在处理完会放入基于磁盘的 Map中( HoodieMergedLogRecordScanner实现),然后可以通过 HoodieMergeLogRecordScanner#getRecords直接获取 Map或者通过 HoodieMergeLogRecordScanner#iterator获取该 Map的迭代器,现在社区对该 Map结构做重构,性能最多可提升6倍。

2.3 判断是否有下一个(hasNext)

在处理数据块之前,需要通过 HoodieLogFormatReader#hasNext方法判断是否还有下一个 HoodieLogBlock,其核心代码如下

public boolean hasNext() {
    if (currentReader == null) { // 为空则为false
      return false;
    } else if (currentReader.hasNext()) { // 有下一个
      return true;
    } else if (logFiles.size() > 0) { // 日志文件列表大于0
      try {
        HoodieLogFile nextLogFile = logFiles.remove(0);
        if (!readBlocksLazily) { // 非延迟读取block, 表示已经读完,则直接关闭
          this.currentReader.close();
        } else {
          // 加入集合中
          this.prevReadersInOpenState.add(currentReader);
        }
        // 生成新的reader
        this.currentReader =
            new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
      } catch (IOException io) {
        throw new HoodieIOException("unable to initialize read with log file ", io);
      }
      // 判断当前reader有无下一个
      return this.currentReader.hasNext();
    }
    return false;
  }

可以看到,如果当前读取器( HoodieLogFileReader)为 null,那么表示已经读完所有日志文件,直接返回 false;否则若当前读取器有下一个,那么返回 true;否则若日志文件列表大小大于0,那么读取下一个日志文件,并生成新的读取器( HoodieLogFileReader),然后再判断是否有下一个;否则直接返回 false。其中, HoodieLogFileReader#hasNext方法通过读取魔数( #HUDI#)来判断是否有下一个,核心代码如下

private boolean hasNextMagic() throws IOException {
    long pos = inputStream.getPos();
    // 1. Read magic header from the start of the block
    inputStream.readFully(MAGIC_BUFFER, 0, 6);
    if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) { // 比较是否为魔数
      return false;
    }
    return true;
  }

2.4 读取下一个(next)

在调用 hasNext后,若存在下一个,那么需要调用 next获取 HoodieLogBlock,获取 Block块的核心代码如下

private HoodieLogBlock readBlock() throws IOException {
    int blocksize = -1;
    int type = -1;
    HoodieLogBlockType blockType = null;
    Map<HeaderMetadataType, String> header = null;
    try {
      // 1 Read the total size of the block
      blocksize = (int) inputStream.readLong();
    } catch (EOFException | CorruptedLogFileException e) {
      // 异常时创建CORRUPT的Block
      return createCorruptBlock();
    }
    boolean isCorrupted = isBlockCorrupt(blocksize);
    if (isCorrupted) {
      return createCorruptBlock();
    }
    // 2. Read the version for this log format
    this.nextBlockVersion = readVersion();
    // 3. Read the block type for a log block
    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
      type = inputStream.readInt();
      Preconditions.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
      blockType = HoodieLogBlockType.values()[type];
    }
    // 4. Read the header for a log block, if present
    if (nextBlockVersion.hasHeader()) {
      header = HoodieLogBlock.getLogMetadata(inputStream);
    }
    int contentLength = blocksize;
    // 5. Read the content length for the content
    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
      contentLength = (int) inputStream.readLong();
    }
    // 6. Read the content or skip content based on IO vs Memory trade-off by client
    // TODO - have a max block size and reuse this buffer in the ByteBuffer
    // (hard to guess max block size for now)
    long contentPosition = inputStream.getPos();
    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily);
    // 7. Read footer if any
    Map<HeaderMetadataType, String> footer = null;
    if (nextBlockVersion.hasFooter()) {
      footer = HoodieLogBlock.getLogMetadata(inputStream);
    }
    // 8. Read log block length, if present. This acts as a reverse pointer when traversing a
    // log file in reverse
    long logBlockLength = 0;
    if (nextBlockVersion.hasLogBlockLength()) {
      logBlockLength = inputStream.readLong();
    }
    // 9. Read the log block end position in the log file
    long blockEndPos = inputStream.getPos();
    switch (blockType) {
      // 创建不同的Block
      case AVRO_DATA_BLOCK:
        if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
          return HoodieAvroDataBlock.getBlock(content, readerSchema);
        } else {
          return HoodieAvroDataBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
              contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
        }
      case DELETE_BLOCK:
        return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
            contentPosition, contentLength, blockEndPos, header, footer);
      case COMMAND_BLOCK:
        return HoodieCommandBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
            contentPosition, contentLength, blockEndPos, header, footer);
      default:
        throw new HoodieNotSupportedException("Unsupported Block " + blockType);
    }
  }

可以看到,对于 Block的读取,与写入时的顺序相同,然后根据不同类型生成不同的 Block

3. 总结

日志文件的读取,与日志文件写入的顺序相同。在读取后会将不同类型的 HoodieLogBlock先放入 Deque中处理,然后会根据不同的读取策略( HoodieUnMergedLogRecordScannerHoodieMergedLogRecordScanner)进行不同的处理,如 Merged策略会将同一key的内容进行合并(会处理删除和真实数据内容的合并),然后再将合并后的结果放入缓存中供读取;而 UnMerged策略则直接对 HoodieRecord进行回调处理。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
19天前
|
存储 缓存 关系型数据库
图解MySQL【日志】——Redo Log
Redo Log(重做日志)是数据库中用于记录数据页修改的物理日志,确保事务的持久性和一致性。其主要作用包括崩溃恢复、提高性能和保证事务一致性。Redo Log 通过先写日志的方式,在内存中缓存修改操作,并在适当时候刷入磁盘,减少随机写入带来的性能损耗。WAL(Write-Ahead Logging)技术的核心思想是先将修改操作记录到日志文件中,再择机写入磁盘,从而实现高效且安全的数据持久化。Redo Log 的持久化过程涉及 Redo Log Buffer 和不同刷盘时机的控制参数(如 `innodb_flush_log_at_trx_commit`),以平衡性能与数据安全性。
28 5
图解MySQL【日志】——Redo Log
|
1月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
107 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
|
2天前
|
缓存 Java 编译器
|
10天前
|
SQL 存储 自然语言处理
让跨 project 联查更轻松,SLS StoreView 查询和分析实践
让跨 project 联查更轻松,SLS StoreView 查询和分析实践
|
2月前
|
机器学习/深度学习 人工智能 运维
智能日志分析:用AI点亮运维的未来
智能日志分析:用AI点亮运维的未来
342 15
|
13天前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
21天前
|
存储 关系型数据库 MySQL
图解MySQL【日志】——Undo Log
Undo Log(回滚日志)是 MySQL 中用于实现事务原子性和一致性的关键机制。在默认的自动提交模式下,MySQL 隐式开启事务,每条增删改语句都会记录到 Undo Log 中。其主要作用包括:
32 0
|
SQL 数据采集 监控
基于日志服务数据加工分析Java异常日志
采集并脱敏了整个5月份的项目异常日志,准备使用日志服务数据加工做数据清洗以及分析。本案例是基于使用阿里云相关产品(OSS,RDS,SLS等)的SDK展开自身业务。需要对异常日志做解析,将原始日志中时间、错误码、错误信息、状态码、产品信息、请求方法、出错行号提取出来。然后根据提取出来的不同产品信息做多目标分发处理。对清洗后的数据做异常日志数据分析。
888 0
基于日志服务数据加工分析Java异常日志
|
4月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
1181 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
3月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。