Hudi Log日志文件写入分析(二)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Hudi Log日志文件写入分析(二)

1. 介绍

前面介绍了log日志文件的相关类,接着分析记录写入log日志文件的具体实现。

2. 分析

写日志文件的入口在 HoodieMergeOnReadTable#handleUpdate,其核心代码如下

public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
      throws IOException {
    logger.info("Merging updates for commit " + commitTime + " for file " + fileId);
    if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
      logger.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
      return super.handleUpdate(commitTime, fileId, recordItr);
    } else {
      // 写日志文件的入口
      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
      appendHandle.doAppend();
      appendHandle.close();
      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
    }
  }

在处理 update时,如果日志文件不支持索引或者文件不是小文件,则会使用 HoodieApppendHandle#doAppend处理,其核心代码如下

public void doAppend() {
    while (recordItr.hasNext()) {
      // 获取记录
      HoodieRecord record = recordItr.next();
      // 初始化
      init(record);
      // 刷盘
      flushToDiskIfRequired(record);
      // 写入缓存
      writeToBuffer(record);
    }
    doAppend(header);
    estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
  }

有记录存在,则会进行初始化(init),初始化包括统计信息的初始化、HoodieLogFormatWriter的初始化等。

调用 flushToDiskIfRequired进行刷盘处理,其核心代码如下

private void flushToDiskIfRequired(HoodieRecord record) {
    // 当前记录条数大于等于block块可以存的最大记录条数
    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
      // 重新计算记录的平均大小
      averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
      // append写入
      doAppend(header);
      estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
      // 重置当前记录条数
      numberOfRecords = 0;
    }
  }

Hudi会估算数据块中可以存放多少条记录,然后当已缓存的记录条数大于等于当前块中可存放的条数时,会将其append写入。

调用 writerToBuffer将记录缓存起来,其核心代码如下

private void writeToBuffer(HoodieRecord<T> record) {
    // 获取IndexedRecord便于写入log文件
    Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
    if (indexedRecord.isPresent()) { // 存在表示是新插入的记录
      recordList.add(indexedRecord.get());
    } else { // 不存在表示需要删除
      keysToDelete.add(record.getKey());
    }
    numberOfRecords++;
  }

可以看到其会保存需要插入或者删除的记录。

使用 doAppend写入日志文件,其核心代码如下

private void doAppend(Map<HeaderMetadataType, String> header) {
    try {
      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
      if (recordList.size() > 0) { // 新插入的记录不为空
        // 使用Writer写入Data类型的Block
        writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
        // 清空缓存
        recordList.clear();
      }
      if (keysToDelete.size() > 0) { // 删除的记录不为空
        // 使用Writer写入Delete类型的Block
        writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
        // 清空缓存
        keysToDelete.clear();
      }
    } catch (Exception e) {
      throw new HoodieAppendException("Failed while appending records to " + currentLogFile.getPath(), e);
    }
  }

可以看到,该方法会将缓存的记录和头部信息(时间、schema信息)组装成 HoodieLogBlock后写入日志,其中 appendBlock核心代码如下

public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
    // Find current version
    HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
        new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
    long currentSize = this.output.size();
    // 1. Write the magic header for the start of the block
    this.output.write(HoodieLogFormat.MAGIC);
    // bytes for header
    byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
    // content bytes
    byte[] content = block.getContentBytes();
    // bytes for footer
    byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
    // 2. Write the total size of the block (excluding Magic)
    this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
    // 3. Write the version of this log block
    this.output.writeInt(currentLogFormatVersion.getVersion());
    // 4. Write the block type
    this.output.writeInt(block.getBlockType().ordinal());
    // 5. Write the headers for the log block
    this.output.write(headerBytes);
    // 6. Write the size of the content block
    this.output.writeLong(content.length);
    // 7. Write the contents of the data block
    this.output.write(content);
    // 8. Write the footers for the log block
    this.output.write(footerBytes);
    // 9. Write the total size of the log block (including magic) which is everything written
    // until now (for reverse pointer)
    this.output.writeLong(this.output.size() - currentSize);
    // Flush every block to disk
    flush();
    // roll over if size is past the threshold
    return rolloverIfNeeded();
  }

