hbase源码系列(十四)Compact和Split

简介: 本文介绍hbase中的Compact和Split。

先上一张图讲一下Compaction和Split的关系,这样会比较直观一些。

2ef04f764ac40b753132f05511980956d2b55b58

Compaction把多个MemStore flush出来的StoreFile合并成一个文件,而Split则是把过大的文件Split成两个。

之前在Delete的时候,我们知道它其实并没有真正删除数据的,那总不能一直不删吧,下面我们就介绍一下它删除数据的过程,它就是Compaction。

在讲源码之前,先说一下它的分类和作用。

Compaction主要起到如下几个作用:

1)合并文件

2)清除删除、过期、多余版本的数据

3)提高读写数据的效率

Minor & Major Compaction的区别:

1)Minor操作只用来做部分文件的合并操作以及包括minVersion=0并且设置ttl的过期版本清理,不做任何删除数据、多版本数据的清理工作。

2)Major操作是对Region下的HStore下的所有StoreFile执行合并操作,最终的结果是整理合并出一个文件。

先说一下怎么使用吧,下面分别是它们是shell命令,可以在hbase的shell里面执行。

//major compaction
major compact '表名或region名'

//minor compaction
compact '表名或region名'

下面我们开始看入口吧,入口在HBaseAdmin,找到compact方法,都知道我们compact可以对表操作或者对region进行操作。

1、先把表或者region相关的region信息和server信息全部获取出来

2、循环遍历这些region信息,依次请求compact操作

AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
try {
      admin.compactRegion(null, request);
} catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
}

到这里,客户端的工作就结束了,我们直接到HRegionServer找compactRegion这个方法吧。

//major compaction多走这一步骤
      if (major) {
        if (family != null) {
          store.triggerMajorCompaction();
        } else {
          region.triggerMajorCompaction();
        }
      }
    //请求compaction走这里
      if(family != null) {
        compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, null);
      } else {
        compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null);
      }

我们先看major compaction吧,直接去看triggerMajorCompaction和requestCompaction方法。

Compaction

进入方法里面就发现了它把forceMajor置为true就完了,看来这个参数是major和minor的开关,接着看requestCompaction。

CompactionContext compaction = null;
if (selectNow) {
    compaction = selectCompaction(r, s, priority, request);
    if (compaction == null) return null; // message logged inside
}
// 要根据文件的size来判断用给个大的线程池还是小的线程池
long size = selectNow ? compaction.getRequest().getSize() : 0;
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));

上面的步骤是执行selectCompaction创建一个CompactionContext,然后提交CompactionRunner。

我们接着看CompactionContext的创建过程吧,这里还需要分是用户创建的Compaction和系统创建的Compaction。

1、创建CompactionContext

2、判断是否是非高峰时间,下面是这两个参数的值

int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
int endHour = conf.getInt("hbase.offpeak.end.hour", -1);

3、选择需要进行compaction的文件,添加到CompactionRequest和filesCompacting列表当中

compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());

我们看看这个select的具体实现吧。

public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
          filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
      return request != null;
}

这里的select方法,从名字上看是压缩策略的意思,它是由这个参数控制的hbase.hstore.defaultengine.compactionpolicy.class,默认是ExploringCompactionPolicy这个类。

接着看ExploringCompactionPolicy的selectCompaction方法,发现这个方法是继承来的,找它的父类RatioBasedCompactionPolicy。

public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    //从candidateSelection排除掉filesCompacting中的文件
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (!forceMajor) {
      // 如果不是强制major的话,包含了过期的文件,先删除过期的文件
      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
        if (expiredSelection != null) {
          return new CompactionRequest(expiredSelection);
        }
      }
      //居然还要跳过大文件,看来不是major的还是不行的,净挑小的弄
      candidateSelection = skipLargeFiles(candidateSelection);
    }
    // 是不是major的compaction还需要判断,做这个操作还是比较谨慎的
    boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
      || StoreUtils.hasReferences(candidateSelection)
      );

    if (!majorCompaction) {
     //过滤掉bulk load进来的文件
      candidateSelection = filterBulk(candidateSelection);
      //过滤掉一些不满足大小的文件
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      //检查文件数是否满足最小的要求,文件不够,也不做compaction
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    //非major的超过最大可以compact的文件数量也要剔除掉,major的只是警告一下
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
    return result;
  }

从上面可以看出来,major compaction的选择文件几乎没什么限制,只要排除掉正在compacting的文件就行了,反而是minor compact有诸多的排除选项,因为默认的compaction是定时执行的,所以它这方面的考虑吧,排除太大的文件,选择那些过期的文件,排除掉bulkload的文件等等内容。

