PalDB 写数据过程

简介: 开篇PalDB 介绍PalDB 写数据过程PalDB 读数据过程PalDB 线程安全版本PalDB写数据DemoStoreWriter writer = PalDB.createWriter(new File("store.paldb"));writer.put("foo", "bar");writer.put(1213, new int[] {1, 2, 3});writer.close();PalDB写数据流程 PalDB写入过程主要分为2个阶段:kv的写入,PalDB文件的生成。

开篇


PalDB写数据Demo

StoreWriter writer = PalDB.createWriter(new File("store.paldb"));
writer.put("foo", "bar");
writer.put(1213, new int[] {1, 2, 3});
writer.close();


PalDB写数据流程

 PalDB写入过程主要分为2个阶段:kv的写入,PalDB文件的生成。PalDB在整个文件索引生成的整个过程包括:

  • 写入key到index的临时文件
  • 写入value到data的临时文件
  • 准备合并数据并生成meta元素的临时文件
  • 合并meta元素的临时文件、key的临时文件,value的临时文件到最终的PalDB文件。
img_9e496b71c251441eb05538b4dbe9c979.png
PalDB写数据流程


PalDB写数据相关数据结构

 PalDB在写入过程中会把数据临时存在内存当中,存放的方式也挺新颖的。它在内部维护了一个数组,数组的大小由最大key的长度确定,譬如你的key的最大长度为6,那么数组的大小就是6,每个数组维护这个长度的key所有信息,如下标为5的数组元素维护着所有key长度为5的信息。下列的数据结构都是按照这个方法来进行存储的。

  • File[] indexFiles 维护key对应的index文件
  • DataOutputStream[] indexStreams 维护key对应文件流
  • private File[] dataFiles 维护value对应的data文件
  • DataOutputStream[] dataStreams 维护value对应的data文件流
  • int[] maxOffsetLengths 维护index指向data的位移的存储字段类型
  • int[] keyCounts 维护对应key的数量
img_504f1d4908f4d1b8cdaddf83e9cbd14f.png
PalDB临时内存数据存储
public class StorageWriter {

  // Configuration
  private final Configuration config;
  private final double loadFactor;
  // Output
  private final File tempFolder;
  private final OutputStream outputStream;
  // Index stream
  private File[] indexFiles;
  private DataOutputStream[] indexStreams;
  // Data stream
  private File[] dataFiles;
  private DataOutputStream[] dataStreams;
  // Cache last value
  private byte[][] lastValues;
  private int[] lastValuesLength;
  // Data length
  private long[] dataLengths;
  // Index length
  private long indexesLength;
  // Max offset length
  private int[] maxOffsetLengths;
  // Number of keys
  private int keyCount;
  private int[] keyCounts;
  // Number of values
  private int valueCount;
  // Number of collisions
  private int collisions;

  private HashUtils hashUtils;


PalDB写数据流程源码解析

写入kv过程

 kv的写入过程主要包括key的写入过程和value的写入过程,整体流程如下:

  • 通过getIndexStream获取长度为keyLength对应的key的输入流。
  • 通过indexStream.write(key)往index的输入流写入key。
  • 通过LongPacker.packLong(indexStream, dataLength)往index的输入流写入key对应data的偏移量,指向dataStream的位移。
  • 通过DataOutputStream dataStream = getDataStream(keyLength)获取长度为keyLength对应的value的输入流
  • 通过LongPacker.packInt(dataStream, value.length)写入value的长度
  • 通过dataStream.write(value);写入value的值

 在整个写入过程中会有一些辅助变量和一些技巧节省内存

  • dataLengths[keyLength]保存长度为keyLength的key下的data的长度
  • lastValues[keyLength]保存长度为keyLength的key上一个保存的value
  • lastValuesLength保存长度为keyLength的key对应value的保存位移
  • 如果keyLength下本次保存的值和lastValue相同,那么只保存上一个偏移量即可
  • 提供了一个节省内存的trick就是相同长度下的key的value按照顺序排列即可
public void put(byte[] key, byte[] value)
      throws IOException {
    int keyLength = key.length;

    //Get the Output stream for that keyLength, each key length has its own file
    // 获取指定长度的key对应的输出流
    DataOutputStream indexStream = getIndexStream(keyLength);

    // Write key
    // 写入key
    indexStream.write(key);

    // Check if the value is identical to the last inserted
    // 判断本次数据是否和上次写入数据一致
    byte[] lastValue = lastValues[keyLength];
    boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);

    // Get data stream and length
    // 获取数据的偏移量,
    long dataLength = dataLengths[keyLength];
    if (sameValue) {
      //如果本次写入数据和上次一致,就把dataLength指向上一个value的起始位置
      dataLength -= lastValuesLength[keyLength];
    }

