Hudi Log日志文件写入分析(二)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Hudi Log日志文件写入分析(二)

1. 介绍

前面介绍了log日志文件的相关类,接着分析记录写入log日志文件的具体实现。

2. 分析

写日志文件的入口在 HoodieMergeOnReadTable#handleUpdate,其核心代码如下

public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
      throws IOException {
    logger.info("Merging updates for commit " + commitTime + " for file " + fileId);
    if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
      logger.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
      return super.handleUpdate(commitTime, fileId, recordItr);
    } else {
      // 写日志文件的入口
      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
      appendHandle.doAppend();
      appendHandle.close();
      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
    }
  }

在处理 update时,如果日志文件不支持索引或者文件不是小文件,则会使用 HoodieApppendHandle#doAppend处理,其核心代码如下

public void doAppend() {
    while (recordItr.hasNext()) {
      // 获取记录
      HoodieRecord record = recordItr.next();
      // 初始化
      init(record);
      // 刷盘
      flushToDiskIfRequired(record);
      // 写入缓存
      writeToBuffer(record);
    }
    doAppend(header);
    estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
  }

有记录存在,则会进行初始化(init),初始化包括统计信息的初始化、HoodieLogFormatWriter的初始化等。

调用 flushToDiskIfRequired进行刷盘处理,其核心代码如下

private void flushToDiskIfRequired(HoodieRecord record) {
    // 当前记录条数大于等于block块可以存的最大记录条数
    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
      // 重新计算记录的平均大小
      averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
      // append写入
      doAppend(header);
      estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
      // 重置当前记录条数
      numberOfRecords = 0;
    }
  }

Hudi会估算数据块中可以存放多少条记录,然后当已缓存的记录条数大于等于当前块中可存放的条数时,会将其append写入。

调用 writerToBuffer将记录缓存起来,其核心代码如下

private void writeToBuffer(HoodieRecord<T> record) {
    // 获取IndexedRecord便于写入log文件
    Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
    if (indexedRecord.isPresent()) { // 存在表示是新插入的记录
      recordList.add(indexedRecord.get());
    } else { // 不存在表示需要删除
      keysToDelete.add(record.getKey());
    }
    numberOfRecords++;
  }

可以看到其会保存需要插入或者删除的记录。

使用 doAppend写入日志文件,其核心代码如下

private void doAppend(Map<HeaderMetadataType, String> header) {
    try {
      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
      if (recordList.size() > 0) { // 新插入的记录不为空
        // 使用Writer写入Data类型的Block
        writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
        // 清空缓存
        recordList.clear();
      }
      if (keysToDelete.size() > 0) { // 删除的记录不为空
        // 使用Writer写入Delete类型的Block
        writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
        // 清空缓存
        keysToDelete.clear();
      }
    } catch (Exception e) {
      throw new HoodieAppendException("Failed while appending records to " + currentLogFile.getPath(), e);
    }
  }

可以看到,该方法会将缓存的记录和头部信息(时间、schema信息)组装成 HoodieLogBlock后写入日志,其中 appendBlock核心代码如下

public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
    // Find current version
    HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
        new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
    long currentSize = this.output.size();
    // 1. Write the magic header for the start of the block
    this.output.write(HoodieLogFormat.MAGIC);
    // bytes for header
    byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
    // content bytes
    byte[] content = block.getContentBytes();
    // bytes for footer
    byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
    // 2. Write the total size of the block (excluding Magic)
    this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
    // 3. Write the version of this log block
    this.output.writeInt(currentLogFormatVersion.getVersion());
    // 4. Write the block type
    this.output.writeInt(block.getBlockType().ordinal());
    // 5. Write the headers for the log block
    this.output.write(headerBytes);
    // 6. Write the size of the content block
    this.output.writeLong(content.length);
    // 7. Write the contents of the data block
    this.output.write(content);
    // 8. Write the footers for the log block
    this.output.write(footerBytes);
    // 9. Write the total size of the log block (including magic) which is everything written
    // until now (for reverse pointer)
    this.output.writeLong(this.output.size() - currentSize);
    // Flush every block to disk
    flush();
    // roll over if size is past the threshold
    return rolloverIfNeeded();
  }