Minor Compaction的文件选择策略

我们再简单看看applyCompactionPolicy这个方法吧,它是minor的时候用的,它的过程就像下图一样。

fbea70b3714c01ff08551288d646532902e2dc30

这个是双层循环: 

从0开始,循环N遍(N=文件数),就相当于窗口向右滑动,指针为start

----->从currentEnd=start + MinFiles(默认是3)-1,每次增加一个文件作为考虑,类似扩张的动作, 窗口扩大, 指针为

-------------->从candidateSelection文件里面取出(start, currentEnd + 1)开始

-------------->小于最小compact数量文件,默认是3,continue

-------------->大于最大compact数量文件,默认是10,continue

-------------->获取这部分文件的大小

-------------->如果这部分文件数量比上次选择方案的文件还小,替换为最小文件方案

-------------->大于MemStore flush的大小128M并且符合有一个文件不满这个公式(FileSize(i) <= ( 文件总大小- FileSize(i) ) * Ratio),continue

(注意上面的Ratio是干嘛的,这个和前面提到的非高峰时间的数值有关系,非高峰时段这个数值是5,高峰时间段这个值是1.2, 这说明高峰时段不允许compact过大的文件)

-------------->开始判断是不是最优的选择(下面讲的mayBeStuck是从selectCompaction传入的,可选择的文件超过7个的情况,上面黄色那部分代码)

1)如果mayBeStuck并且不是初次,如果 文件平均大小 > 上次选择的文件的平均大小*1.05, 替换上次的选择文件方案成为最优解;

2)初次或者不是mayBeStuck的情况,文件更多的或者文件相同、总文件大小更小的会成为最新的选择文件方案;

如果经过比较之后的最优文件选择方案不为空,就把它返回,否则就把最小文件方案返回。

下面是之前的Ratio的参数值,需要配合之前提到的参数配合使用的。

hbase.hstore.compaction.ratio              高峰时段,默认值是1.2
hbase.hstore.compaction.ratio.offpeak      非高峰时段,默认值是5

到这里先来个小结吧,从上面可以看得出来,这个Minor Compaction的文件选择策略就是选小的来,选最多的小文件来合并。

选择文件结束,回到compact的主流程

4、把CompactionRequest放入CompactionRunner,走线程池提交

之前的代码我再贴一下,省得大家有点凌乱。

ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));

我们去看CompactionRunner的run方法吧,它也在当前的类里面。

if (this.compaction == null) {this.compaction = selectCompaction(this.region, this.store, queuedPriority, null); 
     // 出口,实在选不出东西来了,它会走这里跑掉
     if (this.compaction == null) return;
     // ....还有别的限制,和父亲运行的线程池也要一致,尼玛,什么逻辑  
    }
        
    boolean completed = region.compact(compaction, store);if (completed) {
       // blocked的regions再来一次,这次又要一次compaction意欲何为啊
       // 其实它的出口在上面的那段代码,它执行之后,没有这里这么恶心
       if (store.getCompactPriority() <= 0) {
           requestSystemCompaction(region, store, "Recursive enqueue");
       } else {
         // compaction之后的region可能很大,超过split的数量就要split了
         requestSplit(region);
       }

先是对region进行compact,如果完成了,判断一下优先级,优先级小于等于0,请求系统级别的compaction,否则请求split。

我们还是先看HRegion的compact方法,compact开始前,它要先上读锁,不让读了,然后调用HStore中的compact方法。

// 执行compact,生成新文件
      List<Path> newFiles = compaction.compact();
      //把compact生成的文件移动到正确的位置
      sfs = moveCompatedFilesIntoPlace(cr, newFiles);
      //记录WALEdit日志
      writeCompactionWalRecord(filesToCompact, sfs);
      //更新HStore相关的数据结构
      replaceStoreFiles(filesToCompact, sfs);/
      /归档旧的文件,关闭reader,重新计算file的大小
      completeCompaction(filesToCompact);

comact生成新文件的方法很简单,给源文件创建一个StoreScanner,之前说过StoreScanner能从多个Scanner当中每次都取出最小的kv,然后用StoreFile.Append的方法不停地追加写入即可,这些过程在前面的章节都介绍过了,这里不再重复。

简单的说,就是把这些文件合并到一个文件去了,尼玛,怪不得io那么大。

剩下的就是清理工作了,这里面有意思的就是它会记录一笔日志到writeCompactionWalRecord当中,在之间日志恢复那一章的时候,贴出来的代码里面有,只是没有详细的讲。因为走到这里它已经完成了compaction的过程,只是没有把旧的文件移入归档文件当中,它挂掉重启的时候进行恢复干的事情,就是替换文件。

5、store.getCompactPriority() 下一步是天堂抑或是地狱?

compact完了,要判断一下这个,真是天才啊。

public int getStoreCompactionPriority() {
    int blockingFileCount = conf.getInt(
        HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
    int priority = blockingFileCount - storefiles.size();
    return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}

比较方法是这个,blockingFileCount的默认值是7,如果compact之后storefiles的文件数量大于7的话,就很有可能再触发一下,那么major compaction触发的可能性低,minor触发的可能性非常大。

不过没关系,实在选不出文件来,它会退出的。咱们可以将它这个参数hbase.hstore.blockingStoreFiles设置得大一些,弄出来一个比较大的数字。

Split

好,我们接着看requestSplit。

if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
      byte[] midKey = r.checkSplit();
      if (midKey != null) {
        requestSplit(r, midKey);
        return true;
      }
}

先检查一下是否可以进行split,如果可以,把中间的key返回来。

那条件是啥?在这里,if的条件是成立的,条件判断在IncreasingToUpperBoundRegionSplitPolicy的shouldSplit方法当中。

遍历region里面所有的store

1、Store当中不能有Reference文件。

2、store.size > Math.min(getDesiredMaxFileSize(), this.flushSize * (tableRegionsCount * (long)tableRegionsCount)) 就返回ture,可以split。

getDesiredMaxFileSize()默认是10G,由这个参数来确定hbase.hregion.max.filesize, 当没超过10G的时候它就会根据128MB * (该表在这个RS上的region数量)平方。

midKey怎么找呢?找出最大的HStore,然后通过它来找这个分裂点,最大的文件的中间点。

return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);