    // Write offset and record max offset length
    //写入key对应value的位移dataLength
    int offsetLength = LongPacker.packLong(indexStream, dataLength);
    maxOffsetLengths[keyLength] = Math.max(offsetLength, maxOffsetLengths[keyLength]);

    // Write if data is not the same
    // 本次数据和上次不一致的情况下开始写入value
    if (!sameValue) {
      // Get stream
      // 获取指定长度key对应的value的输出流
      DataOutputStream dataStream = getDataStream(keyLength);

      // Write size and value
      // 写入value的长度和value的值
      int valueSize = LongPacker.packInt(dataStream, value.length);
      dataStream.write(value);

      // Update data length
      // 更新数据偏移量,这里的偏移量是下一个写入value的偏移量
      dataLengths[keyLength] += valueSize + value.length;

      // Update last value
      //更新上一次写入的value值
      lastValues[keyLength] = value;
      //更新keyLength对应的key的保存的data的长度
      lastValuesLength[keyLength] = valueSize + value.length;

      // value的计数
      valueCount++;
    }

    //更新key的数量
    keyCount++;
    //更新长度为keyLength的key的个数
    keyCounts[keyLength]++;
  }



private DataOutputStream getIndexStream(int keyLength)
      throws IOException {
    // Resize array if necessary
    // 每次按照keyLength进行扩容
    if (indexStreams.length <= keyLength) {
      indexStreams = Arrays.copyOf(indexStreams, keyLength + 1);
      indexFiles = Arrays.copyOf(indexFiles, keyLength + 1);
      keyCounts = Arrays.copyOf(keyCounts, keyLength + 1);
      maxOffsetLengths = Arrays.copyOf(maxOffsetLengths, keyLength + 1);
      lastValues = Arrays.copyOf(lastValues, keyLength + 1);
      lastValuesLength = Arrays.copyOf(lastValuesLength, keyLength + 1);
      dataLengths = Arrays.copyOf(dataLengths, keyLength + 1);
    }

    // Get or create stream
    DataOutputStream dos = indexStreams[keyLength];
    if (dos == null) {
      File file = new File(tempFolder, "temp_index" + keyLength + ".dat");
      file.deleteOnExit();
      indexFiles[keyLength] = file;

      dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
      indexStreams[keyLength] = dos;

      //这里之所以进行++是因为value写入的过程中会先写入1byte的数据,dos.writeByte(0);
      dataLengths[keyLength]++;
    }
    return dos;
  }



private DataOutputStream getDataStream(int keyLength)
      throws IOException {
    // Resize array if necessary
    if (dataStreams.length <= keyLength) {
      dataStreams = Arrays.copyOf(dataStreams, keyLength + 1);
      dataFiles = Arrays.copyOf(dataFiles, keyLength + 1);
    }

    DataOutputStream dos = dataStreams[keyLength];
    if (dos == null) {
      File file = new File(tempFolder, "data" + keyLength + ".dat");
      file.deleteOnExit();
      dataFiles[keyLength] = file;

      dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
      dataStreams[keyLength] = dos;

      // Write one byte so the zero offset is reserved
      dos.writeByte(0);
    }
    return dos;
  }


合并key和value文件生成PalDB文件

 PalDB通过close动作来完成PalDB文件的生成,主要完成的事情包括:

  • 将value对应的输出流刷新到文件
  • 将key对应的输出流刷新到文件
  • 生成元数据文件metadata.dat文件 writeMetadata(metadataDataOutputStream);
  • 将不同长度对应的key对应的index文件进行重建 buildIndex(i)
  • 将metaData、不同长度的key对应的临时文件、不同长度的key下的value对应的临时文件进行合并生成PalDB最终文件
  • mergeFiles方法执行最终的合并动作,做的事情就是将文件按照key的长度的顺序写入一个文件当中
