HFile写入Cell浅析(一)

简介:         在HBase中,无论是MemStore的flush,还是HFile的Compact,都会涉及到新HFile的产生,那么HFile是如何产生的呢?我们又是如何将数据Cell写入到HFile中的呢?本文,我们将会大家简单介绍下HFile写入Cell的主体流程。

        在HBase中,无论是MemStore的flush,还是HFile的Compact,都会涉及到新HFile的产生,那么HFile是如何产生的呢?我们又是如何将数据Cell写入到HFile中的呢?本文,我们将会大家简单介绍下HFile写入Cell的主体流程。

        在《HFile文件格式》一文中,我们简单给大家介绍了HFile文件内容的组织形式,理解这篇文章,将会对理解本文有所帮助,请读者自行阅读。

        HFile文件Cell的写入,发起的一个地方,就是MemStore flush时,StoreFlusher的performFlush()方法,如下:

  /**
   * Performs memstore flush, writing data from scanner into sink.
   * 执行memstore的刷新,将数据从scanner写入到sink
   * 
   * @param scanner Scanner to get data from.
   * @param sink Sink to write data to. Could be StoreFile.Writer.
   * @param smallestReadPoint Smallest read point used for the flush.
   */
  protected void performFlush(InternalScanner scanner,
      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
    int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      if (!kvs.isEmpty()) {
    	// 循环Cell列表,调用Compactor.CellSink的sink方法,append数据到磁盘
        for (Cell c : kvs) {
          // If we know that this KV is going to be included always, then let us
          // set its memstoreTS to 0. This will help us save space when writing to
          // disk.
        	// 如果我们知道这个KV是包括总,然后让我们设置它memstoreTS为0。这将帮助我们节省空间在写入磁盘。
          sink.append(c);
        }
        kvs.clear();
      }
    } while (hasMore);
  }
        它会循环的将Cell写入Compactor.CellSink类型的sink,那么这个sink是什么呢?在performFlush()方法的上层调用者DefaultStoreFlusher的flushSnapshot()方法中,首先会调用HStore的createWriterInTmp()方法生成一个StoreFile.Writer实例writer,然后将这个writer作为参数sink传入performFlush()方法,如下:

        // Write the map out to the disk
        // 创建Writer
        writer = store.createWriterInTmp(
            cellsCount, store.getFamily().getCompression(), false, true, true);
        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          performFlush(scanner, writer, smallestReadPoint);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }
        我们再看这个StoreFile.Writer类型的writer是如何生成的,进入HStore的createWriterInTmp()方法,代码如下:

    // 创建StoreFile的StoreFile,需要使用上述信息,比如文件系统、文件路径、合并器、最大keyvalue数目、有利节点等
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())// 文件系统
            .withFilePath(fs.createTempName())// 文件路径
            .withComparator(comparator)// 合并器
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)// 最大keyvalue数目
            .withFavoredNodes(favoredNodes)// 有利节点
            .withFileContext(hFileContext)// HFile上下文信息
            .build();
        而在StoreFile.WriterBuilder的build()方法最后,会new一个StoreFile.Writer实例,在其构造方法中,会生成一个HFile.Writer实例writer,如下:

      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();
        而这个HFile.Writer实例writer则是通过HFile中WriterFactory获取的,如下:

  /**
   * Returns the factory to be used to create {@link HFile} writers
   */
  public static final WriterFactory getWriterFactory(Configuration conf,
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    case 3:
      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
    default:
      throw new IllegalArgumentException("Cannot create writer for HFile " +
          "format version " + version);
    }
  }
        HBase中包含两种类型的WriterFactoryV2,分别为HFileWriterV2.WriterFactoryV2和HFileWriterV3.WriterFactoryV3,今天我们以HFileWriterV2.WriterFactoryV2为例,其creaet的HFile.Writer实例其实为HFileWriterV2类型,如下:

    @Override
    public Writer createWriter(FileSystem fs, Path path, 
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, 
          comparator, context);
      }
    }
        那么,最开始的sink调用append循环写入Cell,实际上最终调用的是HFileWriterV2的append()方法,如下:

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);// 调用HFile的Writer的append()方法,将数据写入文件
      trackTimestamps(cell);// 记录本次Put操作的时间戳
    }
        这个writer的实例化就是我们上面讲到的HFileWriterV2的实例化。下面,我们重点看下HFileWriterV2的append()方法,如下:

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    
	// 从Cell中获取数据value、偏移量voffset、长度vlength
	byte[] value = cell.getValueArray();
    int voffset = cell.getValueOffset();
    int vlength = cell.getValueLength();
    
    // checkKey uses comparator to check we are writing in order.
    // 检测给定Cell,确保key是有序的,dupKey为true说明key没有换,false说明key已更改
    boolean dupKey = checkKey(cell);
    
    // 检测value,确保value不为null
    checkValue(value, voffset, vlength);
    
    if (!dupKey) {// 换key时才检测block边界
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting()) {
      // 申请新块
      newBlock();
    }

    // 写入Cell
    fsBlockWriter.write(cell);

    // 累加Key长度totalKeyLength和Value长度totalValueLength
    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += vlength;

    // Are we the first key in this block?
    // 标记该数据块中写入的第一个key,firstCellInBlock
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently?  We'll hold on to the cell here
    // indefinetly?
    // 标记上次写入Cell,即lastCell赋值为当前cell
    lastCell = cell;
    
    // 条目计数加1
    entryCount++;
    
    // 更新maxMemstoreTS
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
  }
        逻辑很简单,大体如下:

        1、首先从Cell中获取数据value、偏移量voffset、长度vlength;

        2、然后调用checkKey()方法,检测给定Cell,确保key是有序的,dupKey为true说明key没有换,false说明key已更改;

        3、接着调用checkValue()方法,检测value,确保value不为null;

        4、换key时才检测block边界,此时调用的是checkBlockBoundary()方法;

        5、如果需要申请新的数据块的话,调用newBlock()方法申请新的数据块,判断的依据是fsBlockWriter的isWriting()返回为false,表明文件系统数据块写入者暂停写入;

        6、调用fsBlockWriter的write()方法写入Cell;

        7、累加Key长度totalKeyLength和Value长度totalValueLength;

        8、如果需要,标记该数据块中写入的第一个key,firstCellInBlock;

        9、标记上次写入Cell,即lastCell赋值为当前cell;

        10、条目计数entryCount加1;

        11、更新maxMemstoreTS。

        检测block边界的checkBlockBoundary()方法如下:

  /**
   * At a block boundary, write all the inline blocks and opens new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    
	// 如果fsBlockWriter已写入大小小于hFileContext中定义的块大小,直接返回
	if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;

	// 结束上一个块
    finishBlock();
    
    // 写入InlineBlocks
    writeInlineBlocks(false);
    
    // 申请新块
    newBlock();
  }
        它会首先判断fsBlockWriter已写入大小,如果fsBlockWriter已写入大小小于hFileContext中定义的块大小,说明block未越界,直接返回,否则的话,说明block已经达到或者超过设定的阈值,此时,需要做以下处理:

        1、调用finishBlock()方法,结束上一个block;

        2、调用writeInlineBlocks()方法,写入InlineBlocks;

        3、调用newBlock()方法,申请新块。

        未完待续!










相关文章
|
分布式计算 Hadoop Java
CDH性能优化(参数配置)
NameNode中用于处理RPC调用的线程数,即指定NameNode 的服务器线程的数量。NameNode有一个工作线程池用来处理客户端的远程过程调用及集群守护进程的调用,处理程序数量越多意味着要更大的池来处理来自不同DataNode的并发心跳以及客户端并发的元数据操作)。
615 0
|
3月前
|
缓存 自然语言处理 JavaScript
抓紧上车,别再错过啦, Github 开源后台管理平台,Naive UI !!!
naive-ui-pro 是基于 Vue3 + Vite + TypeScript 的免费开源中后台模板,主打“路由插件化架构”,将权限、页签、缓存等功能拆解为可插拔模块,像搭积木一样灵活组装。内置 14+ 插件、Pro Naive UI 组件库与丰富示例,支持移动端适配、多主题、国际化,MIT 许可,开箱即用,助力高效开发。
318 4
|
10月前
|
人工智能 算法 Cloud Native
华为、埃森哲都在用的培训法则:3大战场拆解与8家破局者图谱
但当企业竞争进入“深水区”,当AI技术以周为单位迭代时,堆砌知识量 → 标准化课件 → 单向灌输这套模式注定失效。真正的破局者在哪里?那些能将培训嵌入业务毛细血管,用真实项目倒逼能力跃迁,靠前沿技术直击行业痛点的机构,正在改写游戏规则。
|
SQL 存储 关系型数据库
|
存储 安全 Java
javax.security.auth.login.LoginException: Receive timed out
`亲测可用,之前搜索了很多博客,啥样的都有,就是不介绍报错以及配置用处,根本不懂照抄那些配置是干啥的,稀里糊涂的按照博客搭完也跑不起来,因此记录这个。` `项目背景`:公司项目当前采用http协议+shiro+mysql的登录认证方式,而现在想支持ldap协议认证登录然后能够访问自己公司的项目网站。 `举例说明`:假设我们公司有自己的门户网站,现在我们收购了一家公司,他们数据库采用ldap存储用户数据,那么为了他们账户能登陆我们公司项目所以需要集成,而不是再把他们的账户重新在mysql再创建一遍,万一人家有1W个账户呢,不累死了且也不现实啊。
210 10
|
存储 数据可视化 Cloud Native
用Ganos低代码实现免切片遥感影像浏览(二):动态栅格瓦片
本文介绍了Ganos全新发布了动态栅格瓦片能力,帮助用户将库内栅格数据或栅格分析结果快速可视化,无需依赖类似GeoServer等空间服务中间件,技术栈短平快,使用灵活高效。
三阶魔方公式详解及快速解法方法介绍
三阶魔方公式详解及快速解法方法介绍
|
Linux 开发工具 数据库
【REP】hrms-ERPNext 容器安装配置
【REP】hrms-ERPNext 容器安装配置
|
分布式计算 安全 Hadoop
kyuubi提交任务异常报错Unauthorized connection for super-user from IP
最终,因为系统配置可能相当复杂,如果问题仍然没法解决,建议联系相关的系统管理员或寻求专业支持。
270 3
|
监控 Java 数据挖掘
用Java代码打造游戏反作弊系统
用Java代码打造游戏反作弊系统
521 0