Hudi Log日志文件写入分析(二)

简介: Hudi Log日志文件写入分析(二)

1. 介绍

前面介绍了log日志文件的相关类,接着分析记录写入log日志文件的具体实现。

2. 分析

写日志文件的入口在 HoodieMergeOnReadTable#handleUpdate,其核心代码如下

public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
      throws IOException {
    logger.info("Merging updates for commit " + commitTime + " for file " + fileId);
    if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
      logger.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
      return super.handleUpdate(commitTime, fileId, recordItr);
    } else {
      // 写日志文件的入口
      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
      appendHandle.doAppend();
      appendHandle.close();
      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
    }
  }

在处理 update时,如果日志文件不支持索引或者文件不是小文件,则会使用 HoodieApppendHandle#doAppend处理,其核心代码如下

public void doAppend() {
    while (recordItr.hasNext()) {
      // 获取记录
      HoodieRecord record = recordItr.next();
      // 初始化
      init(record);
      // 刷盘
      flushToDiskIfRequired(record);
      // 写入缓存
      writeToBuffer(record);
    }
    doAppend(header);
    estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
  }

有记录存在,则会进行初始化(init),初始化包括统计信息的初始化、HoodieLogFormatWriter的初始化等。

调用 flushToDiskIfRequired进行刷盘处理,其核心代码如下

private void flushToDiskIfRequired(HoodieRecord record) {
    // 当前记录条数大于等于block块可以存的最大记录条数
    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
      // 重新计算记录的平均大小
      averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
      // append写入
      doAppend(header);
      estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
      // 重置当前记录条数
      numberOfRecords = 0;
    }
  }

Hudi会估算数据块中可以存放多少条记录,然后当已缓存的记录条数大于等于当前块中可存放的条数时,会将其append写入。

调用 writerToBuffer将记录缓存起来,其核心代码如下

private void writeToBuffer(HoodieRecord<T> record) {
    // 获取IndexedRecord便于写入log文件
    Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
    if (indexedRecord.isPresent()) { // 存在表示是新插入的记录
      recordList.add(indexedRecord.get());
    } else { // 不存在表示需要删除
      keysToDelete.add(record.getKey());
    }
    numberOfRecords++;
  }

可以看到其会保存需要插入或者删除的记录。

使用 doAppend写入日志文件,其核心代码如下

private void doAppend(Map<HeaderMetadataType, String> header) {
    try {
      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
      if (recordList.size() > 0) { // 新插入的记录不为空
        // 使用Writer写入Data类型的Block
        writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
        // 清空缓存
        recordList.clear();
      }
      if (keysToDelete.size() > 0) { // 删除的记录不为空
        // 使用Writer写入Delete类型的Block
        writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
        // 清空缓存
        keysToDelete.clear();
      }
    } catch (Exception e) {
      throw new HoodieAppendException("Failed while appending records to " + currentLogFile.getPath(), e);
    }
  }

可以看到,该方法会将缓存的记录和头部信息(时间、schema信息)组装成 HoodieLogBlock后写入日志,其中 appendBlock核心代码如下

public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
    // Find current version
    HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
        new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
    long currentSize = this.output.size();
    // 1. Write the magic header for the start of the block
    this.output.write(HoodieLogFormat.MAGIC);
    // bytes for header
    byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
    // content bytes
    byte[] content = block.getContentBytes();
    // bytes for footer
    byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
    // 2. Write the total size of the block (excluding Magic)
    this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
    // 3. Write the version of this log block
    this.output.writeInt(currentLogFormatVersion.getVersion());
    // 4. Write the block type
    this.output.writeInt(block.getBlockType().ordinal());
    // 5. Write the headers for the log block
    this.output.write(headerBytes);
    // 6. Write the size of the content block
    this.output.writeLong(content.length);
    // 7. Write the contents of the data block
    this.output.write(content);
    // 8. Write the footers for the log block
    this.output.write(footerBytes);
    // 9. Write the total size of the log block (including magic) which is everything written
    // until now (for reverse pointer)
    this.output.writeLong(this.output.size() - currentSize);
    // Flush every block to disk
    flush();
    // roll over if size is past the threshold
    return rolloverIfNeeded();
  }