可以看到,对于Block块的写入,顺序如下

  • 写入MAGIC( hudi);
  • 写入Block块的大小;
  • 写入版本号;
  • 写入 Block的类型;
  • 写入头部;
  • 写入数据内容;
  • 写入尾部;
  • 写入本次写数据的总大小;

调用 flush将数据写入文件,如果需要可能会滚动至下个日志文件,其中 rolloverIfNeeded核心代码如下

private Writer rolloverIfNeeded() throws IOException, InterruptedException {
    if (getCurrentSize() > sizeThreshold) { // 当前大小大于阈值(512M)
      // 生成一个新的日志文件,版本号+1
      HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
      // 关闭当前的Writer,会再次触发写文件
      close();
      // 返回新文件对应的Writer
      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
          rolloverLogWriteToken);
    }
    // 不需要滚动,直接返回
    return this;
  }

可以看到,若当前写入的文件大小大于配置的阈值时会滚动到下个版本的新文件,并返回新文件对应的Writer继续写入。另外对于 HoodieLogFormatWriter的初始化,其首先会判断当前文件是否存在,若存在,进一步判断该文件的 FileSystem是否支持 Append,现在只有HDFS、MAPRFS、IGNITE、VIEWFS文件系统支持Append,若支持,则接着Append,若不支持,则滚动到下个新文件写入;若不存在,则直接创建新文件写入。

3. 总结

对于日志文件的写入,Hudi采用基于 HoodieLogBlock为单元的写入粒度,其策略是先将记录缓存至内存,然后再批量构造成 Block后写入日志文件,而对于 Block的头部、实际内容、尾部的写入采用了指定的顺序,并且采用了自动滚动日志文件的方式写入(当日志文件大小达到指定配置大小时自动滚动到下一个文件继续写入)。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
1月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
|
17天前
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
MySQL事务日志-Undo Log工作原理分析
|
24天前
|
存储 运维 监控
Linux--深入理与解linux文件系统与日志文件分析
深入理解 Linux 文件系统和日志文件分析,对于系统管理员和运维工程师来说至关重要。文件系统管理涉及到文件的组织、存储和检索,而日志文件则记录了系统和应用的运行状态,是排查故障和维护系统的重要依据。通过掌握文件系统和日志文件的管理和分析技能,可以有效提升系统的稳定性和安全性。
45 7
|
27天前
|
监控 安全 Linux
启用Linux防火墙日志记录和分析功能
为iptables启用日志记录对于监控进出流量至关重要
|
1月前
|
监控 应用服务中间件 定位技术
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
142 3
|
2月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
723 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
3月前
|
XML JSON Java
Logback 与 log4j2 性能对比:谁才是日志框架的性能王者?
【10月更文挑战第5天】在Java开发中,日志框架是不可或缺的工具,它们帮助我们记录系统运行时的信息、警告和错误,对于开发人员来说至关重要。在众多日志框架中,Logback和log4j2以其卓越的性能和丰富的功能脱颖而出,成为开发者们的首选。本文将深入探讨Logback与log4j2在性能方面的对比,通过详细的分析和实例,帮助大家理解两者之间的性能差异,以便在实际项目中做出更明智的选择。
406 3
|
1月前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
|
3月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1785 14
MySQL事务日志-Redo Log工作原理分析
|
2月前
|
存储 监控 安全
什么是日志管理,如何进行日志管理?
日志管理是对IT系统生成的日志数据进行收集、存储、分析和处理的实践,对维护系统健康、确保安全及获取运营智能至关重要。本文介绍了日志管理的基本概念、常见挑战、工具的主要功能及选择解决方案的方法,强调了定义管理目标、日志收集与分析、警报和报告、持续改进等关键步骤,以及如何应对数据量大、安全问题、警报疲劳等挑战,最终实现日志数据的有效管理和利用。
223 0