但是如果是另外一种情况,我们通过客户端来分裂Region,我们强制指定的分裂点,这种情况是按照我们设置的分裂点来进行分裂。

分裂点有了,我们接着看,我们发现它又提交了一个SplitRequest线程,看run方法。

1、先获得一个tableLock,给这个表上锁

2、执行SplitTransaction的prepare方法,然后execute

3、结束了释放tableLock

// 先做准备工作,然后再execute执行主流程,过程当中出错了,就rollback
      if (!st.prepare()) return;
      try {
        st.execute(this.server, this.server);
      } catch (Exception e) {
        try {
        if (st.rollback(this.server, this.server)) {
        } catch (RuntimeException ee) {this.server.abort(msg);
        }
        return;
      }

prepare方法当中,主要做了这么件事,new了两个新的region出来:

this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);

我们接着看execute方法,这个是重头戏。

PairOfSameType<HRegion> regions = createDaughters(server, services);
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());

总共分三步:

1、创建子region

2、上线子region

3、更改zk当中的状态

我们先看createDaughters

//在region-in-transition节点下给父region创建一个splitting的节点
    createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);//在parent的region目录下创建.splits目录
    this.parent.getRegionFileSystem().createSplitsDir();
    this.journal.add(JournalEntry.CREATE_SPLIT_DIR);

    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
    //关闭parent,然后返回相应的列族和storefile的map
    hstoreFilesToSplit = this.parent.close(false);
    //从在线列表里下线parent
    services.removeFromOnlineRegions(this.parent, null);
    this.journal.add(JournalEntry.OFFLINED_PARENT);
    // 把parent的storefile均分给两个daughter,所谓均分,只是创建引用文件而已
    splitStoreFiles(hstoreFilesToSplit);

    // 把临时的Region A目录重名为正式的region A 的目录    
    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);

    // 把临时的Region B目录重名为正式的region B的目录
    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
    this.journal.add(JournalEntry.PONR);

    // 修改meta表中的信息,设置parent的状态为下线、并且split过,在增加两列左右孩子,左右孩子的信息也通过put插入到meta中
    MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(),
          a.getRegionInfo(), b.getRegionInfo(), server.getServerName());
    return new PairOfSameType<HRegion>(a, b);

在splitStoreFiles这块的,它给每个文件都开一个线程去进行split。

fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);

这里其实是给每个文件都创建了Reference文件,无论它的文件当中包不包括splitRow。

//parentRegion/.splits/region/familyName目录
    Path splitDir = new Path(getSplitsDir(hri), familyName);
    // 其实它并没有真正的split,而是通过创建Reference
    Reference r = top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
    String parentRegionName = regionInfo.getEncodedName();
    // 原来通过这么关联啊,storefile名字 + 父parent的name
    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
    return r.write(fs, p);

把引用文件生成在每个子region对应的目录,以便下一步直接重命令目录即可。

