HFile写入Cell浅析(一)

简介:         在HBase中,无论是MemStore的flush,还是HFile的Compact,都会涉及到新HFile的产生,那么HFile是如何产生的呢?我们又是如何将数据Cell写入到HFile中的呢?本文,我们将会大家简单介绍下HFile写入Cell的主体流程。

        在HBase中,无论是MemStore的flush,还是HFile的Compact,都会涉及到新HFile的产生,那么HFile是如何产生的呢?我们又是如何将数据Cell写入到HFile中的呢?本文,我们将会大家简单介绍下HFile写入Cell的主体流程。

        在《HFile文件格式》一文中,我们简单给大家介绍了HFile文件内容的组织形式,理解这篇文章,将会对理解本文有所帮助,请读者自行阅读。

        HFile文件Cell的写入,发起的一个地方,就是MemStore flush时,StoreFlusher的performFlush()方法,如下:

  /**
   * Performs memstore flush, writing data from scanner into sink.
   * 执行memstore的刷新,将数据从scanner写入到sink
   * 
   * @param scanner Scanner to get data from.
   * @param sink Sink to write data to. Could be StoreFile.Writer.
   * @param smallestReadPoint Smallest read point used for the flush.
   */
  protected void performFlush(InternalScanner scanner,
      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
    int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      if (!kvs.isEmpty()) {
    	// 循环Cell列表,调用Compactor.CellSink的sink方法,append数据到磁盘
        for (Cell c : kvs) {
          // If we know that this KV is going to be included always, then let us
          // set its memstoreTS to 0. This will help us save space when writing to
          // disk.
        	// 如果我们知道这个KV是包括总,然后让我们设置它memstoreTS为0。这将帮助我们节省空间在写入磁盘。
          sink.append(c);
        }
        kvs.clear();
      }
    } while (hasMore);
  }
        它会循环的将Cell写入Compactor.CellSink类型的sink,那么这个sink是什么呢?在performFlush()方法的上层调用者DefaultStoreFlusher的flushSnapshot()方法中,首先会调用HStore的createWriterInTmp()方法生成一个StoreFile.Writer实例writer,然后将这个writer作为参数sink传入performFlush()方法,如下:

        // Write the map out to the disk
        // 创建Writer
        writer = store.createWriterInTmp(
            cellsCount, store.getFamily().getCompression(), false, true, true);
        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          performFlush(scanner, writer, smallestReadPoint);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }
        我们再看这个StoreFile.Writer类型的writer是如何生成的,进入HStore的createWriterInTmp()方法,代码如下:

    // 创建StoreFile的StoreFile,需要使用上述信息,比如文件系统、文件路径、合并器、最大keyvalue数目、有利节点等
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())// 文件系统
            .withFilePath(fs.createTempName())// 文件路径
            .withComparator(comparator)// 合并器
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)// 最大keyvalue数目
            .withFavoredNodes(favoredNodes)// 有利节点
            .withFileContext(hFileContext)// HFile上下文信息
            .build();
        而在StoreFile.WriterBuilder的build()方法最后,会new一个StoreFile.Writer实例,在其构造方法中,会生成一个HFile.Writer实例writer,如下:

      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();
        而这个HFile.Writer实例writer则是通过HFile中WriterFactory获取的,如下:

  /**
   * Returns the factory to be used to create {@link HFile} writers
   */
  public static final WriterFactory getWriterFactory(Configuration conf,
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    case 3:
      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
    default:
      throw new IllegalArgumentException("Cannot create writer for HFile " +
          "format version " + version);
    }
  }
        HBase中包含两种类型的WriterFactoryV2,分别为HFileWriterV2.WriterFactoryV2和HFileWriterV3.WriterFactoryV3,今天我们以HFileWriterV2.WriterFactoryV2为例,其creaet的HFile.Writer实例其实为HFileWriterV2类型,如下:

    @Override
    public Writer createWriter(FileSystem fs, Path path, 
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, 
          comparator, context);
      }
    }
        那么,最开始的sink调用append循环写入Cell,实际上最终调用的是HFileWriterV2的append()方法,如下:

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);// 调用HFile的Writer的append()方法,将数据写入文件
      trackTimestamps(cell);// 记录本次Put操作的时间戳
    }
        这个writer的实例化就是我们上面讲到的HFileWriterV2的实例化。下面,我们重点看下HFileWriterV2的append()方法,如下:

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    
	// 从Cell中获取数据value、偏移量voffset、长度vlength
	byte[] value = cell.getValueArray();
    int voffset = cell.getValueOffset();
    int vlength = cell.getValueLength();
    
    // checkKey uses comparator to check we are writing in order.
    // 检测给定Cell,确保key是有序的,dupKey为true说明key没有换,false说明key已更改
    boolean dupKey = checkKey(cell);
    
    // 检测value,确保value不为null
    checkValue(value, voffset, vlength);
    
    if (!dupKey) {// 换key时才检测block边界
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting()) {
      // 申请新块
      newBlock();
    }

    // 写入Cell
    fsBlockWriter.write(cell);

    // 累加Key长度totalKeyLength和Value长度totalValueLength
    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += vlength;

    // Are we the first key in this block?
    // 标记该数据块中写入的第一个key,firstCellInBlock
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently?  We'll hold on to the cell here
    // indefinetly?
    // 标记上次写入Cell,即lastCell赋值为当前cell
    lastCell = cell;
    
    // 条目计数加1
    entryCount++;
    
    // 更新maxMemstoreTS
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
  }
        逻辑很简单,大体如下:

        1、首先从Cell中获取数据value、偏移量voffset、长度vlength;

        2、然后调用checkKey()方法,检测给定Cell,确保key是有序的,dupKey为true说明key没有换,false说明key已更改;

        3、接着调用checkValue()方法,检测value,确保value不为null;

        4、换key时才检测block边界,此时调用的是checkBlockBoundary()方法;

        5、如果需要申请新的数据块的话,调用newBlock()方法申请新的数据块,判断的依据是fsBlockWriter的isWriting()返回为false,表明文件系统数据块写入者暂停写入;

        6、调用fsBlockWriter的write()方法写入Cell;

        7、累加Key长度totalKeyLength和Value长度totalValueLength;

        8、如果需要,标记该数据块中写入的第一个key,firstCellInBlock;

        9、标记上次写入Cell,即lastCell赋值为当前cell;

        10、条目计数entryCount加1;

        11、更新maxMemstoreTS。

        检测block边界的checkBlockBoundary()方法如下:

  /**
   * At a block boundary, write all the inline blocks and opens new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    
	// 如果fsBlockWriter已写入大小小于hFileContext中定义的块大小,直接返回
	if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;

	// 结束上一个块
    finishBlock();
    
    // 写入InlineBlocks
    writeInlineBlocks(false);
    
    // 申请新块
    newBlock();
  }
        它会首先判断fsBlockWriter已写入大小,如果fsBlockWriter已写入大小小于hFileContext中定义的块大小,说明block未越界,直接返回,否则的话,说明block已经达到或者超过设定的阈值,此时,需要做以下处理:

        1、调用finishBlock()方法,结束上一个block;

        2、调用writeInlineBlocks()方法,写入InlineBlocks;

        3、调用newBlock()方法,申请新块。

        未完待续!










相关文章
|
NoSQL Redis
如何查看yum 安装的软件路径(不要再忘了)
如何查看yum 安装的软件路径 今天使用yum 安装了一个软件,后来没有找到路径 1、首先安装一个redis [root@iZbp1eem925ojwyx17ao9kZ ~]# yum install redis 2...
2297 0
|
分布式计算 Java Hadoop
解决Hbase启动报错问题:No such file or directory!
应用场景 在Hbase搭建完之后,本想开开心心的启动Hbase,进行测试使用hbase,但是发现启动hbase的时候,报各种各样的错误,java_home,hbase,hadoop等找不到文件或目录,no such ...
4092 0
|
SQL 存储 关系型数据库
|
分布式计算 安全 Hadoop
kyuubi提交任务异常报错Unauthorized connection for super-user from IP
最终,因为系统配置可能相当复杂,如果问题仍然没法解决,建议联系相关的系统管理员或寻求专业支持。
223 3
|
分布式计算 安全 Hadoop
HBase Shell-org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet 已解决
在HBase Shell遇到错误时,检查Hadoop非安全模式:`hdfs dfsadmin -safemode get`。问题解决在于`hbase-site.xml`中添加配置:Zookeeper客户端端口设为2181和预写日志提供者设为filesystem。
501 6
|
人工智能 安全 大数据
HBase启动报错:ERROR: org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet
欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 今天进入hbase shell中输入命令报错:ERROR: org.
6349 0
|
监控 NoSQL 关系型数据库
ycsb性能测试的优缺点
YCSB(Yahoo Cloud Serving Benchmark)是一个开源的性能测试框架,用于评估分布式系统的读写性能。它具有以下优点和缺点: 优点: 简单易用:YCSB提供了简单的API和配置文件,使得性能测试非常容易上手和执行。 可扩展性:YCSB支持多种数据库和存储系统,包括关系型数据库、NoSQL数据库、分布式文件系统等,使得测试可以针对不同的系统进行比较和评估。 客户端压力:YCSB可以模拟大量并发用户并提供各种负载测试模式,可以测试系统在高负载情况下的性能表现。 可自定义:YCSB允许用户通过自定义操作和负载生成器来模拟真实场景的读写操作,并能够根据需求进行灵活的性能测试
321 0
|
Java 编译器
Java - 修改 Jar 包源码(非反编译操作)
Java - 修改 Jar 包源码(非反编译操作)
2157 0
Java - 修改 Jar 包源码(非反编译操作)
|
存储 SQL 数据采集