Hudi Log日志文件写入分析(二)

简介: Hudi Log日志文件写入分析(二)

1. 介绍

前面介绍了log日志文件的相关类,接着分析记录写入log日志文件的具体实现。

2. 分析

写日志文件的入口在 HoodieMergeOnReadTable#handleUpdate,其核心代码如下

public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
      throws IOException {
    logger.info("Merging updates for commit " + commitTime + " for file " + fileId);
    if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
      logger.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
      return super.handleUpdate(commitTime, fileId, recordItr);
    } else {
      // 写日志文件的入口
      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
      appendHandle.doAppend();
      appendHandle.close();
      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
    }
  }

在处理 update时,如果日志文件不支持索引或者文件不是小文件,则会使用 HoodieApppendHandle#doAppend处理,其核心代码如下

public void doAppend() {
    while (recordItr.hasNext()) {
      // 获取记录
      HoodieRecord record = recordItr.next();
      // 初始化
      init(record);
      // 刷盘
      flushToDiskIfRequired(record);
      // 写入缓存
      writeToBuffer(record);
    }
    doAppend(header);
    estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
  }

有记录存在,则会进行初始化(init),初始化包括统计信息的初始化、HoodieLogFormatWriter的初始化等。

调用 flushToDiskIfRequired进行刷盘处理,其核心代码如下

private void flushToDiskIfRequired(HoodieRecord record) {
    // 当前记录条数大于等于block块可以存的最大记录条数
    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
      // 重新计算记录的平均大小
      averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
      // append写入
      doAppend(header);
      estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
      // 重置当前记录条数
      numberOfRecords = 0;
    }
  }

Hudi会估算数据块中可以存放多少条记录,然后当已缓存的记录条数大于等于当前块中可存放的条数时,会将其append写入。

调用 writerToBuffer将记录缓存起来,其核心代码如下

private void writeToBuffer(HoodieRecord<T> record) {
    // 获取IndexedRecord便于写入log文件
    Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
    if (indexedRecord.isPresent()) { // 存在表示是新插入的记录
      recordList.add(indexedRecord.get());
    } else { // 不存在表示需要删除
      keysToDelete.add(record.getKey());
    }
    numberOfRecords++;
  }

可以看到其会保存需要插入或者删除的记录。

使用 doAppend写入日志文件,其核心代码如下

private void doAppend(Map<HeaderMetadataType, String> header) {
    try {
      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
      if (recordList.size() > 0) { // 新插入的记录不为空
        // 使用Writer写入Data类型的Block
        writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
        // 清空缓存
        recordList.clear();
      }
      if (keysToDelete.size() > 0) { // 删除的记录不为空
        // 使用Writer写入Delete类型的Block
        writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
        // 清空缓存
        keysToDelete.clear();
      }
    } catch (Exception e) {
      throw new HoodieAppendException("Failed while appending records to " + currentLogFile.getPath(), e);
    }
  }

可以看到,该方法会将缓存的记录和头部信息(时间、schema信息)组装成 HoodieLogBlock后写入日志,其中 appendBlock核心代码如下

public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
    // Find current version
    HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
        new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
    long currentSize = this.output.size();
    // 1. Write the magic header for the start of the block
    this.output.write(HoodieLogFormat.MAGIC);
    // bytes for header
    byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
    // content bytes
    byte[] content = block.getContentBytes();
    // bytes for footer
    byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
    // 2. Write the total size of the block (excluding Magic)
    this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
    // 3. Write the version of this log block
    this.output.writeInt(currentLogFormatVersion.getVersion());
    // 4. Write the block type
    this.output.writeInt(block.getBlockType().ordinal());
    // 5. Write the headers for the log block
    this.output.write(headerBytes);
    // 6. Write the size of the content block
    this.output.writeLong(content.length);
    // 7. Write the contents of the data block
    this.output.write(content);
    // 8. Write the footers for the log block
    this.output.write(footerBytes);
    // 9. Write the total size of the log block (including magic) which is everything written
    // until now (for reverse pointer)
    this.output.writeLong(this.output.size() - currentSize);
    // Flush every block to disk
    flush();
    // roll over if size is past the threshold
    return rolloverIfNeeded();
  }

