HBase-1.2.4 Allow block cache to be external分析

简介: 一、简介        从HBase-1.1.0起,HBase可以使用memcached作为外部BlockCache,这是一个在设备失效或者升级时不会发生完全的冷缓存的很好的特性。用句通俗的话讲,就是HBase出现故障或者升级时,缓存轻易不会丢失。

一、简介

        从HBase-1.1.0起,HBase可以使用memcached作为外部BlockCache,这是一个在设备失效或者升级时不会发生完全的冷缓存的很好的特性。用句通俗的话讲,就是HBase出现故障或者升级时,缓存轻易不会丢失。

二、启动

        通过配置以下两个参数实现Allow block cache to be external的启动:

        配置hbase.blockcache.use.external为true,并且通过配置hbase.cache.memcached.servers来指明memcached servers.

三、实现分析

        1、构造

        实现HBase external BlockCache的主要是MemcachedBlockCache这个类,它同LruBlockCache一样,实现了BlockCache接口。在其构造函数中,主要是实例化了一个Memcached客户端MemcachedClient,通过这个客户端完成缓存的相关操作,如下:

  public MemcachedBlockCache(Configuration c) throws IOException {
    LOG.info("Creating MemcachedBlockCache");

    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
    boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);

    ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
        .setOpTimeout(opTimeout)
        .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out
        .setFailureMode(FailureMode.Redistribute)
        .setShouldOptimize(optimize)
        .setDaemon(true)                      // Don't keep threads around past the end of days.
        .setUseNagleAlgorithm(false)          // Ain't nobody got time for that
        .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case

    // Assume only the localhost is serving memecached.
    // A la mcrouter or co-locating memcached with split regionservers.
    //
    // If this config is a pool of memecached servers they will all be used according to the
    // default hashing scheme defined by the memcache client. Spy Memecache client in this
    // case.
    String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
    String[] servers = serverListString.split(",");
    List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
    for (String s:servers) {
      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
    }

    client = new MemcachedClient(builder.build(), serverAddresses);
  }
        2、缓存

        缓存还是通过cacheBlock()方法来实现,如下:

  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    if (buf instanceof HFileBlock) {
      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
            + buf.getClass().toString());
      }
    }
  }
        需要缓存的对象Cacheable必须是继承HFileBlock的子类才行,然后通过上述client将其add入Memcached。
        3、回收缓存

        缓存的回收,则是通过client的delete操作完成的,如下:

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    try {
      cacheStats.evict();
      return client.delete(cacheKey.toString()).get();
    } catch (InterruptedException e) {
      LOG.warn("Error deleting " + cacheKey.toString(), e);
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Error deleting " + cacheKey.toString(), e);
      }
    }
    return false;
  }

        4、获取缓存

        获取缓存则是通过getBlock()方法实现的,它是通过client的get()操作来实现,如下:

  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
                            boolean repeat, boolean updateCacheMetrics) {
    // Assume that nothing is the block cache
    HFileBlock result = null;

    try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) {
      result = client.get(cacheKey.toString(), tc);
    } catch (Exception e) {
      // Catch a pretty broad set of exceptions to limit any changes in the memecache client
      // and how it handles failures from leaking into the read path.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Exception pulling from memcached [ "
            + cacheKey.toString()
            + " ]. Treating as a miss.", e);
      }
      result = null;
    } finally {
      // Update stats if this request doesn't have it turned off 100% of the time
      if (updateCacheMetrics) {
        if (result == null) {
          cacheStats.miss(caching, cacheKey.isPrimary());
        } else {
          cacheStats.hit(caching, cacheKey.isPrimary());
        }
      }
    }


    return result;
  }
        5、初始化BlockCache

        BlockCache的初始化是在类CacheConfig的静态方法instantiateBlockCache()中完成的,代码如下:

  /**
   * Returns the block cache or <code>null</code> in case none should be used.
   * Sets GLOBAL_BLOCK_CACHE_INSTANCE
   *
   * @param conf  The current configuration.
   * @return The block cache or <code>null</code>.
   */
  public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
    
	// 首选取静态成员变量GLOBAL_BLOCK_CACHE_INSTANCE,如果之前是否已经初始化过 ,立即返回 
	if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE;
    
	// 判断BlockCache是否被禁用
	if (blockCacheDisabled) return null;
    MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    
    // 通过getL1()方法获取LruBlockCache,即l1
    LruBlockCache l1 = getL1(conf, mu);
    
    // 因为getL1()的调用可能引起blockCacheDisabled的变化,再次检查
    // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call.
    if (blockCacheDisabled) return null;
    
    // 通过getL2()方法获取BlockCache,即l2
    BlockCache l2 = getL2(conf, mu);
    
    // 如果l2为null,则GLOBAL_BLOCK_CACHE_INSTANCE定义为l1,下次直接获取GLOBAL_BLOCK_CACHE_INSTANCE
    if (l2 == null) {
      GLOBAL_BLOCK_CACHE_INSTANCE = l1;
    } else {// 否则
      // 判断是否启用外部缓存,通过参数hbase.blockcache.use.external判断,默认不启用
      boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
     
      // 判断是否启用综合缓存,通过参数hbase.bucketcache.combinedcache.enabled判断
      boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
        DEFAULT_BUCKET_CACHE_COMBINED);
      if (useExternal) {// 如果启动外部缓存,构造InclusiveCombinedBlockCache实例并赋值给GLOBAL_BLOCK_CACHE_INSTANCE
        GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
      } else {
    	// 否则,如果启动综合缓存,构造CombinedBlockCache实例赋值给GLOBAL_BLOCK_CACHE_INSTANCE
        if (combinedWithLru) {
          GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
        } else {
          // L1 and L2 are not 'combined'.  They are connected via the LruBlockCache victimhandler
          // mechanism.  It is a little ugly but works according to the following: when the
          // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
          // a block from the L1 cache, if not in L1, we will search L2.
          // 最后的一种情况则是l1
          GLOBAL_BLOCK_CACHE_INSTANCE = l1;
        }
      }
      l1.setVictimCache(l2);
    }
    return GLOBAL_BLOCK_CACHE_INSTANCE;
  }

        注释比较清晰,读者可自行阅读。

        getL2()方法,在启用外部缓存的情况下,通过getExternalBlockcache()方法获取外部缓存对象,否则会尝试获取BucketCache,如下:

  /**
   * @param c Configuration to use.
   * @param mu JMX Memory Bean
   * @return Returns L2 block cache instance (for now it is BucketCache BlockCache all the time)
   * or null if not supposed to be a L2.
   */
  private static BlockCache getL2(final Configuration c, final MemoryUsage mu) {
    final boolean useExternal = c.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to use " + (useExternal?" External":" Internal") + " l2 cache");
    }

    // If we want to use an external block cache then create that.
    if (useExternal) {
      return getExternalBlockcache(c);
    }

    // otherwise use the bucket cache.
    return getBucketCache(c, mu);

  }

        外部缓存对象通过hbase.blockcache.external.class反射构造而成,而BucketCache的是否能够构造,需要综合判断一些条件,这个后续再分析。

       需要额外注意的是,上述代码中有一个地方,即l1.setVictimCache(l2),这么做的目的是让l1和l2能够进行同步,在l1中获取不到缓存时,查询l2,并将可能的结果同步到l1中,这个下篇文章会有介绍。

        下一篇博文会对上述缓存中的CombinedBlockCache和InclusiveCombinedBlockCache做一些简单介绍。

        





相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
65 1
|
6月前
|
分布式计算 Hadoop 关系型数据库
Hadoop任务scan Hbase 导出数据量变小分析
Hadoop任务scan Hbase 导出数据量变小分析
95 0
|
6月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
130 0
|
存储 SQL 分布式数据库
记录一次 Hbase 线上问题的分析和解决,并分析总结下背后的知识点 - KeyValue size too large
记录一次 Hbase 线上问题的分析和解决,并分析总结下背后的知识点 - KeyValue size too large
|
存储 分布式计算 关系型数据库
|
SQL 存储 消息中间件
|
分布式数据库 Hbase
|
分布式数据库 Hbase
《HBase 基本知识介绍及典型案例分析》电子版地址
HBase 基本知识介绍及典型案例分析
83 0
《HBase 基本知识介绍及典型案例分析》电子版地址
|
SQL 消息中间件 存储
基于 HBase 的大数据在线分析|学习笔记
快速学习基于 HBase 的大数据在线分析
基于 HBase 的大数据在线分析|学习笔记
|
大数据 分布式数据库 Hbase
《HBase实战:基于HBase的大数据在线分析》电子版地址
HBase实战:基于HBase的大数据在线分析
127 0
《HBase实战:基于HBase的大数据在线分析》电子版地址