重命名目录之后,就是修改Meta表了,splitRegion的方法是通过Put来进行操作的,它修改parent的regioninfo这一列更新为最新的信息,另外又增加了splitA和splitB两列,hri_a和hri_b则通过另外两个Put插入到Meta表当中。

这个过程当中如果出现任何问题,就需要根据journal记录的过程信息进行回滚操作。

怎么open这两个子region就不讲了,之前讲《HMaster启动过程》的时候讲过了。

到这里split的过程就基本结束了,鉴于Compaction和Split的对io方面的巨大影响,所以在任何资料里面都是推荐屏蔽自动执行,写脚本在晚上自动进行这些操作。

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
6月前
|
SQL Java 分布式数据库
实现HBase表和RDB表的转化(附Java源码资源)
该文介绍了如何将数据从RDB转换为HBase表,主要涉及三个来源:RDB Table、Client API和Files。文章重点讲解了RDB到HBase的转换,通过批处理思想,利用RDB接口批量导出数据并转化为`List&lt;Put&gt;`,然后导入HBase。目录结构包括配置文件、RDB接口及实现类、HBase接口及实现类,以及一个通用转换器接口和实现。代码中,`RDBImpl`负责从RDB读取数据并构造`Put`对象,`HBaseImpl`则负责将`Put`写入HBase表。整个过程通过配置文件`transfer.properties`管理HBase和RDB的映射关系。
57 3
实现HBase表和RDB表的转化(附Java源码资源)
|
NoSQL 大数据 分布式数据库
【HBase】(6)-Compact合并StoreFile流程
【HBase】(6)-Compact合并StoreFile流程
262 0
【HBase】(6)-Compact合并StoreFile流程
|
分布式数据库 Hbase
|
分布式数据库 Hbase
HBase 源码解析
HBase Read读流程源码解析&HBase Write写流程源码解析 &HBase Flush & Compact流程源码解析
4681 0
|
分布式数据库 Hbase Java
hbase region split源码分析
hbase region split : split执行调用流程: 1.HbaseAdmin发起split:### 2.RSRpcServices实现类执行split(Implements the regionserver RPC services.)### 3.CompactSplitThread类与SplitRequest类用来执行region切割:### 4.splitRequest执行doSplitting操作### 4.1初始化两个子region### 4.2执行切割#### 4.2.1:(创建子region。
1814 0
|
Java 分布式数据库 Ruby
HBase Filter 过滤器之 Comparator 原理及源码学习
HBase所有的比较器实现类都继承于父类ByteArrayComparable,而ByteArrayComparable又实现了Comparable接口;不同功能的比较器差别在于对父类compareTo()方法的重写逻辑不同。 下面分别对HBase Filter默认实现的七大比较器一一进行介绍。 1. BinaryComparator 介绍:二进制比较器,用于按字典顺序比较指定字节数组。 先看一个小例子: public class BinaryComparatorDemo { public static void main(String[] args) {
496 0
|
分布式计算 分布式数据库 Hbase
Hbase compact以及split跟踪
为了准确了解HBASE内部工作原理,我们需要做一些测试,在大量数据插入的情况下,HBASE内部到底有什么表现? 比如插入速度, hstore compact,split等相关活动,了解了这些才能更好的维护HBASE系统本身。 此次测试会有几轮,所以测试到哪里就写到哪里,我随便找了一张大概120W来的表,我会写一个mapreduce任务,来读取这张表,再写入另外一个测试表: test2, 没有选择更大的表是因为毕竟整个拷贝是需要时间,通常20分钟-30分钟,太大的表,不太利于跟踪。 拷贝过程,HBASE会针对此表有相关的活动日志,依据日志,我们来看看HBASE到底在干什么。 测试开始,
230 0
|
分布式数据库 Hbase 存储
HBase源码分析之HRegion上compact流程分析(一)
        首先来想两个问题:1、何谓compact?2、它产生的背景是怎样的?         compact是指HBase表中HRegion上某个Column Family下,部分或全部HFiles的合并。
1051 1
|
存储 Java Shell
HBase源码分析之HRegionServer上compact流程分析
        前面三篇文章中,我们详细叙述了compact流程是如何在HRegion上进行的,了解了它的很多细节方面的问题。但是,这个compact在HRegionServer上是如何进行的?合并时文件是如何选择的呢?在这篇文章中,你将找到答案!         首先,在HRegionServer内部,我们发现,它定义了一个CompactSplitThread类型的成员变量compactSplitThread,单看字面意思,这就是一个合并分裂线程,那么它会不会就是HRegionServer上具体执行合并的工作线程呢?我们一步一步来看。
1434 0