核心模块的代码实现
package com.imooc.bigdata.hos.server;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import com.google.common.base.Strings;
import com.imooc.bigdata.hos.common.HosObject;
import com.imooc.bigdata.hos.common.HosObjectSummary;
import com.imooc.bigdata.hos.common.ObjectListResult;
import com.imooc.bigdata.hos.common.ObjectMetaData;
import com.imooc.bigdata.hos.common.util.JsonUtil;

/**
 * Object-store service backed by HBase and HDFS.
 *
 * <p>Layout (as visible in this class):
 * <ul>
 *   <li>A per-bucket "dir" table keyed by directory path ("a/b/"): one meta CF holding a
 *       per-directory sequence id, and one CF whose qualifiers are the names of sub-directories.
 *   <li>A per-bucket "obj" table keyed by {@code <dirSeqId>_<fileName>}: a meta CF (length,
 *       media type, JSON properties) and a content CF for payloads no larger than
 *       {@code HosUtil.FILE_STORE_THRESHOLD}. Larger payloads go to HDFS under
 *       {@code FILE_STORE_ROOT/<bucket>/<dirSeqId>/<fileName>} via {@link IHdfsService}.
 *   <li>A global sequence table ({@code BUCKET_DIR_SEQ_TABLE}) with one counter row per bucket,
 *       incremented to mint new directory sequence ids.
 * </ul>
 *
 * <p>Mutating operations on a single key (put / delete / directory creation) are serialized
 * across processes with a Curator {@link InterProcessMutex} at
 * {@code /mos/<bucket>/<key-with-slashes-as-underscores>}.
 *
 * Created by jixin on 18-3-13.
 */
public class HosStoreServiceImpl implements IHosStoreService {

  private static Logger logger = Logger.getLogger(HosStoreServiceImpl.class);
  private Connection connection = null;
  private IHdfsService fileStore;
  private String zkUrls;
  private CuratorFramework zkClient;

  /**
   * @param connection shared HBase connection (owned by the caller, not closed here)
   * @param fileStore  HDFS-backed store for large object payloads
   * @param zkurls     ZooKeeper connect string used for distributed locking
   */
  public HosStoreServiceImpl(Connection connection, IHdfsService fileStore, String zkurls)
      throws IOException {
    this.connection = connection;
    this.fileStore = fileStore;
    this.zkUrls = zkurls;
    zkClient = CuratorFrameworkFactory.newClient(zkUrls, new ExponentialBackoffRetry(20, 5));
    zkClient.start();
  }

  /**
   * Creates the global bucket/directory sequence table if it does not already exist.
   */
  @Override
  public void createSeqTable() throws IOException {
    // FIX: the Admin handle was previously leaked (never closed, including on early return).
    try (Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf(HosUtil.BUCKET_DIR_SEQ_TABLE);
      if (admin.tableExists(tableName)) {
        return;
      }
    }
    HBaseService.createTable(connection, HosUtil.BUCKET_DIR_SEQ_TABLE,
        new String[]{HosUtil.BUCKET_DIR_SEQ_CF});
  }