可以看到,对于Block块的写入,顺序如下

  • 写入MAGIC( hudi);
  • 写入Block块的大小;
  • 写入版本号;
  • 写入 Block的类型;
  • 写入头部;
  • 写入数据内容;
  • 写入尾部;
  • 写入本次写数据的总大小;

调用 flush将数据写入文件,如果需要可能会滚动至下个日志文件,其中 rolloverIfNeeded核心代码如下

private Writer rolloverIfNeeded() throws IOException, InterruptedException {
    if (getCurrentSize() > sizeThreshold) { // 当前大小大于阈值(512M)
      // 生成一个新的日志文件,版本号+1
      HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
      // 关闭当前的Writer,会再次触发写文件
      close();
      // 返回新文件对应的Writer
      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
          rolloverLogWriteToken);
    }
    // 不需要滚动,直接返回
    return this;
  }

可以看到,若当前写入的文件大小大于配置的阈值时会滚动到下个版本的新文件,并返回新文件对应的Writer继续写入。另外对于 HoodieLogFormatWriter的初始化,其首先会判断当前文件是否存在,若存在,进一步判断该文件的 FileSystem是否支持 Append,现在只有HDFS、MAPRFS、IGNITE、VIEWFS文件系统支持Append,若支持,则接着Append,若不支持,则滚动到下个新文件写入;若不存在,则直接创建新文件写入。

3. 总结

对于日志文件的写入,Hudi采用基于 HoodieLogBlock为单元的写入粒度,其策略是先将记录缓存至内存,然后再批量构造成 Block后写入日志文件,而对于 Block的头部、实际内容、尾部的写入采用了指定的顺序,并且采用了自动滚动日志文件的方式写入(当日志文件大小达到指定配置大小时自动滚动到下一个文件继续写入)。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4天前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
86 30
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
30天前
|
XML JSON Java
Logback 与 log4j2 性能对比:谁才是日志框架的性能王者?
【10月更文挑战第5天】在Java开发中,日志框架是不可或缺的工具,它们帮助我们记录系统运行时的信息、警告和错误,对于开发人员来说至关重要。在众多日志框架中,Logback和log4j2以其卓越的性能和丰富的功能脱颖而出,成为开发者们的首选。本文将深入探讨Logback与log4j2在性能方面的对比,通过详细的分析和实例,帮助大家理解两者之间的性能差异,以便在实际项目中做出更明智的选择。
188 3
|
1月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1611 14
|
29天前
|
SQL 数据库
为什么 SQL 日志文件很大,我应该如何处理?
为什么 SQL 日志文件很大,我应该如何处理?
|
26天前
|
Python
log日志学习
【10月更文挑战第9天】 python处理log打印模块log的使用和介绍
25 0
|
28天前
|
数据可视化
Tensorboard可视化学习笔记(一):如何可视化通过网页查看log日志
关于如何使用TensorBoard进行数据可视化的教程,包括TensorBoard的安装、配置环境变量、将数据写入TensorBoard、启动TensorBoard以及如何通过网页查看日志文件。
158 0
|
3月前
|
Kubernetes Ubuntu Windows
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
129 3
|
1月前
|
存储 分布式计算 NoSQL
大数据-136 - ClickHouse 集群 表引擎详解1 - 日志、Log、Memory、Merge
大数据-136 - ClickHouse 集群 表引擎详解1 - 日志、Log、Memory、Merge
35 0
|
1月前
|
缓存 Linux 编译器
【C++】CentOS环境搭建-安装log4cplus日志组件包及报错解决方案
通过上述步骤,您应该能够在CentOS环境中成功安装并使用log4cplus日志组件。面对任何安装或使用过程中出现的问题,仔细检查错误信息,对照提供的解决方案进行调整,通常都能找到合适的解决之道。log4cplus的强大功能将为您的项目提供灵活、高效的日志管理方案,助力软件开发与维护。
52 0
|
2月前
|
Java
日志框架log4j打印异常堆栈信息携带traceId,方便接口异常排查
日常项目运行日志,异常栈打印是不带traceId,导致排查问题查找异常栈很麻烦。
下一篇
无影云桌面