可以看到,对于Block块的写入,顺序如下

  • 写入MAGIC( hudi);
  • 写入Block块的大小;
  • 写入版本号;
  • 写入 Block的类型;
  • 写入头部;
  • 写入数据内容;
  • 写入尾部;
  • 写入本次写数据的总大小;

调用 flush将数据写入文件,如果需要可能会滚动至下个日志文件,其中 rolloverIfNeeded核心代码如下

private Writer rolloverIfNeeded() throws IOException, InterruptedException {
    if (getCurrentSize() > sizeThreshold) { // 当前大小大于阈值(512M)
      // 生成一个新的日志文件,版本号+1
      HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
      // 关闭当前的Writer,会再次触发写文件
      close();
      // 返回新文件对应的Writer
      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
          rolloverLogWriteToken);
    }
    // 不需要滚动,直接返回
    return this;
  }

可以看到,若当前写入的文件大小大于配置的阈值时会滚动到下个版本的新文件,并返回新文件对应的Writer继续写入。另外对于 HoodieLogFormatWriter的初始化,其首先会判断当前文件是否存在,若存在,进一步判断该文件的 FileSystem是否支持 Append,现在只有HDFS、MAPRFS、IGNITE、VIEWFS文件系统支持Append,若支持,则接着Append,若不支持,则滚动到下个新文件写入;若不存在,则直接创建新文件写入。

3. 总结

对于日志文件的写入,Hudi采用基于 HoodieLogBlock为单元的写入粒度,其策略是先将记录缓存至内存,然后再批量构造成 Block后写入日志文件,而对于 Block的头部、实际内容、尾部的写入采用了指定的顺序,并且采用了自动滚动日志文件的方式写入(当日志文件大小达到指定配置大小时自动滚动到下一个文件继续写入)。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
1天前
|
Oracle 关系型数据库 分布式数据库
实时计算 Flink版产品使用合集之日志文件快速增长如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
16 2
|
1天前
|
存储 关系型数据库 数据库
关系型数据库文件方式存储LOG FILE(日志文件)
【5月更文挑战第11天】关系型数据库文件方式存储LOG FILE(日志文件)
6 1
|
3天前
|
JavaScript Java API
【JavaEE】Spring Boot - 日志文件
【JavaEE】Spring Boot - 日志文件
6 0
|
3天前
|
Oracle 关系型数据库
|
3天前
|
存储 监控 NoSQL
【MongoDB 专栏】MongoDB 的日志管理与分析
【5月更文挑战第11天】MongoDB日志管理与分析至关重要,包括系统日志和操作日志,用于监控、故障排查和性能优化。合理配置日志详细程度、存储位置和保留策略,使用日志分析工具提升效率,发现性能瓶颈和安全性问题。日志分析有助于优化查询、调整配置,确保数据安全,并可与其他监控系统集成。面对日志量增长的挑战,需采用新技术如分布式存储和数据压缩来保障存储和传输。随着技术发展,不断进化日志管理与分析能力,以支持MongoDB的稳定高效运行。
【MongoDB 专栏】MongoDB 的日志管理与分析
|
3天前
|
Go 文件存储 iOS开发
LabVIEW崩溃后所产生的错误日志文件的位置
LabVIEW崩溃后所产生的错误日志文件的位置
11 0
|
1天前
|
关系型数据库 MySQL 数据库
mysql数据库bin-log日志管理
mysql数据库bin-log日志管理
|
2天前
|
运维 监控 安全
Java一分钟之-Log4j与日志记录的重要性
【5月更文挑战第16天】Log4j是Java常用的日志框架,用于灵活地记录程序状态和调试问题。通过设置日志级别和过滤器,可避免日志输出混乱。为防止日志文件过大,可配置滚动策略。关注日志安全性,如Log4j 2.x的CVE-2021-44228漏洞,及时更新至安全版本。合理使用日志能提升故障排查和系统监控效率。
14 0
|
3天前
|
C++
JNI Log 日志输出
JNI Log 日志输出
19 1
|
3天前
|
存储 运维 大数据
聊聊日志硬扫描,阿里 Log Scan 的设计与实践
泛日志(Log/Trace/Metric)是大数据的重要组成,伴随着每一年业务峰值的新脉冲,日志数据量在快速增长。同时,业务数字化运营、软件可观测性等浪潮又在对日志的存储、计算提出更高的要求。

热门文章

最新文章