public void close()
      throws IOException {
    // Close the data and index streams
    for (DataOutputStream dos : dataStreams) {
      if (dos != null) {
        dos.close();
      }
    }
    for (DataOutputStream dos : indexStreams) {
      if (dos != null) {
        dos.close();
      }
    }

    // Stats
    LOGGER.log(Level.INFO, "Number of keys: {0}", keyCount);
    LOGGER.log(Level.INFO, "Number of values: {0}", valueCount);

    // Prepare files to merge
    List<File> filesToMerge = new ArrayList<File>();

    try {

      //Write metadata file
      File metadataFile = new File(tempFolder, "metadata.dat");
      metadataFile.deleteOnExit();
      FileOutputStream metadataOututStream = new FileOutputStream(metadataFile);
      DataOutputStream metadataDataOutputStream = new DataOutputStream(metadataOututStream);
      writeMetadata(metadataDataOutputStream);
      metadataDataOutputStream.close();
      metadataOututStream.close();
      filesToMerge.add(metadataFile);

      // Build index file
      for (int i = 0; i < indexFiles.length; i++) {
        if (indexFiles[i] != null) {
          filesToMerge.add(buildIndex(i));
        }
      }

      // Stats collisions
      LOGGER.log(Level.INFO, "Number of collisions: {0}", collisions);

      // Add data files
      for (File dataFile : dataFiles) {
        if (dataFile != null) {
          filesToMerge.add(dataFile);
        }
      }

      // Merge and write to output
      checkFreeDiskSpace(filesToMerge);
      mergeFiles(filesToMerge, outputStream);
    } finally {
      outputStream.close();
      cleanup(filesToMerge);
    }
  }



private void mergeFiles(List<File> inputFiles, OutputStream outputStream)
      throws IOException {
    long startTime = System.nanoTime();

    //Merge files
    for (File f : inputFiles) {
      if (f.exists()) {
        FileInputStream fileInputStream = new FileInputStream(f);
        BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
        try {
          LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()});

          byte[] buffer = new byte[8192];
          int length;
          while ((length = bufferedInputStream.read(buffer)) > 0) {
            outputStream.write(buffer, 0, length);
          }
        } finally {
          bufferedInputStream.close();
          fileInputStream.close();
        }
      } else {
        LOGGER.log(Level.INFO, "Skip merging file {0} because it doesn't exist", f.getName());
      }
    }
  }


元数据文件生成过程

 元数据的生成过程就把PalDB的索引文件的相关元信息进行写入,具体内容看代码的自解释。核心的包括:

  • 写入key相关的统计信息
  • 写入key的起始位移
  • 写入value的起始位移
private void writeMetadata(DataOutputStream dataOutputStream)
      throws IOException {
    //Write format version
    dataOutputStream.writeUTF(FormatVersion.getLatestVersion().name());

    //Write time
    dataOutputStream.writeLong(System.currentTimeMillis());

    //Prepare
    int keyLengthCount = getNumKeyCount();
    int maxKeyLength = keyCounts.length - 1;

    //Write size (number of keys)
    dataOutputStream.writeInt(keyCount);

    //Write the number of different key length
    dataOutputStream.writeInt(keyLengthCount);

    //Write the max value for keyLength
    dataOutputStream.writeInt(maxKeyLength);

    // For each keyLength
    long datasLength = 0l;
    for (int i = 0; i < keyCounts.length; i++) {
      if (keyCounts[i] > 0) {
        // Write the key length
        dataOutputStream.writeInt(i);

        // Write key count
        dataOutputStream.writeInt(keyCounts[i]);

        // Write slot count
        int slots = (int) Math.round(keyCounts[i] / loadFactor);
        dataOutputStream.writeInt(slots);

        // Write slot size
        int offsetLength = maxOffsetLengths[i];
        dataOutputStream.writeInt(i + offsetLength);

        // Write index offset
        dataOutputStream.writeInt((int) indexesLength);

        // Increment index length
        indexesLength += (i + offsetLength) * slots;

        // Write data length
        dataOutputStream.writeLong(datasLength);

        // Increment data length
        datasLength += dataLengths[i];
      }
    }

    //Write serializers
    try {
      Serializers.serialize(dataOutputStream, config.getSerializers());
    } catch (Exception e) {
      throw new RuntimeException();
    }

    //Write the position of the index and the data
    // 元数据文件+key文件+value文件的顺序,先写入key索引文件起始位移,再写入value的data文件的起始位移。
    int indexOffset = dataOutputStream.size() + (Integer.SIZE / Byte.SIZE) + (Long.SIZE / Byte.SIZE);
    dataOutputStream.writeInt(indexOffset);
    dataOutputStream.writeLong(indexOffset + indexesLength);
  }


key索引文件重建过程

 索引文件的重建过程做的事情就是把key从原来的文件重建到根据key数据和负载因子生成的索引文件中,做的事情包括:

  • 根据key的数量和负载因子重新计算新的索引文件Math.round(count / loadFactor)
  • 从旧的key的索引文件中读取key的数据并进行hash定位到slot位置
  • 在指定的slot位置写入key的值以及key对应value在data文件中偏移量