  /**
   * Stores an object. A key ending in "/" creates a directory; otherwise the parent directory
   * is created on demand, a per-key ZooKeeper lock is taken, metadata is written to the object
   * table, and the content goes either inline into HBase (small) or to HDFS (large).
   *
   * @param bucket     bucket name
   * @param key        full object key, e.g. "a/b/file.txt" or "a/b/" for a directory
   * @param input      object content; only consumed when the key is not a directory
   * @param length     content length in bytes, used to pick the HBase-vs-HDFS storage path
   * @param mediaType  optional MIME type; skipped when null/empty
   * @param properties optional user properties, stored as JSON; skipped when null
   */
  @Override
  public void put(String bucket, String key, ByteBuffer input, long length, String mediaType,
      Map<String, String> properties) throws Exception {
    InterProcessMutex lock = null;
    try {
      if (key.endsWith("/")) {
        // Directory object: only the dir table is touched.
        putDir(bucket, key);
        return;
      }
      // Ensure the parent directory exists and obtain its sequence id. putDir() returns null
      // when it loses the existence race, so loop until one of the two branches yields an id.
      String dir = key.substring(0, key.lastIndexOf("/") + 1);
      String hash = null;
      while (hash == null) {
        if (!dirExist(bucket, dir)) {
          hash = putDir(bucket, dir);
        } else {
          hash = this.getDirSeqId(bucket, dir);
        }
      }
      // Serialize concurrent writers of the same key across processes.
      String lockey = key.replaceAll("/", "_");
      lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
      lock.acquire();
      // Object row key: <dirSeqId>_<fileName>.
      String fileKey = hash + "_" + key.substring(key.lastIndexOf("/") + 1);
      Put contentPut = new Put(fileKey.getBytes());
      if (!Strings.isNullOrEmpty(mediaType)) {
        contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
            HosUtil.OBJ_MEDIATYPE_QUALIFIER, mediaType.getBytes());
      }
      if (properties != null) {
        String props = JsonUtil.toJson(properties);
        contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
            HosUtil.OBJ_PROPS_QUALIFIER, props.getBytes());
      }
      contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
          HosUtil.OBJ_LEN_QUALIFIER, Bytes.toBytes(length));

      if (length <= HosUtil.FILE_STORE_THRESHOLD) {
        // Small object: content is stored inline in the content CF of the same row.
        ByteBuffer qualifierBuffer = ByteBuffer.wrap(HosUtil.OBJ_CONT_QUALIFIER);
        contentPut.addColumn(HosUtil.OBJ_CONT_CF_BYTES,
            qualifierBuffer, System.currentTimeMillis(), input);
        qualifierBuffer.clear();
      }
      HBaseService.putRow(connection, HosUtil.getObjTableName(bucket), contentPut);
      if (length > HosUtil.FILE_STORE_THRESHOLD) {
        // Large object: metadata row is already written; content goes to HDFS.
        String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + hash;
        String name = key.substring(key.lastIndexOf("/") + 1);
        InputStream inputStream = new ByteBufferInputStream(input);
        this.fileStore.saveFile(fileDir, name, inputStream, length, getBucketReplication(bucket));
      }
    } finally {
      if (lock != null) {
        lock.release();
      }
    }
  }

  /**
   * Returns the summary (metadata only) of one object or directory, or null if absent.
   */
  @Override
  public HosObjectSummary getSummary(String bucket, String key) throws IOException {
    if (key.endsWith("/")) {
      Result result = HBaseService
          .getRow(connection, HosUtil.getDirTableName(bucket), key);
      if (!result.isEmpty()) {
        return this.dirObjectToSummary(result, bucket, key);
      } else {
        return null;
      }
    }
    String dir = key.substring(0, key.lastIndexOf("/") + 1);
    String seq = this.getDirSeqId(bucket, dir);
    if (seq == null) {
      // Parent directory does not exist, so the object cannot exist either.
      return null;
    }
    // Object row key: <dirSeqId>_<fileName>.
    String objKey = seq + "_" + key.substring(key.lastIndexOf("/") + 1);
    Result result = HBaseService
        .getRow(connection, HosUtil.getObjTableName(bucket), objKey);
    if (result.isEmpty()) {
      return null;
    }
    return this.resultToObjectSummary(result, bucket, dir);
  }

  /**
   * Lists all objects and directories with keys in [startKey, endKey): first the files in
   * startKey's own directory that sort after startKey, then every directory row between the
   * two keys together with each directory's files (recursively bounded by endKey).
   * The combined result is returned sorted.
   */
  @Override
  public List<HosObjectSummary> list(String bucket, String startKey, String endKey)
      throws IOException {

    String dir1 = startKey.substring(0, startKey.lastIndexOf("/") + 1).trim();
    if (dir1.length() == 0) {
      dir1 = "/";
    }
    // FIX: dir2/name2 were previously computed with startKey.lastIndexOf("/"), which yields
    // wrong substrings whenever the two keys' directory parts differ in length.
    String dir2 = endKey.substring(0, endKey.lastIndexOf("/") + 1).trim();
    if (dir2.length() == 0) {
      dir2 = "/";
    }
    String name1 = startKey.substring(startKey.lastIndexOf("/") + 1);
    String name2 = endKey.substring(endKey.lastIndexOf("/") + 1);
    String seqId = this.getDirSeqId(bucket, dir1);
    // Files in dir1 whose name is >= name1.
    List<HosObjectSummary> keys = new ArrayList<>();
    if (seqId != null && name1.length() > 0) {
      // Default stop row: past every possible key with this directory's prefix.
      byte[] max = Bytes.createMaxByteArray(100);
      byte[] tail = Bytes.add(Bytes.toBytes(seqId), max);
      if (dir1.equals(dir2)) {
        // Same directory: stop exactly at endKey's file name.
        tail = (seqId + "_" + name2).getBytes();
      }
      byte[] start = (seqId + "_" + name1).getBytes();
      ResultScanner scanner1 = HBaseService
          .scanner(connection, HosUtil.getObjTableName(bucket), start, tail);
      try {
        Result result = null;
        while ((result = scanner1.next()) != null) {
          HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir1);
          keys.add(summary);
        }
      } finally {
        scanner1.close();
      }
    }
    // Every directory row between startKey and endKey, plus its files.
    ResultScanner scanner2 = HBaseService
        .scanner(connection, HosUtil.getDirTableName(bucket), startKey, endKey);
    try {
      Result result = null;
      while ((result = scanner2.next()) != null) {
        String seqId2 = Bytes.toString(result.getValue(HosUtil.DIR_META_CF_BYTES,
            HosUtil.DIR_SEQID_QUALIFIER));
        if (seqId2 == null) {
          continue;
        }
        String dir = Bytes.toString(result.getRow());
        keys.add(dirObjectToSummary(result, bucket, dir));
        getDirAllFiles(bucket, dir, seqId2, keys, endKey);
      }
    } finally {
      scanner2.close();
    }
    Collections.sort(keys);
    return keys;
  }

  /**
   * Paged listing of one directory: its sub-directories (from the dir table row) and its files
   * (scan over the object table by sequence-id prefix). Fetches up to {@code maxCount + 2}
   * entries internally so that a next-page marker can be produced, then trims to
   * {@code maxCount} for the caller.
   *
   * @param start first name to include (exclusive lower bound via GREATER_OR_EQUAL on the
   *              qualifier filter); null is treated as "from the beginning"
   */
  @Override
  public ObjectListResult listDir(String bucket, String dir, String start, int maxCount)
      throws IOException {
    if (start == null) {
      start = "";
    }
    // Sub-directories: qualifiers of the sub-dir CF on the directory's own row.
    Get get = new Get(Bytes.toBytes(dir));
    get.addFamily(HosUtil.DIR_SUBDIR_CF_BYTES);
    if (start.length() > 0) {
      get.setFilter(new QualifierFilter(CompareOp.GREATER_OR_EQUAL,
          new BinaryComparator(Bytes.toBytes(start))));
    }
    int maxCount1 = maxCount + 2;
    Result dirResult = HBaseService
        .getRow(connection, HosUtil.getDirTableName(bucket), get);
    List<HosObjectSummary> subDirs = null;
    if (!dirResult.isEmpty()) {
      subDirs = new ArrayList<>();
      for (Cell cell : dirResult.rawCells()) {
        HosObjectSummary summary = new HosObjectSummary();
        byte[] qualifierBytes = new byte[cell.getQualifierLength()];
        CellUtil.copyQualifierTo(cell, qualifierBytes, 0);
        String name = Bytes.toString(qualifierBytes);
        summary.setKey(dir + name + "/");
        summary.setName(name);
        summary.setLastModifyTime(cell.getTimestamp());
        summary.setMediaType("");
        summary.setBucket(bucket);
        summary.setLength(0);
        subDirs.add(summary);
        if (subDirs.size() >= maxCount1) {
          break;
        }
      }
    }

    // Files: scan the object table for rows prefixed with this directory's sequence id.
    List<HosObjectSummary> objectSummaryList = new ArrayList<>();
    String dirSeq = this.getDirSeqId(bucket, dir);
    // FIX: previously a null dirSeq produced a bogus "null_" prefix scan; skip instead.
    if (dirSeq != null) {
      byte[] objStart = Bytes.toBytes(dirSeq + "_" + start);
      Scan objScan = new Scan();
      objScan.setRowPrefixFilter(Bytes.toBytes(dirSeq + "_"));
      // NOTE(review): HBase has no native paging; PageFilter + client-side startRow is the
      // simple scheme (see "HBase: The Definitive Guide"). It is O(pages) but has no extra
      // bookkeeping table; an auxiliary row-number table would be faster but costlier to keep.
      objScan.setFilter(new PageFilter(maxCount + 1));
      objScan.setStartRow(objStart);
      objScan.setMaxResultsPerColumnFamily(maxCount1);
      objScan.addFamily(HosUtil.OBJ_META_CF_BYTES);
      logger.info("scan start: " + Bytes.toString(objStart) + " - ");
      ResultScanner objScanner = HBaseService
          .scanner(connection, HosUtil.getObjTableName(bucket), objScan);
      try {
        Result result = null;
        while (objectSummaryList.size() < maxCount1 && (result = objScanner.next()) != null) {
          HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
          objectSummaryList.add(summary);
        }
      } finally {
        objScanner.close();
      }
      logger.info("scan complete: " + Bytes.toString(objStart) + " - ");
    }

    // Merge, sort, and trim to maxCount, keeping the overflow entry as the next-page marker.
    if (subDirs != null && subDirs.size() > 0) {
      objectSummaryList.addAll(subDirs);
    }
    Collections.sort(objectSummaryList);
    ObjectListResult listResult = new ObjectListResult();
    HosObjectSummary nextMarkerObj =
        objectSummaryList.size() > maxCount ? objectSummaryList.get(objectSummaryList.size() - 1)
            : null;
    if (nextMarkerObj != null) {
      listResult.setNextMarker(nextMarkerObj.getKey());
    }
    if (objectSummaryList.size() > maxCount) {
      objectSummaryList = objectSummaryList.subList(0, maxCount);
    }
    listResult.setMaxKeyNumber(maxCount);
    if (objectSummaryList.size() > 0) {
      listResult.setMinKey(objectSummaryList.get(0).getKey());
      listResult.setMaxKey(objectSummaryList.get(objectSummaryList.size() - 1).getKey());
    }
    listResult.setObjectCount(objectSummaryList.size());
    listResult.setObjectList(objectSummaryList);
    listResult.setBucket(bucket);

    return listResult;
  }

  /**
   * Like {@link #listDir} but restricted to entries whose name starts with {@code keyPrefix}:
   * sub-directories via a ColumnPrefixFilter on the dir row, files via a row-prefix scan on
   * {@code <dirSeqId>_<keyPrefix>}.
   */
  @Override
  public ObjectListResult listByPrefix(String bucket, String dir, String keyPrefix, String start,
      int maxCount) throws IOException {
    if (start == null) {
      start = "";
    }
    // Sub-directories whose qualifier both matches the prefix and is >= start.
    FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
    filterList.addFilter(new ColumnPrefixFilter(keyPrefix.getBytes()));
    if (start.length() > 0) {
      filterList.addFilter(new QualifierFilter(CompareOp.GREATER_OR_EQUAL,
          new BinaryComparator(Bytes.toBytes(start))));
    }
    int maxCount1 = maxCount + 2;
    Result dirResult = HBaseService
        .getRow(connection, HosUtil.getDirTableName(bucket), dir, filterList);
    List<HosObjectSummary> subDirs = null;
    if (!dirResult.isEmpty()) {
      subDirs = new ArrayList<>();
      for (Cell cell : dirResult.rawCells()) {
        HosObjectSummary summary = new HosObjectSummary();
        byte[] qualifierBytes = new byte[cell.getQualifierLength()];
        CellUtil.copyQualifierTo(cell, qualifierBytes, 0);
        String name = Bytes.toString(qualifierBytes);
        summary.setKey(dir + name + "/");
        summary.setName(name);
        summary.setLastModifyTime(cell.getTimestamp());
        summary.setMediaType("");
        summary.setBucket(bucket);
        summary.setLength(0);
        subDirs.add(summary);
        if (subDirs.size() >= maxCount1) {
          break;
        }
      }
    }

    // Files matching the prefix.
    List<HosObjectSummary> objectSummaryList = new ArrayList<>();
    String dirSeq = this.getDirSeqId(bucket, dir);
    // FIX: previously a null dirSeq produced a bogus "null_" prefix scan; skip instead.
    if (dirSeq != null) {
      byte[] objStart = Bytes.toBytes(dirSeq + "_" + start);
      Scan objScan = new Scan();
      objScan.setRowPrefixFilter(Bytes.toBytes(dirSeq + "_" + keyPrefix));
      objScan.setFilter(new PageFilter(maxCount + 1));
      objScan.setStartRow(objStart);
      objScan.setMaxResultsPerColumnFamily(maxCount1);
      objScan.addFamily(HosUtil.OBJ_META_CF_BYTES);
      logger.info("scan start: " + Bytes.toString(objStart) + " - ");
      ResultScanner objScanner = HBaseService
          .scanner(connection, HosUtil.getObjTableName(bucket), objScan);
      try {
        Result result = null;
        while (objectSummaryList.size() < maxCount1 && (result = objScanner.next()) != null) {
          HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
          objectSummaryList.add(summary);
        }
      } finally {
        objScanner.close();
      }
      logger.info("scan complete: " + Bytes.toString(objStart) + " - ");
    }
    if (subDirs != null && subDirs.size() > 0) {
      objectSummaryList.addAll(subDirs);
    }
    Collections.sort(objectSummaryList);
    ObjectListResult listResult = new ObjectListResult();
    HosObjectSummary nextMarkerObj =
        objectSummaryList.size() > maxCount ? objectSummaryList.get(objectSummaryList.size() - 1)
            : null;
    if (nextMarkerObj != null) {
      listResult.setNextMarker(nextMarkerObj.getKey());
    }
    if (objectSummaryList.size() > maxCount) {
      objectSummaryList = objectSummaryList.subList(0, maxCount);
    }
    listResult.setMaxKeyNumber(maxCount);
    if (objectSummaryList.size() > 0) {
      listResult.setMinKey(objectSummaryList.get(0).getKey());
      listResult.setMaxKey(objectSummaryList.get(objectSummaryList.size() - 1).getKey());
    }
    listResult.setObjectCount(objectSummaryList.size());
    listResult.setObjectList(objectSummaryList);
    listResult.setBucket(bucket);

    return listResult;
  }

  /**
   * Returns metadata plus a content stream for one object (or metadata only for a directory),
   * or null if absent. Small objects stream from the inline HBase cell; large ones from HDFS.
   */
  @Override
  public HosObject getObject(String bucket, String key) throws IOException {
    if (key.endsWith("/")) {
      Result result = HBaseService
          .getRow(connection, HosUtil.getDirTableName(bucket), key);
      if (result.isEmpty()) {
        return null;
      }
      ObjectMetaData metaData = new ObjectMetaData();
      metaData.setBucket(bucket);
      metaData.setKey(key);
      metaData.setLastModifyTime(result.rawCells()[0].getTimestamp());
      metaData.setLength(0);
      HosObject object = new HosObject();
      object.setMetaData(metaData);
      return object;
    }
    String dir = key.substring(0, key.lastIndexOf("/") + 1);
    String name = key.substring(key.lastIndexOf("/") + 1);
    String seq = this.getDirSeqId(bucket, dir);
    String objKey = seq + "_" + name;
    Result result = HBaseService
        .getRow(connection, HosUtil.getObjTableName(bucket), objKey);
    if (result.isEmpty()) {
      return null;
    }
    HosObject object = new HosObject();
    if (result.containsNonEmptyColumn(HosUtil.OBJ_CONT_CF_BYTES,
        HosUtil.OBJ_CONT_QUALIFIER)) {
      // Small object: content lives inline in the content CF.
      ByteArrayInputStream bas = new ByteArrayInputStream(
          result
              .getValue(HosUtil.OBJ_CONT_CF_BYTES,
                  HosUtil.OBJ_CONT_QUALIFIER));
      object.setContent(bas);
    } else {
      // Large object: content lives in HDFS under the directory's sequence id.
      String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + seq;
      InputStream inputStream = this.fileStore.openFile(fileDir, name);
      object.setContent(inputStream);
    }
    long len = Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
        HosUtil.OBJ_LEN_QUALIFIER));
    ObjectMetaData metaData = new ObjectMetaData();
    metaData.setBucket(bucket);
    metaData.setKey(key);
    metaData.setLastModifyTime(result.rawCells()[0].getTimestamp());
    metaData.setLength(len);
    metaData.setMediaType(Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
        HosUtil.OBJ_MEDIATYPE_QUALIFIER)));
    byte[] b = result
        .getValue(HosUtil.OBJ_META_CF_BYTES, HosUtil.OBJ_PROPS_QUALIFIER);
    if (b != null) {
      metaData.setAttrs(JsonUtil.fromJson(Map.class, Bytes.toString(b)));
    }
    object.setMetaData(metaData);
    return object;
  }

  /**
   * Deletes an object or an empty directory. Directory deletion is guarded twice by
   * {@link #isDirEmpty} (check, lock, re-check) under the per-key ZooKeeper lock, and also
   * removes the entry from the parent directory's sub-dir CF. File deletion removes the HDFS
   * payload first when the stored length exceeds the inline threshold.
   *
   * @throws RuntimeException if the directory is not empty
   */
  @Override
  public void deleteObject(String bucket, String key) throws Exception {
    if (key.endsWith("/")) {
      // Cheap pre-check before taking the lock.
      if (!isDirEmpty(bucket, key)) {
        throw new RuntimeException("dir is not empty");
      }
      InterProcessMutex lock = null;
      try {
        String lockey = key.replaceAll("/", "_");
        lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
        lock.acquire();
        // Re-check under the lock: another writer may have added entries meanwhile.
        if (!isDirEmpty(bucket, key)) {
          throw new RuntimeException("dir is not empty");
        }
        String dir1 = key.substring(0, key.lastIndexOf("/"));
        String name = dir1.substring(dir1.lastIndexOf("/") + 1);
        if (name.length() > 0) {
          // Remove this directory's qualifier from its parent's sub-dir CF.
          String parent = key.substring(0, key.lastIndexOf(name));
          HBaseService
              .deleteQualifier(connection, HosUtil.getDirTableName(bucket), parent,
                  HosUtil.DIR_SUBDIR_CF, name);
        }
        HBaseService.delete(connection, HosUtil.getDirTableName(bucket), key);
        return;
      } finally {
        if (lock != null) {
          lock.release();
        }
      }
    }
    String dir = key.substring(0, key.lastIndexOf("/") + 1);
    String name = key.substring(key.lastIndexOf("/") + 1);
    String seqId = this.getDirSeqId(bucket, dir);
    String objKey = seqId + "_" + name;
    Result result = HBaseService
        .getRow(connection, HosUtil.getObjTableName(bucket), objKey,
            HosUtil.OBJ_META_CF_BYTES, HosUtil.OBJ_LEN_QUALIFIER);
    if (result.isEmpty()) {
      return;
    }
    long len = Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
        HosUtil.OBJ_LEN_QUALIFIER));
    if (len > HosUtil.FILE_STORE_THRESHOLD) {
      // Content was stored in HDFS; remove it before dropping the metadata row.
      String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + seqId;
      this.fileStore.deleteFile(fileDir, name);
    }
    HBaseService.delete(connection, HosUtil.getObjTableName(bucket), objKey);
  }

  // A directory is empty when a 2-entry listing returns nothing (neither files nor sub-dirs).
  private boolean isDirEmpty(String bucket, String dir) throws IOException {
    return listDir(bucket, dir, null, 2).getObjectList().size() == 0;
  }

  /**
   * Drops all storage for a bucket: both HBase tables, the bucket's sequence-counter row,
   * and its HDFS directory tree.
   */
  @Override
  public void deleteBucketStore(String bucket) throws IOException {
    HBaseService.deleteTable(connection, HosUtil.getDirTableName(bucket));
    HBaseService.deleteTable(connection, HosUtil.getObjTableName(bucket));

    HBaseService.delete(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, bucket);
    this.fileStore.deleteDir(HosUtil.FILE_STORE_ROOT + "/" + bucket);
  }

  /**
   * Provisions storage for a new bucket: the dir table, the pre-split obj table, a zeroed
   * sequence-counter row, and the bucket's HDFS root directory.
   */
  @Override
  public void createBucketStore(String bucket) throws IOException {
    HBaseService.createTable(connection, HosUtil.getDirTableName(bucket),
        HosUtil.getDirColumnFamily());
    HBaseService.createTable(connection, HosUtil.getObjTableName(bucket),
        HosUtil.getObjColumnFamily(), HosUtil.OBJ_REGIONS);

    Put put = new Put(Bytes.toBytes(bucket));
    put.addColumn(HosUtil.BUCKET_DIR_SEQ_CF_BYTES,
        HosUtil.BUCKET_DIR_SEQ_QUALIFIER, Bytes.toBytes(0L));
    HBaseService.putRow(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, put);
    this.fileStore.mikDir(HosUtil.FILE_STORE_ROOT + "/" + bucket);
  }

  /**
   * Looks up the sequence id stored on a directory's row; the id is uniquely determined by
   * (bucket, dir). Returns null when the directory row does not exist.
   */
  private String getDirSeqId(String bucket, String dir) throws IOException {
    Result result = HBaseService.getRow(connection, HosUtil.getDirTableName(bucket), dir);
    if (result.isEmpty()) {
      return null;
    }
    String dirSeqId = Bytes.toString(result.getValue(HosUtil.DIR_META_CF_BYTES,
        HosUtil.DIR_SEQID_QUALIFIER));
    return dirSeqId;
  }

  /**
   * Appends every file of {@code dir} (rows prefixed by its seqId) to {@code keys},
   * stopping at {@code endKey}'s file name when endKey falls inside this directory.
   */
  private void getDirAllFiles(String bucket, String dir, String seqId, List<HosObjectSummary> keys,
      String endKey) throws IOException {

    byte[] max = Bytes.createMaxByteArray(100);
    byte[] tail = Bytes.add(Bytes.toBytes(seqId), max);
    if (endKey.startsWith(dir)) {
      // endKey is inside this directory: cap the scan at its first path component.
      String endKeyLeft = endKey.replace(dir, "");
      String fileNameMax = endKeyLeft;
      if (endKeyLeft.indexOf("/") > 0) {
        fileNameMax = endKeyLeft.substring(0, endKeyLeft.indexOf("/"));
      }
      tail = Bytes.toBytes(seqId + "_" + fileNameMax);
    }

    Scan scan = new Scan(Bytes.toBytes(seqId), tail);
    scan.setFilter(HosUtil.OBJ_META_SCAN_FILTER);
    ResultScanner scanner = HBaseService
        .scanner(connection, HosUtil.getObjTableName(bucket), scan);
    try {
      Result result = null;
      while ((result = scanner.next()) != null) {
        HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
        keys.add(summary);
      }
    } finally {
      scanner.close();
    }
  }

  /**
   * Converts an object-table row (meta CF) into a summary. The row key is
   * {@code <seqId>_<name>}; the name may itself contain '_', hence the 2-way split.
   */
  private HosObjectSummary resultToObjectSummary(Result result, String bucket, String dir)
      throws IOException {
    HosObjectSummary summary = new HosObjectSummary();
    long timestamp = result.rawCells()[0].getTimestamp();
    summary.setLastModifyTime(timestamp);
    // FIX: use HBase's UTF-8 decoding instead of the platform-default charset.
    String id = Bytes.toString(result.getRow());
    summary.setId(id);
    String name = id.split("_", 2)[1];
    String key = dir + name;
    summary.setKey(key);
    summary.setName(name);
    summary.setBucket(bucket);
    String s = Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
        HosUtil.OBJ_PROPS_QUALIFIER));
    if (s != null) {
      summary.setAttrs(JsonUtil.fromJson(Map.class, s));
    }
    summary.setLength(Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
        HosUtil.OBJ_LEN_QUALIFIER)));
    summary
        .setMediaType(Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
            HosUtil.OBJ_MEDIATYPE_QUALIFIER)));

    return summary;
  }

  /**
   * Converts a dir-table row into a summary: zero length, empty media type, name taken
   * from the last path component ("" for the root "/").
   */
  private HosObjectSummary dirObjectToSummary(Result result, String bucket, String dir) {
    HosObjectSummary summary = new HosObjectSummary();
    String id = Bytes.toString(result.getRow());
    summary.setId(id);
    summary.setAttrs(new HashMap<>(0));
    if (dir.length() > 1) {
      summary.setName(dir.substring(dir.lastIndexOf("/") + 1));
    } else {
      summary.setName("");
    }
    summary.setBucket(bucket);
    summary.setKey(dir);
    summary.setLastModifyTime(result.rawCells()[0].getTimestamp());
    summary.setLength(0);
    summary.setMediaType("");
    return summary;
  }

  // HDFS replication factor for a bucket's large files. NOTE(review): currently a constant;
  // presumably meant to become per-bucket configuration — confirm before relying on it.
  private short getBucketReplication(String bucket) {
    return 2;
  }

  /**
   * Creates a directory row (and, recursively, its parents) under the per-directory lock,
   * registers it in the parent's sub-dir CF, and assigns it a sequence id.
   *
   * @return the directory's sequence id, or null when the directory already existed at entry
   *         (callers loop and fall back to {@link #getDirSeqId})
   */
  private String putDir(String bucket, String dir) throws Exception {
    if (dirExist(bucket, dir)) {
      return null;
    }
    InterProcessMutex lock = null;
    try {
      String lockey = dir.replaceAll("/", "_");
      lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
      lock.acquire();
      String dir1 = dir.substring(0, dir.lastIndexOf("/"));
      String name = dir1.substring(dir1.lastIndexOf("/") + 1);
      if (name.length() > 0) {
        // Ensure the parent chain exists, then register this dir as a qualifier on the parent.
        String parent = dir.substring(0, dir1.lastIndexOf("/") + 1);
        if (!this.dirExist(bucket, parent)) {
          this.putDir(bucket, parent);
        }
        Put put = new Put(Bytes.toBytes(parent));
        // NOTE(review): '1' is a char, so this stores the 4-byte int 49 as a presence flag;
        // the value itself is never read back.
        put.addColumn(HosUtil.DIR_SUBDIR_CF_BYTES, Bytes.toBytes(name),
            Bytes.toBytes('1'));
        HBaseService.putRow(connection, HosUtil.getDirTableName(bucket), put);
      }
      // Reuse an id written by a racing creator; otherwise mint a fresh one.
      String seqId = this.getDirSeqId(bucket, dir);
      String hash = seqId == null ? makeDirSeqId(bucket) : seqId;
      Put hashPut = new Put(dir.getBytes());
      hashPut.addColumn(HosUtil.DIR_META_CF_BYTES,
          HosUtil.DIR_SEQID_QUALIFIER, Bytes.toBytes(hash));
      HBaseService.putRow(connection, HosUtil.getDirTableName(bucket), hashPut);
      return hash;
    } finally {
      if (lock != null) {
        lock.release();
      }
    }
  }

  /**
   * Mints a new directory sequence id from the bucket's atomic counter. The "{@code v%64}a"
   * prefix spreads consecutive ids across row-key space to avoid region hot-spotting.
   */
  private String makeDirSeqId(String bucket) throws IOException {
    long v = HBaseService
        .incrementColumnValue(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, bucket,
            HosUtil.BUCKET_DIR_SEQ_CF_BYTES, HosUtil.BUCKET_DIR_SEQ_QUALIFIER,
            1);
    return String.format("%da%d", v % 64, v);
  }

  // A directory exists iff its row exists in the bucket's dir table.
  private boolean dirExist(String bucket, String dir) throws IOException {
    return HBaseService.existsRow(connection, HosUtil.getDirTableName(bucket), dir);
  }
}
1. 用户上传或删除文件时需要加 ZooKeeper 分布式锁(InterProcessMutex,锁路径为 /mos/{bucket}/{key})
2. 设计思想(目录表 + 对象表 + 目录序列号,小文件存 HBase、大文件存 HDFS)需要认真反复研读
3. HBase 的调优(如分页查询 PageFilter、预分区避免热点)