可以看到,对于Block块的写入,顺序如下

  • 写入MAGIC( hudi);
  • 写入Block块的大小;
  • 写入版本号;
  • 写入 Block的类型;
  • 写入头部;
  • 写入数据内容;
  • 写入尾部;
  • 写入本次写数据的总大小;

调用 flush将数据写入文件,如果需要可能会滚动至下个日志文件,其中 rolloverIfNeeded核心代码如下

private Writer rolloverIfNeeded() throws IOException, InterruptedException {
    if (getCurrentSize() > sizeThreshold) { // 当前大小大于阈值(512M)
      // 生成一个新的日志文件,版本号+1
      HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
      // 关闭当前的Writer,会再次触发写文件
      close();
      // 返回新文件对应的Writer
      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
          rolloverLogWriteToken);
    }
    // 不需要滚动,直接返回
    return this;
  }

可以看到,若当前写入的文件大小大于配置的阈值时会滚动到下个版本的新文件,并返回新文件对应的Writer继续写入。另外对于 HoodieLogFormatWriter的初始化,其首先会判断当前文件是否存在,若存在,进一步判断该文件的 FileSystem是否支持 Append,现在只有HDFS、MAPRFS、IGNITE、VIEWFS文件系统支持Append,若支持,则接着Append,若不支持,则滚动到下个新文件写入;若不存在,则直接创建新文件写入。

3. 总结

对于日志文件的写入,Hudi采用基于 HoodieLogBlock为单元的写入粒度,其策略是先将记录缓存至内存,然后再批量构造成 Block后写入日志文件,而对于 Block的头部、实际内容、尾部的写入采用了指定的顺序,并且采用了自动滚动日志文件的方式写入(当日志文件大小达到指定配置大小时自动滚动到下一个文件继续写入)。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3天前
|
机器学习/深度学习 前端开发 数据挖掘
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断(下)
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
74 11
|
4天前
|
Java
log4j异常日志过滤规则配置
log4j异常日志过滤规则配置
15 0
|
8天前
|
运维 Oracle 关系型数据库
Oracle日志文件:数据王国的“记事本”
【4月更文挑战第19天】Oracle日志文件是数据库稳定运行的关键,记录数据变更历史,用于恢复和故障处理。它们协调并发操作,确保数据一致性和完整性。日志文件实时写入操作信息并定期刷新到磁盘,便于数据恢复。然而,日志文件需备份和归档以保证安全性,防止数据丢失。日志文件,数据王国的“记事本”,默默守护数据安全。
|
9天前
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断2
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
15 0
|
9天前
|
机器学习/深度学习 前端开发 数据挖掘
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
39 0
|
17天前
|
运维 安全 Ubuntu
`/var/log/syslog` 和 `/var/log/messages` 日志详解
`/var/log/syslog` 和 `/var/log/messages` 是Linux系统的日志文件,分别在Debian和Red Hat系发行版中记录系统事件和错误。它们包含时间戳、日志级别、PID及消息内容,由`rsyslog`等守护进程管理。常用命令如`tail`和`grep`用于查看和搜索日志。日志级别从低到高包括`debug`到`emerg`,表示不同严重程度的信息。注意保护日志文件的安全,防止未授权访问,并定期使用`logrotate`进行文件轮转以管理磁盘空间。
24 1
|
24天前
|
Java
使用Java代码打印log日志
使用Java代码打印log日志
81 1
|
26天前
|
Linux Shell
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
78 1
|
30天前
|
SQL 关系型数据库 MySQL
MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复
对于MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复。二进制日志是MySQL中记录所有数据库更改操作的日志文件。要进行时间点恢复,您需要执行以下步骤: 1. 确保MySQL配置文件中启用了二进制日志功能。在配置文件(通常是my.cnf或my.ini)中找到以下行,并确保没有被注释掉: Copy code log_bin = /path/to/binary/log/file 2. 在需要进行恢复的时间点之前创建一个数据库备份。这将作为恢复的基准。 3. 找到您要恢复到的时间点的二进制日志文件和位置。可以通过执行以下命令来查看当前的二进制日志文件和位
100 1
|
1月前
|
监控 Shell Linux
【Shell 命令集合 系统管理 】Linux 自动轮转(log rotation)日志文件 logrotate命令 使用指南
【Shell 命令集合 系统管理 】Linux 自动轮转(log rotation)日志文件 logrotate命令 使用指南
51 0