@Override public Result next() throws IOException { // If the scanner is closed and there's nothing left in the cache, next is a no-op. if (cache.size() == 0 && this.closed) { return null; } // 如果缓存中不存在数据,调用loadCache()方法加载缓存 if (cache.size() == 0) { loadCache(); } // 缓存中存在数据的话,直接从缓存中拉取数据,返回给客户端请求者 if (cache.size() > 0) { return cache.poll(); } // if we exhausted this scanner before calling close, write out the scan metrics writeScanMetrics(); return null; }而这个加载缓存的loadCache()方法,则会调用call()方法,发送RPC请求给对应的RegionServer上的Region,那么它是如何定位Region的呢?我们先看下这个call()方法,代码如下:
/**
 * Executes the scanner callable once through the supplied caller.
 *
 * @param scan not referenced in this method body
 * @param callable the ScannerCallableWithReplicas to execute
 * @param caller the RpcRetryingCaller used to run the callable
 * @param scannerTimeout timeout passed straight to callWithoutRetries()
 * @return whatever caller.callWithoutRetries() returns
 * @throws IOException an InterruptedIOException when the current thread was interrupted
 */
Result[] call(Scan scan, ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller, int scannerTimeout) throws IOException, RuntimeException {
  // Bail out early if the thread has been interrupted (Thread.interrupted() also clears the flag).
  if (Thread.interrupted()) {
    throw new InterruptedIOException();
  }
  // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
  // we do a callWithRetries
  // caller is an RpcRetryingCaller and callable is a ScannerCallableWithReplicas.
  return caller.callWithoutRetries(callable, scannerTimeout);
}
实际上caller为RpcRetryingCaller类型,而callable为ScannerCallableWithReplicas类型,我们看下RpcRetryingCaller的callWithoutRetries()方法,关键代码如下:
// 先调用prepare()方法,再调用call()方法,超时时间为callTimeout callable.prepare(false); return callable.call(callTimeout);发现没,实际上最终调用的是callable的call()方法,也就是ScannerCallableWithReplicas的call()方法,我们跟进下关键代码:
@Override public Result [] call(int timeout) throws IOException { // 此处省略代码若干字...... // 根据scan的startRow获取Region位置,使用cache RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, currentScannerCallable.getRow()); // 此处省略代码若干字...... }终于切入主题了!Region的定位是通过调用RpcRetryingCallerWithReadReplicas的getRegionLocations()方法进行的,它需要是否使用缓存标识位useCache、主从复制replicaId、ClusterConnection集群连接器cConnection,表名tableName、所在行Row等关键参数,并返回RegionLocations,用于表示Region的位置信息。而RegionLocations中存在一个数组locations,它的定义如下:
// locations array contains the HRL objects for known region replicas indexes by the replicaId. // elements can be null if the region replica is not known at all. A null value indicates // that there is a region replica with the index as replicaId, but the location is not known // in the cache. private final HRegionLocation[] locations; // replicaId -> HRegionLocation.它是一个HRegionLocation类型的数组,实际上存储的是replicaId到HRegionLocation的映射,replicaId就是数组的下标。而上面调用getRegionLocations()方法时,传入的replicaId为RegionReplicaUtil.DEFAULT_REPLICA_ID,也就是0。那么HRegionLocation是什么呢?看下它的两个关键成员变量就知道了:
private final HRegionInfo regionInfo; private final ServerName serverName;HRegionLocation就是Region的位置信息,它包含了关键的两点信息:1、数据读写请求中row所在Region信息HRegionInfo;2、Region所在服务器ServerName。有了这两点,我们就能够掌握row对应Region位置信息了。
static RegionLocations getRegionLocations(boolean useCache, int replicaId, ClusterConnection cConnection, TableName tableName, byte[] row) throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { RegionLocations rl; try { // 根据表名tableName,行row,和副本replicaId,来定位Region位置,得到RegionLocations,即rl if (!useCache) { // 不使用缓存,调用ClusterConnection的relocateRegion()方法,定位Region位置 rl = cConnection.relocateRegion(tableName, row, replicaId); } else { // 使用缓存,调用ClusterConnection的locateRegion()方法,定位Region位置 rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId); } } catch (DoNotRetryIOException e) { throw e; } catch (RetriesExhaustedException e) { throw e; } catch (InterruptedIOException e) { throw e; } catch (IOException e) { throw new RetriesExhaustedException("Can't get the location", e); } if (rl == null) { throw new RetriesExhaustedException("Can't get the locations"); } return rl; }其实逻辑很简单,就分两种情况,使用缓存和不使用缓存。而且,我们也应该能猜出来,即便是使用缓存,如果缓存中没有的话,它还是会走一遍不使用缓存的流程,将获取到的Region位置信息加载到缓存中,然后再返回给外部调用者,最终我们需要共同研究的仅仅是不使用缓存的情况下如何定位Region而已。到底是不是这样呢?我们先记住,后面再做验证。
this.connection = ConnectionManager.getConnectionInternal(conf);通过ConnectionManager的静态方法getConnectionInternal(),从配置信息conf中加载而来。继续看下它的代码:
static ClusterConnection getConnectionInternal(final Configuration conf) throws IOException { // 根据配置信息conf构造HConnectionKey HConnectionKey connectionKey = new HConnectionKey(conf); synchronized (CONNECTION_INSTANCES) { // 先从CONNECTION_INSTANCES中根据HConnectionKey获取连接HConnectionImplementation类型的connection, // CONNECTION_INSTANCES为HConnectionKey到HConnectionImplementation的映射集合 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); if (connection == null) {// 如果CONNECTION_INSTANCES中不存在 // 调用createConnection()方法创建一个HConnectionImplementation connection = (HConnectionImplementation)createConnection(conf, true); // 将新创建的HConnectionImplementation与HConnectionKey的对应关系存入CONNECTION_INSTANCES CONNECTION_INSTANCES.put(connectionKey, connection); } else if (connection.isClosed()) {// 如果CONNECTION_INSTANCES中存在,且已关闭的话 // 调用ConnectionManager的deleteConnection()方法,删除connectionKey对应的记录: // 1、调用decCount()方法减少计数; // 2、从CONNECTION_INSTANCES类表中移除connectionKey对应记录; // 3、调用HConnectionImplementation的internalClose()方法处理关闭连接事宜 ConnectionManager.deleteConnection(connectionKey, true); // 调用createConnection()方法创建一个HConnectionImplementation connection = (HConnectionImplementation)createConnection(conf, true); // 将新创建的HConnectionImplementation与HConnectionKey的对应关系存入CONNECTION_INSTANCES CONNECTION_INSTANCES.put(connectionKey, connection); } // 连接计数器增1 connection.incCount(); // 返回连接 return connection; } }这个HConnectionKey实际上是连接的一个Key类,包含了连接对应的hbase.zookeeper.quorum、hbase.zookeeper.property.clientPort等重要信息,而获取连接的方法也很简单,如果之前创建过key相同的连接,直接从CONNECTION_INSTANCES集合中根据HConnectionKey获取,并将连接计数器增1,直接返回连接,获取不到的话,根据HConnectionKey创建一个新的,并加入CONNECTION_INSTANCES集合,而且,如果获取到的连接是Closed的话,调用ConnectionManager的deleteConnection()方法,删除connectionKey对应的记录,创建一个新的连接创建一个HConnectionImplementation,并加入到CONNECTION_INSTANCES集合。
/**
 * Locates the region for a row without consulting the location cache.
 *
 * @param tableName table being looked up
 * @param row row whose region is wanted
 * @param replicaId replica id forwarded to locateRegion()
 * @return the result of locateRegion() with useCache=false and retry=true
 * @throws IOException a TableNotEnabledException when a non-meta table is disabled,
 *         or whatever locateRegion() throws
 */
@Override
public RegionLocations relocateRegion(final TableName tableName, final byte [] row, int replicaId) throws IOException{
  // Since this is an explicit request not to use any caching, finding
  // disabled tables should not be desirable. This will ensure that an exception is thrown when
  // the first time a disabled table is interacted with.
  // For any table other than the meta table, fail fast when the table is disabled.
  if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
    throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
  }
  // Delegate to locateRegion() with useCache=false and retry=true.
  return locateRegion(tableName, row, false, true, replicaId);
}
很简单,先做一个必要的检查,如果表不是META表,并且表被禁用的话,直接抛出TableNotEnabledException,然后调用locateRegion()方法进行Region的定位,传入的useCache为false,retry为true,即不使用缓存,并且进行重试。
@Override public RegionLocations locateRegion(final TableName tableName, final byte [] row, boolean useCache, boolean retry, int replicaId) throws IOException { // 判断连接是否已关闭的标志位closed,为true则直接抛出IOException异常 if (this.closed) throw new IOException(toString() + " closed"); // 判断表名tableName,表名为空的话直接抛出IllegalArgumentException异常 if (tableName== null || tableName.getName().length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); } if (tableName.equals(TableName.META_TABLE_NAME)) { // 如果是meta表,直接调用locateMeta()方法进行定位 return locateMeta(tableName, useCache, replicaId); } else { // Region not in the cache - have to go to the meta RS // 如果不是meta表,cache中没有,需要访问meta RS,调用locateRegionInMeta()方法进行定位 return locateRegionInMeta(tableName, row, useCache, retry, replicaId); } }locateRegion()方法上来先做一些必要的检查:
2、如果不是meta表,cache中没有,需要访问meta RS,调用locateRegionInMeta()方法进行定位;
/*
 * Search the hbase:meta table for the HRegionLocation
 * info that contains the table and row we're seeking.
 *
 * The lookup is retried in a loop: each attempt optionally checks the cache, scans the meta
 * table for one row, validates the row, caches the result and returns it. A failed attempt
 * sleeps for ConnectionUtils.getPauseTime(pause, tries) before the next one.
 */
private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
  // If we are supposed to be using the cache, look in the cache to see if
  // we already have the region.
  if (useCache) {
    RegionLocations locations = getCachedLocation(tableName, row);
    if (locations != null && locations.getRegionLocation(replicaId) != null) {
      return locations;
    }
  }
  // build the key of the meta region we should be looking for.
  // the extra 9's on the end are necessary to allow "exact" matches
  // without knowing the precise region names.
  // The key is a region name built from tableName, row and HConstants.NINES.
  byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
  // Build a reversed, small scan that starts at metaKey and fetches a single row at a time.
  Scan s = new Scan();
  s.setReversed(true);
  s.setStartRow(metaKey);
  s.setSmall(true);
  s.setCaching(1);
  // Number of attempts: numTries when retrying is allowed, otherwise exactly one.
  // (Per the original annotation numTries comes from hbase.client.retries.number - confirm.)
  int localNumRetries = (retry ? numTries : 1);
  for (int tries = 0; true; tries++) {
    // Give up once the attempt budget is used up.
    if (tries >= localNumRetries) {
      throw new NoServerForRegionException("Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName + " after " + localNumRetries + " tries.");
    }
    if (useCache) {
      // Re-check the cache on every attempt.
      RegionLocations locations = getCachedLocation(tableName, row);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    } else {
      // If we are not supposed to be using the cache, delete any existing cached location
      // so it won't interfere.
      metaCache.clearCache(tableName, row);
    }
    // Query the meta region
    try {
      Result regionInfoRow = null;
      ReversedClientScanner rcs = null;
      try {
        // Scan the meta table with a ClientSmallReversedScanner and take the first result.
        rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, rpcControllerFactory, getBatchPool(), 0);
        regionInfoRow = rcs.next();
      } finally {
        // Always close the scanner, even when next() throws.
        if (rcs != null) {
          rcs.close();
        }
      }
      // No row at all is reported as the table not being found.
      if (regionInfoRow == null) {
        throw new TableNotFoundException(tableName);
      }
      // convert the row result into the HRegionLocation we need!
      RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
      if (locations == null || locations.getRegionLocation(replicaId) == null) {
        throw new IOException("HRegionInfo was null in " + tableName + ", row=" + regionInfoRow);
      }
      // Region info of the requested replica.
      HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
      if (regionInfo == null) {
        throw new IOException("HRegionInfo was null or empty in " + TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
      }
      // possible we got a region of a different table...
      if (!regionInfo.getTable().equals(tableName)) {
        throw new TableNotFoundException( "Table '" + tableName + "' was not found, got: " + regionInfo.getTable() + ".");
      }
      // A split parent cannot serve the row.
      if (regionInfo.isSplit()) {
        throw new RegionOfflineException("the only available region for" + " the required row is a split parent," + " the daughters should be online soon: " + regionInfo.getRegionNameAsString());
      }
      // Neither can an offline region.
      if (regionInfo.isOffline()) {
        throw new RegionOfflineException("the region is offline, could" + " be caused by a disable table call: " + regionInfo.getRegionNameAsString());
      }
      // Server hosting the requested replica.
      ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
      if (serverName == null) {
        throw new NoServerForRegionException("No server address listed " + "in " + TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() + " containing row " + Bytes.toStringBinary(row));
      }
      // Reject a location that points at a server known to be dead.
      if (isDeadServer(serverName)){
        throw new RegionServerStoppedException("hbase:meta says the region "+ regionInfo.getRegionNameAsString()+" is managed by the server " + serverName + ", but it is dead.");
      }
      // Instantiate the location
      // All checks passed: cache the locations and return them.
      cacheLocation(tableName, locations);
      return locations;
    } catch (TableNotFoundException e) {
      // if we got this error, probably means the table just plain doesn't
      // exist. rethrow the error immediately. this should always be coming
      // from the HTable constructor.
      throw e;
    } catch (IOException e) {
      ExceptionUtil.rethrowIfInterrupt(e);
      if (e instanceof RemoteException) {
        e = ((RemoteException)e).unwrapRemoteException();
      }
      // Rethrow on the last attempt; otherwise log (at debug level) and fall through to retry.
      if (tries < localNumRetries - 1) {
        if (LOG.isDebugEnabled()) {
          // NOTE(review): nothing is appended after "metaLocation=" in this message.
          LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME + ", metaLocation=" + ", attempt=" + tries + " of " + localNumRetries + " failed; retrying after sleep of " + ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
        }
      } else {
        throw e;
      }
      // Only relocate the parent region if necessary
      if(!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) {
        relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
      }
    }
    try{
      // Sleep before the next attempt; the duration depends on pause and tries.
      Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Giving up trying to location region in " + "meta: thread is interrupted.");
    }
  }
}
locateRegionInMeta()方法是对非Meta表中特定行row所在Region位置信息的检索,它本质上是通过检索HBase中Meta表数据来获取对应非Meta表中行row对应的Region位置信息的,其处理逻辑如下:
2、缓存中没有,构造一个scan,先根据表名tableName、行row、字符串"99999999999999",调用HRegionInfo的createRegionName()方法,创建一个Region Name:metaKey;
3、构造一个Scan,scan的起始行为上述metaKey,并且是一个反向小scan,即reversed small Scan;
/**
 * Search the cache for a location that fits our table and row key.
 * Return null if no suitable region is located.
 *
 * @param tableName table whose cached locations are searched
 * @param row row key to look up
 * @return Null or region location found in cache.
 */
RegionLocations getCachedLocation(final TableName tableName, final byte [] row) {
  // Pure delegation to metaCache.
  return metaCache.getCachedLocation(tableName, row);
}
它实际上是从metaCache中获取tableName和row所对应的Region位置信息。而metaCache是一个叫做MetaCache类的对象,它是为缓存Region位置信息,即Meta Data而专门设计的一个数据结构,我们先看下它的getCachedLocation()方法:
/** * Search the cache for a location that fits our table and row key. * Return null if no suitable region is located. * * * @param tableName * @param row * @return Null or region location found in cache. */ public RegionLocations getCachedLocation(final TableName tableName, final byte [] row) { // 首先根据tableName获取缓存的该表相关的位置信息tableLocations,它是一个保存的row到RegionLocations映射的跳表结构ConcurrentSkipListMap ConcurrentSkipListMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); // 从tableLocations中根据row获取到小于或等于row的最大row到RegionLocations映射的条目 Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row); if (e == null) { // 查询不到的话直接返回 return null; } // 获取Region的位置信息possibleRegion RegionLocations possibleRegion = e.getValue(); // make sure that the end key is greater than the row we're looking // for, otherwise the row actually belongs in the next region, not // this one. the exception case is when the endkey is // HConstants.EMPTY_END_ROW, signifying that the region we're // checking is actually the last region in the table. // 从possibleRegion中获取Rgion对应的endKey byte[] endKey = possibleRegion.getRegionLocation().getRegionInfo().getEndKey(); // 如果endKey为byte [0],或者row大于endKey,恭喜,找到啦!!那么我们就返回这个possibleRegion if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || tableName.getRowComparator().compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { return possibleRegion; } // Passed all the way through, so we got nothing - complete cache miss // 最后,没办法,没有找到,返回null吧 return null; }逻辑比较简单,注释也很清晰,读者可自行分析。这里要着重说下ConcurrentSkipListMap,它提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。读者可自行查阅ConcurrentSkipListMap的相关介绍。而根据tableName获取缓存的该表相关的位置信息tableLocations的getTableLocations()方法,如下:
/** * @param tableName * @return Map of cached locations for passed <code>tableName</code> */ private ConcurrentSkipListMap<byte[], RegionLocations> getTableLocations(final TableName tableName) { // find the map of cached locations for this table ConcurrentSkipListMap<byte[], RegionLocations> result; // 从cachedRegionLocations中根据tableName获取缓存的该表相关的位置信息tableLocations,即一个保存的row到RegionLocations映射的跳表结构ConcurrentSkipListMap // cachedRegionLocations也是一个ConcurrentHashMap,它是MetaCache中实现缓存Region位置信息功能所依靠的最主要的数据结构, // 它存储的是{tableName->[row->RegionLocations]}的两级映射关系 result = this.cachedRegionLocations.get(tableName); // if tableLocations for this table isn't built yet, make one if (result == null) {// 如果result没有创建的话,创建一个 result = new ConcurrentSkipListMap<byte[], RegionLocations>(Bytes.BYTES_COMPARATOR); // 将创建的result放入cachedRegionLocations,并获取旧值old ConcurrentSkipListMap<byte[], RegionLocations> old = this.cachedRegionLocations.putIfAbsent(tableName, result); // 如果old不为空,直接返回old if (old != null) { return old; } } // 返回result return result; }注释十分详细,读者可自行分析。这里要强调的是MetaCache的一个成员变量:cachedRegionLocations,它的定义如下:
/** * Map of table to table {@link HRegionLocation}s. */ private final ConcurrentMap<TableName, ConcurrentSkipListMap<byte[], RegionLocations>> cachedRegionLocations = new ConcurrentHashMap<TableName, ConcurrentSkipListMap<byte[], RegionLocations>>();它是MetaCache中实现缓存Region位置信息功能所依靠的最主要的数据结构,它存储的是{tableName->[row->RegionLocations]}的两级映射关系。而MetaCache中还有一个涉及到所有Server的变量,如下:
// The presence of a server in the map implies it's likely that there is an // entry in cachedRegionLocations that map to this server; but the absence // of a server in this map guarentees that there is no entry in cache that // maps to the absent server. // The access to this attribute must be protected by a lock on cachedRegionLocations private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();不消多说。
/**
 * Put a newly discovered HRegionLocation into the cache.
 * When an entry already exists for the same start key, the old and new locations are
 * merged and the merged value replaces the old one.
 * @param tableName The table name.
 * @param locations the new locations
 */
public void cacheLocation(final TableName tableName, final RegionLocations locations) {
  // The cache is keyed by the region's start key.
  byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
  // Per-table map from start key to RegionLocations.
  ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
  // Insert only if absent; a non-null return value is the entry that was already cached.
  RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
  boolean isNewCacheEntry = (oldLocation == null);
  if (isNewCacheEntry) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Cached location: " + locations);
    }
    // Record the servers that appear in the new locations.
    addToCachedServers(locations);
    return;
  }
  // merge old and new locations and add it to the cache
  // Meta record might be stale - some (probably the same) server has closed the region
  // with later seqNum and told us about the new location.
  RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
  // Conditional replace: only succeeds while the entry is still oldLocation.
  boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
  if (replaced && LOG.isTraceEnabled()) {
    LOG.trace("Merged cached locations: " + mergedLocation);
  }
  // Record the servers regardless of whether the replace succeeded.
  addToCachedServers(locations);
}
注释同样很详细,这里就不再赘述了。
/**
 * Returns an HRegionLocationList extracted from the result.
 * @param r catalog-table result to decode; may be null
 * @return an HRegionLocationList containing all locations for the region range or null if
 *         we can't deserialize the result.
 */
public static RegionLocations getRegionLocations(final Result r) {
  if (r == null) return null;
  // Read the region info from the column named by getRegionInfoColumn().
  // (Per the original annotation that is family "info", qualifier "regioninfo" - confirm.)
  HRegionInfo regionInfo = getHRegionInfo(r, getRegionInfoColumn());
  if (regionInfo == null) return null;
  List<HRegionLocation> locations = new ArrayList<HRegionLocation>(1);
  NavigableMap<byte[],NavigableMap<byte[],byte[]>> familyMap = r.getNoVersionMap();
  // The location for replica 0 is always added first.
  locations.add(getRegionLocation(r, regionInfo, 0));
  NavigableMap<byte[], byte[]> infoMap = familyMap.get(getFamily());
  if (infoMap == null) return new RegionLocations(locations);
  // iterate until all serverName columns are seen
  int replicaId = 0;
  byte[] serverColumn = getServerColumn(replicaId);
  // Columns strictly after replica 0's server column.
  SortedMap<byte[], byte[]> serverMap = infoMap.tailMap(serverColumn, false);
  if (serverMap.isEmpty()) return new RegionLocations(locations);
  for (Map.Entry<byte[], byte[]> entry : serverMap.entrySet()) {
    replicaId = parseReplicaIdFromServerColumn(entry.getKey());
    // A negative id ends the iteration.
    if (replicaId < 0) {
      break;
    }
    locations.add(getRegionLocation(r, regionInfo, replicaId));
  }
  return new RegionLocations(locations);
}
最重要的一点,从Result中获取Region信息HRegionInfo,getRegionInfoColumn()返回的为字符串"regioninfo"对应的byte[],也就是meta表中对应的qualifier,而family为"info",getHRegionInfo()和getRegionInfoColumn()方法如下:
/**
 * Returns the column qualifier for serialized region info.
 * @return HConstants.REGIONINFO_QUALIFIER
 */
protected static byte[] getRegionInfoColumn() {
  return HConstants.REGIONINFO_QUALIFIER;
}
/**
 * Returns the HRegionInfo object from the column {@link HConstants#CATALOG_FAMILY} and
 * <code>qualifier</code> of the catalog table result.
 * @param r a Result object from the catalog table scan
 * @param qualifier Column family qualifier
 * @return An HRegionInfo instance or null.
 */
private static HRegionInfo getHRegionInfo(final Result r, byte [] qualifier) {
  // Latest cell for the family returned by getFamily() and the given qualifier.
  Cell cell = r.getColumnLatestCell(getFamily(), qualifier);
  if (cell == null) return null;
  // Parse the cell's value bytes into an HRegionInfo; the helper's name suggests it yields
  // null on failure.
  return HRegionInfo.parseFromOrNull(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
只有两步骤:
/** * Calculate pause time. * Built on {@link HConstants#RETRY_BACKOFF}. * @param pause * @param tries * @return How long to wait after <code>tries</code> retries */ public static long getPauseTime(final long pause, final int tries) { int ntries = tries; // 如果重试次数大于RETRY_BACKOFF的长度(即大于13),重试次数设置为12,目的是能取数组中的数值 if (ntries >= HConstants.RETRY_BACKOFF.length) { // 重试次数设置为12 ntries = HConstants.RETRY_BACKOFF.length - 1; } // 基准的停顿时间normalPause为pause * 重试次数对应在RETRY_BACKOFF数组中的位置的值; // RETRY_BACKOFF为{1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200} // 说明,越往后,停顿时间越长 long normalPause = pause * HConstants.RETRY_BACKOFF[ntries]; // 波动时间jitter为normalPause百分之一基础上乘以一个随机数的波动 long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter // 返回基准时间加波动时间 return normalPause + jitter; }很简单,基本上是越往后,休眠的时间越长,而pause是取参数hbase.client.pause,参数未配置的话,默认为100。
/**
 * Locates the meta region, consulting the cache first when allowed and otherwise asking
 * the registry while holding metaRegionLock.
 *
 * @param tableName table name used as the cache key's table
 * @param useCache whether a cached location may be returned
 * @param replicaId NOTE(review): not referenced anywhere in this method body
 * @return the cached or freshly looked-up locations; null when the registry returns null
 * @throws IOException declared by the signature; presumably from the registry lookup
 */
private RegionLocations locateMeta(final TableName tableName, boolean useCache, int replicaId) throws IOException {
  // HBASE-10785: We cache the location of the META itself, so that we are not overloading
  // zookeeper with one request for every region lookup. We cache the META with empty row
  // key in MetaCache.
  byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
  RegionLocations locations = null;
  // Fast path: return a cached location without taking the lock.
  if (useCache) {
    locations = getCachedLocation(tableName, metaCacheKey);
    if (locations != null) {
      return locations;
    }
  }
  // only one thread should do the lookup.
  synchronized (metaRegionLock) {
    // Check the cache again for a hit in case some other thread made the
    // same query while we were waiting on the lock.
    if (useCache) {
      locations = getCachedLocation(tableName, metaCacheKey);
      if (locations != null) {
        return locations;
      }
    }
    // Look up from zookeeper
    locations = this.registry.getMetaRegionLocation();
    if (locations != null) {
      // Cache the result so later callers can be served from the cache.
      cacheLocation(tableName, locations);
    }
  }
  return locations;
}
Meta表中Region的定位与非Meta表有很大不同,具体流程如下:
1、获得meta缓存的key,实际上为byte [0];
3.1、再次检查缓存,因为在当前线程等待对象metaRegionLock上的互斥锁期间,其它线程可能已经做过相同的查询,并将对应数据加载入缓存;
this.registry = setupRegistry();我们再看下这个setupRegistry()方法,代码如下:
/**
 * Obtains the registry for this connection from RegistryFactory.
 * @return The cluster registry implementation to use.
 * @throws IOException propagated from RegistryFactory.getRegistry()
 */
private Registry setupRegistry() throws IOException {
  return RegistryFactory.getRegistry(this);
}
它调用的是RegistryFactory工厂类的静态方法getRegistry()来获得Registry实例的,我们继续往下看:
/**
 * Instantiates and initializes the registry implementation named in the configuration.
 * @param connection connection whose configuration is read and which is passed to init()
 * @return The cluster registry implementation to use.
 * @throws IOException wrapping any Throwable raised while loading or instantiating the class
 */
static Registry getRegistry(final Connection connection) throws IOException {
  // Class name comes from hbase.client.registry.impl, defaulting to ZooKeeperRegistry.
  String registryClass = connection.getConfiguration().get("hbase.client.registry.impl", ZooKeeperRegistry.class.getName());
  Registry registry = null;
  try {
    // Instantiate the class reflectively through its no-argument constructor.
    registry = (Registry)Class.forName(registryClass).newInstance();
  } catch (Throwable t) {
    throw new IOException(t);
  }
  // Initialize the new registry with the connection before handing it out.
  registry.init(connection);
  return registry;
}
比较简单,首先获取类名registryClass,取参数hbase.client.registry.impl,参数未配置的话默认为ZooKeeperRegistry,接着通过反射获得registryClass的实例registry,然后调用init()方法初始化registry,最后返回registry。而返回前的初始化操作也比较简单,如下:
/**
 * Binds this registry to the given connection.
 *
 * @param connection must be a ConnectionManager.HConnectionImplementation
 * @throws RuntimeException when the connection is of any other type (including null)
 */
@Override
public void init(Connection connection) {
  if (connection instanceof ConnectionManager.HConnectionImplementation) {
    // Supported connection type: keep a typed reference to it.
    this.hci = (ConnectionManager.HConnectionImplementation) connection;
    return;
  }
  throw new RuntimeException("This registry depends on HConnectionImplementation");
}
先做connection的判断,看它是否是ConnectionManager.HConnectionImplementation实例,然后将其转化为ConnectionManager.HConnectionImplementation,并赋值给ZooKeeperRegistry的成员变量hci。
/**
 * Looks up the server hosting the meta region through ZooKeeper and wraps it in a
 * single-element RegionLocations.
 *
 * @return the meta region's locations, or null when no server name was obtained or the
 *         wait was interrupted
 * @throws IOException declared by the signature
 */
@Override
public RegionLocations getMetaRegionLocation() throws IOException {
  // Borrow the connection's keep-alive ZooKeeper watcher; it is closed in the finally block.
  ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
  try {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
    }
    // Ask MetaTableLocator for the server name, passing hci.rpcTimeout as the timeout.
    ServerName servername = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Looked up meta region location, connection=" + this + "; serverName=" + ((servername == null) ? "null" : servername));
    }
    if (servername == null) return null;
    // Build the location from HRegionInfo.FIRST_META_REGIONINFO, the server name and 0
    // (the seqNum, per the original annotation).
    HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
    // The returned RegionLocations holds just this one location.
    return new RegionLocations(new HRegionLocation[] {loc});
  } catch (InterruptedException e) {
    // Restore the interrupt flag and report "not found".
    Thread.currentThread().interrupt();
    return null;
  } finally {
    zkw.close();
  }
}
getMetaRegionLocation()方法处理流程如下:
sn = getMetaRegionLocation(zkw); if (sn != null || sw.elapsedMillis() > timeout - HConstants.SOCKET_RETRY_WAIT_MS) { break; }而getMetaRegionLocation()方法如下:
/**
 * Gets the meta region location, if available. Does not block.
 *
 * @param zkw zookeeper connection to use
 * @return server name or null if we failed to get the data.
 */
@Nullable
public ServerName getMetaRegionLocation(final ZooKeeperWatcher zkw) {
  try {
    RegionState metaState = getMetaRegionState(zkw);
    // Only a region in the opened state yields a server name.
    if (metaState.isOpened()) {
      return metaState.getServerName();
    }
    return null;
  } catch (KeeperException ke) {
    // A ZooKeeper failure is reported as "no location".
    return null;
  }
}
而getMetaRegionState()方法关键代码如下:
byte[] data = ZKUtil.getData(zkw, zkw.metaServerZNode);它利用ZKUtil获取ZooKeeper上的metaServerZNode,而metaServerZNode的初始化如下:
metaServerZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.metaserver", "meta-region-server"));baseZNode取参数zookeeper.znode.parent,参数未配置则默认为/hbase,然后再取参数zookeeper.znode.metaserver,参数未配置则默认为meta-region-server。也就是说,默认情况下,metaserver在ZooKeeper上的位置为/hbase/meta-region-server。