private File buildIndex(int keyLength)
      throws IOException {
    // 根据该长度下key的数目/负载因子计算存储的slot的格式
    long count = keyCounts[keyLength];
    int slots = (int) Math.round(count / loadFactor);
    int offsetLength = maxOffsetLengths[keyLength];

    //注意slotSize的计算方式,slot里面保存的内容包括key的长度以及指向data的偏移量占用的字节数
    int slotSize = keyLength + offsetLength;

    // Init index
    File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
    RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw");
    try {
      // 设置重建key的文件的长度
      indexAccessFile.setLength(slots * slotSize);
      FileChannel indexChannel = indexAccessFile.getChannel();
      MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

      // Init reading stream
      // 初始化输入流
      File tempIndexFile = indexFiles[keyLength];
      DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
      try {
        byte[] keyBuffer = new byte[keyLength];
        byte[] slotBuffer = new byte[slotSize];
        byte[] offsetBuffer = new byte[offsetLength];

        // Read all keys
        // 遍历key的数量重新写入到新建的索引文件当中
        for (int i = 0; i < count; i++) {
          // Read key
          tempIndexStream.readFully(keyBuffer);

          // Read offset
          long offset = LongPacker.unpackLong(tempIndexStream);

          // Hash,根据key进行重hash后确定放置到具体的slot位置
          long hash = (long) hashUtils.hash(keyBuffer);

          boolean collision = false;
          for (int probe = 0; probe < count; probe++) {
            int slot = (int) ((hash + probe) % slots);
            byteBuffer.position(slot * slotSize);
            byteBuffer.get(slotBuffer);

            long found = LongPacker.unpackLong(slotBuffer, keyLength);
            if (found == 0) {
              // The spot is empty use it
              // 根据hash值写入key以及key对应value在data文件的偏移量
              byteBuffer.position(slot * slotSize);
              byteBuffer.put(keyBuffer);
              int pos = LongPacker.packLong(offsetBuffer, offset);
              byteBuffer.put(offsetBuffer, 0, pos);
              break;
            } else {
              collision = true;
              // PalDB不支持存在相同的key
              if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
                throw new RuntimeException(
                        String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
              }
            }
          }

          if (collision) {
            collisions++;
          }
        }
      } 

    return indexFile;
  }


PalDB的文件存储格式

img_9bf47af41ad5d2f3a2311af834eb4e2f.png
PalDB存储结构
目录
相关文章
|
22天前
|
存储 算法 数据处理
模拟数据读取函数有哪些
模拟数据读取函数主要用于测试和开发阶段,常见的有:numpy的random系列函数、pandas的DataFrame.sample()、Python内置的random模块等。这些函数可以生成随机或样本数据,方便快捷地进行数据处理和算法测试。
有 3 个进程 P1、P2、P3 协作解决文件打印问题。P1 将文件记录从磁盘读入内存的缓冲区 1,每执行一次读一个记录 ;P2 将缓冲区 1 中的内容复制到缓冲区 2 中,每执行一次复制一个记录 ;
有 3 个进程 P1、P2、P3 协作解决文件打印问题。P1 将文件记录从磁盘读入内存的缓冲区 1,每执行一次读一个记录 ;P2 将缓冲区 1 中的内容复制到缓冲区 2 中,每执行一次复制一个记录 ;
|
7月前
|
存储 文件存储
<文件操作> 文件的打开与关闭,顺序读写,随机读写,二进制文件,读取结束的判定,文件缓冲区
<文件操作> 文件的打开与关闭,顺序读写,随机读写,二进制文件,读取结束的判定,文件缓冲区
42 1
|
7月前
|
存储 C语言
用二进制方式向文本读写数据
用二进制方式向文本读写数据
44 3
|
7月前
|
C语言 C++
文件底层的理解之缓冲区
文件底层的理解之缓冲区
|
7月前
|
存储 分布式计算 Java
HDFS的数据读取流程是怎样的?请描述数据读取的过程。
HDFS的数据读取流程是怎样的?请描述数据读取的过程。
108 0
|
计算机视觉 索引 Windows
视频操作_01视频读写:视频读写+读取视频+保存视频
在OpenCV中我们要获取一个视频,需要创建一个VideoCapture对象
193 0
|
NoSQL 数据库 开发者
了解读写数据流程 | 学习笔记
快速学习 了解读写数据流程
了解读写数据流程 | 学习笔记
|
移动开发 Linux Windows
IO流概述分类、字节流写数据、字节流写数据的三种方式及写数据的两个小问题
IO流概述分类、字节流写数据、字节流写数据的三种方式及写数据的两个小问题的简单示例
123 0
IO流概述分类、字节流写数据、字节流写数据的三种方式及写数据的两个小问题
|
存储 缓存 Linux
流的读写
流的读写
105 0