通过配置以下两个参数实现Allow block cache to be external的启动:
配置hbase.blockcache.use.external为true,并且通过配置hbase.cache.memcached.servers来指明memcached servers.
实现HBase external BlockCache的主要是MemcachedBlockCache这个类,它同LruBlockCache一样,实现了BlockCache接口。在其构造函数中,主要是实例化了一个Memcached客户端MemcachedClient,通过这个客户端完成缓存的相关操作,如下:
public MemcachedBlockCache(Configuration c) throws IOException { LOG.info("Creating MemcachedBlockCache"); long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT); long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT); boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT); ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder() .setOpTimeout(opTimeout) .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out .setFailureMode(FailureMode.Redistribute) .setShouldOptimize(optimize) .setDaemon(true) // Don't keep threads around past the end of days. .setUseNagleAlgorithm(false) // Ain't nobody got time for that .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case // Assume only the localhost is serving memecached. // A la mcrouter or co-locating memcached with split regionservers. // // If this config is a pool of memecached servers they will all be used according to the // default hashing scheme defined by the memcache client. Spy Memecache client in this // case. String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211"); String[] servers = serverListString.split(","); List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length); for (String s:servers) { serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s)); } client = new MemcachedClient(builder.build(), serverAddresses); }2、缓存
@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { if (buf instanceof HFileBlock) { client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc); } else { if (LOG.isDebugEnabled()) { LOG.debug("MemcachedBlockCache can not cache Cacheable's of type " + buf.getClass().toString()); } } }需要缓存的对象Cacheable必须是继承HFileBlock的子类才行,然后通过上述client将其add入Memcached。
@Override public boolean evictBlock(BlockCacheKey cacheKey) { try { cacheStats.evict(); return client.delete(cacheKey.toString()).get(); } catch (InterruptedException e) { LOG.warn("Error deleting " + cacheKey.toString(), e); Thread.currentThread().interrupt(); } catch (ExecutionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Error deleting " + cacheKey.toString(), e); } } return false; }
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, boolean updateCacheMetrics) { // Assume that nothing is the block cache HFileBlock result = null; try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) { result = client.get(cacheKey.toString(), tc); } catch (Exception e) { // Catch a pretty broad set of exceptions to limit any changes in the memecache client // and how it handles failures from leaking into the read path. if (LOG.isDebugEnabled()) { LOG.debug("Exception pulling from memcached [ " + cacheKey.toString() + " ]. Treating as a miss.", e); } result = null; } finally { // Update stats if this request doesn't have it turned off 100% of the time if (updateCacheMetrics) { if (result == null) { cacheStats.miss(caching, cacheKey.isPrimary()); } else { cacheStats.hit(caching, cacheKey.isPrimary()); } } } return result; }5、初始化BlockCache
/** * Returns the block cache or <code>null</code> in case none should be used. * Sets GLOBAL_BLOCK_CACHE_INSTANCE * * @param conf The current configuration. * @return The block cache or <code>null</code>. */ public static synchronized BlockCache instantiateBlockCache(Configuration conf) { // 首选取静态成员变量GLOBAL_BLOCK_CACHE_INSTANCE,如果之前是否已经初始化过 ,立即返回 if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; // 判断BlockCache是否被禁用 if (blockCacheDisabled) return null; MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); // 通过getL1()方法获取LruBlockCache,即l1 LruBlockCache l1 = getL1(conf, mu); // 因为getL1()的调用可能引起blockCacheDisabled的变化,再次检查 // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call. if (blockCacheDisabled) return null; // 通过getL2()方法获取BlockCache,即l2 BlockCache l2 = getL2(conf, mu); // 如果l2为null,则GLOBAL_BLOCK_CACHE_INSTANCE定义为l1,下次直接获取GLOBAL_BLOCK_CACHE_INSTANCE if (l2 == null) { GLOBAL_BLOCK_CACHE_INSTANCE = l1; } else {// 否则 // 判断是否启用外部缓存,通过参数hbase.blockcache.use.external判断,默认不启用 boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); // 判断是否启用综合缓存,通过参数hbase.bucketcache.combinedcache.enabled判断 boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, DEFAULT_BUCKET_CACHE_COMBINED); if (useExternal) {// 如果启动外部缓存,构造InclusiveCombinedBlockCache实例并赋值给GLOBAL_BLOCK_CACHE_INSTANCE GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); } else { // 否则,如果启动综合缓存,构造CombinedBlockCache实例赋值给GLOBAL_BLOCK_CACHE_INSTANCE if (combinedWithLru) { GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); } else { // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler // mechanism. It is a little ugly but works according to the following: when the // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get // a block from the L1 cache, if not in L1, we will search L2. // 最后的一种情况则是l1 GLOBAL_BLOCK_CACHE_INSTANCE = l1; } } l1.setVictimCache(l2); } return GLOBAL_BLOCK_CACHE_INSTANCE; }
/** * @param c Configuration to use. * @param mu JMX Memory Bean * @return Returns L2 block cache instance (for now it is BucketCache BlockCache all the time) * or null if not supposed to be a L2. */ private static BlockCache getL2(final Configuration c, final MemoryUsage mu) { final boolean useExternal = c.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); if (LOG.isDebugEnabled()) { LOG.debug("Trying to use " + (useExternal?" External":" Internal") + " l2 cache"); } // If we want to use an external block cache then create that. if (useExternal) { return getExternalBlockcache(c); } // otherwise use the bucket cache. return getBucketCache(c, mu); }