柏辰爸爸 2016-11-29 817浏览量
继上篇《Hadoop-2.6.0NodeManager Restart Recover实现分析(二)》。
@Override protected void initStorage(Configuration conf) throws IOException { Path storeRoot = createStorageDir(conf); Options options = new Options(); options.createIfMissing(false); options.logger(new LeveldbLogger()); LOG.info("Using state database at " + storeRoot + " for recovery"); File dbfile = new File(storeRoot.toString()); try { db = JniDBFactory.factory.open(dbfile, options); } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { LOG.info("Creating state database at " + dbfile); isNewlyCreated = true; options.createIfMissing(true); try { db = JniDBFactory.factory.open(dbfile, options); // store version storeVersion(); } catch (DBException dbErr) { throw new IOException(dbErr.getMessage(), dbErr); } } else { throw e; } } checkVersion(); }最主要的是通过JniDBFactory.factory.open(dbfile, options)打开了一个db实例。
@Override protected void closeStorage() throws IOException { if (db != null) { db.close(); } }下面,以Applications为例,罗列下实现方式:
@Override public void storeApplication(ApplicationId appId, ContainerManagerApplicationProto p) throws IOException { String key = APPLICATIONS_KEY_PREFIX + appId; try { db.put(bytes(key), p.toByteArray()); } catch (DBException e) { throw new IOException(e); } }
message ContainerManagerApplicationProto { optional ApplicationIdProto id = 1; optional string user = 2; optional bytes credentials = 3; repeated ApplicationACLMapProto acls = 4; optional LogAggregationContextProto log_aggregation_context = 5; }
@Override public void storeFinishedApplication(ApplicationId appId) throws IOException { String key = FINISHED_APPS_KEY_PREFIX + appId; try { db.put(bytes(key), new byte[0]); } catch (DBException e) { throw new IOException(e); } }其中,key为ContainerManager/finishedApps/再加appId,value为空,即new byte[0]。
@Override public void removeApplication(ApplicationId appId) throws IOException { try { WriteBatch batch = db.createWriteBatch(); try { String key = APPLICATIONS_KEY_PREFIX + appId; batch.delete(bytes(key)); key = FINISHED_APPS_KEY_PREFIX + appId; batch.delete(bytes(key)); db.write(batch); } finally { batch.close(); } } catch (DBException e) { throw new IOException(e); } }根据各自的key分别删除storeApplication()和storeFinishedApplication()存储的Application信息,
@Override public RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); state.applications = new ArrayList<ContainerManagerApplicationProto>(); String keyPrefix = APPLICATIONS_KEY_PREFIX; LeveldbIterator iter = null; try { iter = new LeveldbIterator(db); iter.seek(bytes(keyPrefix)); while (iter.hasNext()) { Entry<byte[], byte[]> entry = iter.next(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { break; } state.applications.add( ContainerManagerApplicationProto.parseFrom(entry.getValue())); } state.finishedApplications = new ArrayList<ApplicationId>(); keyPrefix = FINISHED_APPS_KEY_PREFIX; iter.seek(bytes(keyPrefix)); while (iter.hasNext()) { Entry<byte[], byte[]> entry = iter.next(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { break; } ApplicationId appId = ConverterUtils.toApplicationId(key.substring(keyPrefix.length())); state.finishedApplications.add(appId); } } catch (DBException e) { throw new IOException(e); } finally { if (iter != null) { iter.close(); } } return state; }
通过LeveldbIterator和key的前缀ContainerManager/applications/进行load。
后续会分析哪些地方调用了上述方法,未完待续!
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
时时分享云计算技术内容,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。