基于Hbase和SpringBoot的分布式HOS文件存储系统(二)

简介: 基于Hbase和SpringBoot的分布式HOS文件存储系统

核心模块的代码实现

1. package com.imooc.bigdata.hos.server;
2. 
3. import java.io.ByteArrayInputStream;
4. import java.io.IOException;
5. import java.io.InputStream;
6. import java.nio.ByteBuffer;
7. import java.util.ArrayList;
8. import java.util.Collections;
9. import java.util.HashMap;
10. import java.util.List;
11. import java.util.Map;
12. 
13. import org.apache.curator.framework.CuratorFramework;
14. import org.apache.curator.framework.CuratorFrameworkFactory;
15. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
16. import org.apache.curator.retry.ExponentialBackoffRetry;
17. import org.apache.hadoop.hbase.Cell;
18. import org.apache.hadoop.hbase.CellUtil;
19. import org.apache.hadoop.hbase.TableName;
20. import org.apache.hadoop.hbase.client.Admin;
21. import org.apache.hadoop.hbase.client.Connection;
22. import org.apache.hadoop.hbase.client.Get;
23. import org.apache.hadoop.hbase.client.Put;
24. import org.apache.hadoop.hbase.client.Result;
25. import org.apache.hadoop.hbase.client.ResultScanner;
26. import org.apache.hadoop.hbase.client.Scan;
27. import org.apache.hadoop.hbase.filter.BinaryComparator;
28. import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
29. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
30. import org.apache.hadoop.hbase.filter.FilterList;
31. import org.apache.hadoop.hbase.filter.FilterList.Operator;
32. import org.apache.hadoop.hbase.filter.PageFilter;
33. import org.apache.hadoop.hbase.filter.QualifierFilter;
34. import org.apache.hadoop.hbase.io.ByteBufferInputStream;
35. import org.apache.hadoop.hbase.util.Bytes;
36. import org.apache.log4j.Logger;
37. 
38. import com.google.common.base.Strings;
39. import com.imooc.bigdata.hos.common.HosObject;
40. import com.imooc.bigdata.hos.common.HosObjectSummary;
41. import com.imooc.bigdata.hos.common.ObjectListResult;
42. import com.imooc.bigdata.hos.common.ObjectMetaData;
43. import com.imooc.bigdata.hos.common.util.JsonUtil;
44. 
45. /**
46.  * Created by jixin on 18-3-13.
47.  */
48. public class HosStoreServiceImpl implements IHosStoreService {
49. 
50. private static Logger logger = Logger.getLogger(HosStoreServiceImpl.class);
51. private Connection connection = null;
52. private IHdfsService fileStore;
53. private String zkUrls;
54. private CuratorFramework zkClient;
55. 
56. public HosStoreServiceImpl(Connection connection, IHdfsService fileStore, String zkurls)
57. throws IOException {
58. this.connection = connection;
59. this.fileStore = fileStore;
60. this.zkUrls = zkurls;
61.     zkClient = CuratorFrameworkFactory.newClient(zkUrls, new ExponentialBackoffRetry(20, 5));
62.     zkClient.start();
63.   }
64. 
65. @Override
66. public void createSeqTable() throws IOException {
67. Admin admin = connection.getAdmin();
68. TableName tableName = TableName.valueOf(HosUtil.BUCKET_DIR_SEQ_TABLE);
69. if (admin.tableExists(tableName)) {
70. return;
71.     }
72.     HBaseService.createTable(connection, HosUtil.BUCKET_DIR_SEQ_TABLE,
73. new String[]{HosUtil.BUCKET_DIR_SEQ_CF});
74.   }
75. 
76. @Override
77. public void put(String bucket, String key, ByteBuffer input, long length, String mediaType,
78.       Map<String, String> properties) throws Exception {
79. InterProcessMutex lock = null;
80. try {
81. if (key.endsWith("/")) {
82. //put dir object
83.         putDir(bucket, key);
84. return;
85.       }
86. //put dir
87. String dir = key.substring(0, key.lastIndexOf("/") + 1);
88. String hash = null;
89. while (hash == null) {
90. if (!dirExist(bucket, dir)) {
91.           hash = putDir(bucket, dir);
92.         } else {
93.           hash = this.getDirSeqId(bucket, dir);
94.         }
95.       }
96. String lockey = key.replaceAll("/", "_");
97.       lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
98.       lock.acquire();
99. //put
100. String fileKey = hash + "_" + key.substring(key.lastIndexOf("/") + 1);
101. Put contentPut = new Put(fileKey.getBytes());
102. if (!Strings.isNullOrEmpty(mediaType)) {
103.         contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
104.             HosUtil.OBJ_MEDIATYPE_QUALIFIER, mediaType.getBytes());
105.       }
106. if (properties != null) {
107. String props = JsonUtil.toJson(properties);
108.         contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
109.             HosUtil.OBJ_PROPS_QUALIFIER, props.getBytes());
110.       }
111.       contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
112.           HosUtil.OBJ_LEN_QUALIFIER, Bytes.toBytes((long) length));
113. 
114. if (length <= HosUtil.FILE_STORE_THRESHOLD) {
115. ByteBuffer qualifierBuffer = ByteBuffer.wrap(HosUtil.OBJ_CONT_QUALIFIER);
116.         contentPut.addColumn(HosUtil.OBJ_CONT_CF_BYTES,
117.             qualifierBuffer, System.currentTimeMillis(), input);
118.         qualifierBuffer.clear();
119.       }
120.       HBaseService.putRow(connection, HosUtil.getObjTableName(bucket), contentPut);
121. if (length > HosUtil.FILE_STORE_THRESHOLD) {
122. String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + hash;
123. String name = key.substring(key.lastIndexOf("/") + 1);
124. InputStream inputStream = new ByteBufferInputStream(input);
125. this.fileStore.saveFile(fileDir, name, inputStream, length, getBucketReplication(bucket));
126.       }
127.     } finally {
128. if (lock != null) {
129.         lock.release();
130.       }
131.     }
132.   }
133. 
134. @Override
135. public HosObjectSummary getSummary(String bucket, String key) throws IOException {
136. if (key.endsWith("/")) {
137. Result result = HBaseService
138.           .getRow(connection, HosUtil.getDirTableName(bucket), key);
139. if (!result.isEmpty()) {
140. return this.dirObjectToSummary(result, bucket, key);
141.       } else {
142. return null;
143.       }
144.     }
145. String dir = key.substring(0, key.lastIndexOf("/") + 1);
146. String seq = this.getDirSeqId(bucket, dir);
147. if (seq == null) {
148. return null;
149.     }
150. //序列号_文件名
151. String objKey = seq + "_" + key.substring(key.lastIndexOf("/") + 1);
152. Result result = HBaseService
153.         .getRow(connection, HosUtil.getObjTableName(bucket), objKey);
154. if (result.isEmpty()) {
155. return null;
156.     }
157. return this.resultToObjectSummary(result, bucket, dir);
158.   }
159. 
160. @Override
161. public List<HosObjectSummary> list(String bucket, String startKey, String endKey)
162. throws IOException {
163. 
164. String dir1 = startKey.substring(0, startKey.lastIndexOf("/") + 1).trim();
165. if (dir1.length() == 0) {
166.       dir1 = "/";
167.     }
168. String dir2 = endKey.substring(0, startKey.lastIndexOf("/") + 1).trim();
169. if (dir2.length() == 0) {
170.       dir2 = "/";
171.     }
172. String name1 = startKey.substring(startKey.lastIndexOf("/") + 1);
173. String name2 = endKey.substring(startKey.lastIndexOf("/") + 1);
174. String seqId = this.getDirSeqId(bucket, dir1);
175. //查询dir1中大于name1的全部文件
176.     List<HosObjectSummary> keys = new ArrayList<>();
177. if (seqId != null && name1.length() > 0) {
178. byte[] max = Bytes.createMaxByteArray(100);
179. byte[] tail = Bytes.add(Bytes.toBytes(seqId), max);
180. if (dir1.equals(dir2)) {
181.         tail = (seqId + "_" + name2).getBytes();
182.       }
183. byte[] start = (seqId + "_" + name1).getBytes();
184. ResultScanner scanner1 = HBaseService
185.           .scanner(connection, HosUtil.getObjTableName(bucket), start, tail);
186. Result result = null;
187. while ((result = scanner1.next()) != null) {
188. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir1);
189.         keys.add(summary);
190.       }
191. if (scanner1 != null) {
192.         scanner1.close();
193.       }
194.     }
195. //startkey~endkey之间的全部目录
196. ResultScanner scanner2 = HBaseService
197.         .scanner(connection, HosUtil.getDirTableName(bucket), startKey, endKey);
198. Result result = null;
199. while ((result = scanner2.next()) != null) {
200. String seqId2 = Bytes.toString(result.getValue(HosUtil.DIR_META_CF_BYTES,
201.           HosUtil.DIR_SEQID_QUALIFIER));
202. if (seqId2 == null) {
203. continue;
204.       }
205. String dir = Bytes.toString(result.getRow());
206.       keys.add(dirObjectToSummary(result, bucket, dir));
207.       getDirAllFiles(bucket, dir, seqId2, keys, endKey);
208.     }
209. if (scanner2 != null) {
210.       scanner2.close();
211.     }
212.     Collections.sort(keys);
213. return keys;
214.   }
215. 
216. @Override
217. public ObjectListResult listDir(String bucket, String dir, String start, int maxCount)
218. throws IOException {
219. if (start == null) {
220.       start = "";
221.     }
222. //查询目录表
223. Get get = new Get(Bytes.toBytes(dir));
224.     get.addFamily(HosUtil.DIR_SUBDIR_CF_BYTES);
225. if (start.length() > 0) {
226.       get.setFilter(new QualifierFilter(CompareOp.GREATER_OR_EQUAL,
227. new BinaryComparator(Bytes.toBytes(start))));
228.     }
229. int maxCount1 = maxCount + 2;
230. Result dirResult = HBaseService
231.         .getRow(connection, HosUtil.getDirTableName(bucket), get);
232.     List<HosObjectSummary> subDirs = null;
233. if (!dirResult.isEmpty()) {
234.       subDirs = new ArrayList<>();
235. for (Cell cell : dirResult.rawCells()) {
236. HosObjectSummary summary = new HosObjectSummary();
237. byte[] qualifierBytes = new byte[cell.getQualifierLength()];
238.         CellUtil.copyQualifierTo(cell, qualifierBytes, 0);
239. String name = Bytes.toString(qualifierBytes);
240.         summary.setKey(dir + name + "/");
241.         summary.setName(name);
242.         summary.setLastModifyTime(cell.getTimestamp());
243.         summary.setMediaType("");
244.         summary.setBucket(bucket);
245.         summary.setLength(0);
246.         subDirs.add(summary);
247. if (subDirs.size() >= maxCount1) {
248. break;
249.         }
250.       }
251.     }
252. 
253. //查询文件表
254. String dirSeq = this.getDirSeqId(bucket, dir);
255. byte[] objStart = Bytes.toBytes(dirSeq + "_" + start);
256. Scan objScan = new Scan();
257.     objScan.setRowPrefixFilter(Bytes.toBytes(dirSeq + "_"));
258. /**
259.  *     有这样一个场景,在HBase中需要分页查询,同时根据某一列的值进行过滤。
260.  *     不同于RDBMS天然支持分页查询,HBase要进行分页必须由自己实现。据我了解的,目前有两种方案,
261.  *     一是《HBase权威指南》中提到的用PageFilter加循环动态设置startRow实现,
262.  *     详细见这里。但这种方法效率比较低,且有冗余查询。
263.  *     因此京东研发了一种用额外的一张表来保存行序号的方案。 该种方案效率较高,但实现麻烦些,需要维护一张额外的表
264.  * */
265.     objScan.setFilter(new PageFilter(maxCount + 1));
266.     objScan.setStartRow(objStart);
267.     objScan.setMaxResultsPerColumnFamily(maxCount1);
268.     objScan.addFamily(HosUtil.OBJ_META_CF_BYTES);
269.     logger.info("scan start: " + Bytes.toString(objStart) + " - ");
270. ResultScanner objScanner = HBaseService
271.         .scanner(connection, HosUtil.getObjTableName(bucket), objScan);
272.     List<HosObjectSummary> objectSummaryList = new ArrayList<>();
273. Result result = null;
274. while (objectSummaryList.size() < maxCount1 && (result = objScanner.next()) != null) {
275. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
276.       objectSummaryList.add(summary);
277.     }
278. if (objScanner != null) {
279.       objScanner.close();
280.     }
281. 
282. 
283. //返回给用户maxCount
284.     logger.info("scan complete: " + Bytes.toString(objStart) + " - ");
285. if (subDirs != null && subDirs.size() > 0) {
286.       objectSummaryList.addAll(subDirs);
287.     }
288.     Collections.sort(objectSummaryList);
289. ObjectListResult listResult = new ObjectListResult();
290. HosObjectSummary nextMarkerObj =
291.         objectSummaryList.size() > maxCount ? objectSummaryList.get(objectSummaryList.size() - 1)
292.             : null;
293. if (nextMarkerObj != null) {
294.       listResult.setNextMarker(nextMarkerObj.getKey());
295.     }
296. if (objectSummaryList.size() > maxCount) {
297.       objectSummaryList = objectSummaryList.subList(0, maxCount);
298.     }
299.     listResult.setMaxKeyNumber(maxCount);
300. if (objectSummaryList.size() > 0) {
301.       listResult.setMinKey(objectSummaryList.get(0).getKey());
302.       listResult.setMaxKey(objectSummaryList.get(objectSummaryList.size() - 1).getKey());
303.     }
304.     listResult.setObjectCount(objectSummaryList.size());
305.     listResult.setObjectList(objectSummaryList);
306.     listResult.setBucket(bucket);
307. 
308. return listResult;
309.   }
310. 
311. @Override
312. public ObjectListResult listByPrefix(String bucket, String dir, String keyPrefix, String start,
313. int maxCount) throws IOException {
314. if (start == null) {
315.       start = "";
316.     }
317. FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
318.     filterList.addFilter(new ColumnPrefixFilter(keyPrefix.getBytes()));
319. if (start.length() > 0) {
320.       filterList.addFilter(new QualifierFilter(CompareOp.GREATER_OR_EQUAL,
321. new BinaryComparator(Bytes.toBytes(start))));
322.     }
323. int maxCount1 = maxCount + 2;
324. Result dirResult = HBaseService
325.         .getRow(connection, HosUtil.getDirTableName(bucket), dir, filterList);
326.     List<HosObjectSummary> subDirs = null;
327. if (!dirResult.isEmpty()) {
328.       subDirs = new ArrayList<>();
329. for (Cell cell : dirResult.rawCells()) {
330. HosObjectSummary summary = new HosObjectSummary();
331. byte[] qualifierBytes = new byte[cell.getQualifierLength()];
332.         CellUtil.copyQualifierTo(cell, qualifierBytes, 0);
333. String name = Bytes.toString(qualifierBytes);
334.         summary.setKey(dir + name + "/");
335.         summary.setName(name);
336.         summary.setLastModifyTime(cell.getTimestamp());
337.         summary.setMediaType("");
338.         summary.setBucket(bucket);
339.         summary.setLength(0);
340.         subDirs.add(summary);
341. if (subDirs.size() >= maxCount1) {
342. break;
343.         }
344.       }
345.     }
346. 
347. String dirSeq = this.getDirSeqId(bucket, dir);
348. byte[] objStart = Bytes.toBytes(dirSeq + "_" + start);
349. Scan objScan = new Scan();
350.     objScan.setRowPrefixFilter(Bytes.toBytes(dirSeq + "_" + keyPrefix));
351.     objScan.setFilter(new PageFilter(maxCount + 1));
352.     objScan.setStartRow(objStart);
353.     objScan.setMaxResultsPerColumnFamily(maxCount1);
354.     objScan.addFamily(HosUtil.OBJ_META_CF_BYTES);
355.     logger.info("scan start: " + Bytes.toString(objStart) + " - ");
356. ResultScanner objScanner = HBaseService
357.         .scanner(connection, HosUtil.getObjTableName(bucket), objScan);
358.     List<HosObjectSummary> objectSummaryList = new ArrayList<>();
359. Result result = null;
360. while (objectSummaryList.size() < maxCount1 && (result = objScanner.next()) != null) {
361. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
362.       objectSummaryList.add(summary);
363.     }
364. if (objScanner != null) {
365.       objScanner.close();
366.     }
367.     logger.info("scan complete: " + Bytes.toString(objStart) + " - ");
368. if (subDirs != null && subDirs.size() > 0) {
369.       objectSummaryList.addAll(subDirs);
370.     }
371.     Collections.sort(objectSummaryList);
372. ObjectListResult listResult = new ObjectListResult();
373. HosObjectSummary nextMarkerObj =
374.         objectSummaryList.size() > maxCount ? objectSummaryList.get(objectSummaryList.size() - 1)
375.             : null;
376. if (nextMarkerObj != null) {
377.       listResult.setNextMarker(nextMarkerObj.getKey());
378.     }
379. if (objectSummaryList.size() > maxCount) {
380.       objectSummaryList = objectSummaryList.subList(0, maxCount);
381.     }
382.     listResult.setMaxKeyNumber(maxCount);
383. if (objectSummaryList.size() > 0) {
384.       listResult.setMinKey(objectSummaryList.get(0).getKey());
385.       listResult.setMaxKey(objectSummaryList.get(objectSummaryList.size() - 1).getKey());
386.     }
387.     listResult.setObjectCount(objectSummaryList.size());
388.     listResult.setObjectList(objectSummaryList);
389.     listResult.setBucket(bucket);
390. 
391. return listResult;
392.   }
393. 
394. @Override
395. public HosObject getObject(String bucket, String key) throws IOException {
396. if (key.endsWith("/")) {
397. Result result = HBaseService
398.           .getRow(connection, HosUtil.getDirTableName(bucket), key);
399. if (result.isEmpty()) {
400. return null;
401.       }
402. ObjectMetaData metaData = new ObjectMetaData();
403.       metaData.setBucket(bucket);
404.       metaData.setKey(key);
405.       metaData.setLastModifyTime(result.rawCells()[0].getTimestamp());
406.       metaData.setLength(0);
407. HosObject object = new HosObject();
408.       object.setMetaData(metaData);
409. return object;
410.     }
411. String dir = key.substring(0, key.lastIndexOf("/") + 1);
412. String name = key.substring(key.lastIndexOf("/") + 1);
413. String seq = this.getDirSeqId(bucket, dir);
414. String objKey = seq + "_" + name;
415. Result result = HBaseService
416.         .getRow(connection, HosUtil.getObjTableName(bucket), objKey);
417. if (result.isEmpty()) {
418. return null;
419.     }
420. HosObject object = new HosObject();
421. if (result.containsNonEmptyColumn(HosUtil.OBJ_CONT_CF_BYTES,
422.         HosUtil.OBJ_CONT_QUALIFIER)) {
423. ByteArrayInputStream bas = new ByteArrayInputStream(
424.           result
425.               .getValue(HosUtil.OBJ_CONT_CF_BYTES,
426.                   HosUtil.OBJ_CONT_QUALIFIER));
427.       object.setContent(bas);
428.     } else {
429. String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + seq;
430. InputStream inputStream = this.fileStore.openFile(fileDir, name);
431.       object.setContent(inputStream);
432.     }
433. long len = Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
434.         HosUtil.OBJ_LEN_QUALIFIER));
435. ObjectMetaData metaData = new ObjectMetaData();
436.     metaData.setBucket(bucket);
437.     metaData.setKey(key);
438.     metaData.setLastModifyTime(result.rawCells()[0].getTimestamp());
439.     metaData.setLength(len);
440.     metaData.setMediaType(Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
441.         HosUtil.OBJ_MEDIATYPE_QUALIFIER)));
442. byte[] b = result
443.         .getValue(HosUtil.OBJ_META_CF_BYTES, HosUtil.OBJ_PROPS_QUALIFIER);
444. if (b != null) {
445.       metaData.setAttrs(JsonUtil.fromJson(Map.class, Bytes.toString(b)));
446.     }
447.     object.setMetaData(metaData);
448. return object;
449.   }
450. 
451. @Override
452. public void deleteObject(String bucket, String key) throws Exception {
453. if (key.endsWith("/")) {
454. //check sub dir and current dir files.
455. if (!isDirEmpty(bucket, key)) {
456. throw new RuntimeException("dir is not empty");
457.       }
458. InterProcessMutex lock = null;
459. try {
460. String lockey = key.replaceAll("/", "_");
461.         lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
462.         lock.acquire();
463. if (!isDirEmpty(bucket, key)) {
464. throw new RuntimeException("dir is not empty");
465.         }
466. String dir1 = key.substring(0, key.lastIndexOf("/"));
467. String name = dir1.substring(dir1.lastIndexOf("/") + 1);
468. if (name.length() > 0) {
469. String parent = key.substring(0, key.lastIndexOf(name));
470.           HBaseService
471.               .deleteQualifier(connection, HosUtil.getDirTableName(bucket), parent,
472.                   HosUtil.DIR_SUBDIR_CF, name);
473.         }
474.         HBaseService.delete(connection, HosUtil.getDirTableName(bucket), key);
475. return;
476.       } finally {
477. if (lock != null) {
478.           lock.release();
479.         }
480.       }
481.     }
482. String dir = key.substring(0, key.lastIndexOf("/") + 1);
483. String name = key.substring(key.lastIndexOf("/") + 1);
484. String seqId = this.getDirSeqId(bucket, dir);
485. String objKey = seqId + "_" + name;
486. Result result = HBaseService
487.         .getRow(connection, HosUtil.getObjTableName(bucket), objKey,
488.             HosUtil.OBJ_META_CF_BYTES, HosUtil.OBJ_LEN_QUALIFIER);
489. if (result.isEmpty()) {
490. return;
491.     }
492. long len = Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
493.         HosUtil.OBJ_LEN_QUALIFIER));
494. if (len > HosUtil.FILE_STORE_THRESHOLD) {
495. String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + seqId;
496. this.fileStore.deleteFile(fileDir, name);
497.     }
498.     HBaseService.delete(connection, HosUtil.getObjTableName(bucket), objKey);
499.   }
500. 
501. private boolean isDirEmpty(String bucket, String dir) throws IOException {
502. return listDir(bucket, dir, null, 2).getObjectList().size() == 0;
503.   }
504. 
505. @Override
506. public void deleteBucketStore(String bucket) throws IOException {
507.     HBaseService.deleteTable(connection, HosUtil.getDirTableName(bucket));
508.     HBaseService.deleteTable(connection, HosUtil.getObjTableName(bucket));
509. 
510.     HBaseService.delete(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, bucket);
511. this.fileStore.deleteDir(HosUtil.FILE_STORE_ROOT + "/" + bucket);
512.   }
513. 
514. @Override
515. public void createBucketStore(String bucket) throws IOException {
516.     HBaseService.createTable(connection, HosUtil.getDirTableName(bucket),
517.         HosUtil.getDirColumnFamily());
518.     HBaseService.createTable(connection, HosUtil.getObjTableName(bucket),
519.         HosUtil.getObjColumnFamily(), HosUtil.OBJ_REGIONS);
520. 
521. Put put = new Put(Bytes.toBytes(bucket));
522.     put.addColumn(HosUtil.BUCKET_DIR_SEQ_CF_BYTES,
523.         HosUtil.BUCKET_DIR_SEQ_QUALIFIER, Bytes.toBytes(0L));
524.     HBaseService.putRow(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, put);
525. this.fileStore.mikDir(HosUtil.FILE_STORE_ROOT + "/" + bucket);
526.   }
527. 
528. /**
529.    * seqId由bucket和dir唯一确定
530.    * @param bucket
531.    * @param dir
532.    * @return
533.    * @throws IOException
534.    */
535. private String getDirSeqId(String bucket, String dir) throws IOException {
536. Result result = HBaseService.getRow(connection, HosUtil.getDirTableName(bucket), dir);
537. if (result.isEmpty()) {
538. return null;
539.     }
540. String dirSeqId = Bytes.toString(result.getValue(HosUtil.DIR_META_CF_BYTES,
541.         HosUtil.DIR_SEQID_QUALIFIER));
542. return dirSeqId;
543.   }
544. 
545. private void getDirAllFiles(String bucket, String dir, String seqId, List<HosObjectSummary> keys,
546.       String endKey) throws IOException {
547. 
548. byte[] max = Bytes.createMaxByteArray(100);
549. byte[] tail = Bytes.add(Bytes.toBytes(seqId), max);
550. if (endKey.startsWith(dir)) {
551. String endKeyLeft = endKey.replace(dir, "");
552. String fileNameMax = endKeyLeft;
553. if (endKeyLeft.indexOf("/") > 0) {
554.         fileNameMax = endKeyLeft.substring(0, endKeyLeft.indexOf("/"));
555.       }
556.       tail = Bytes.toBytes(seqId + "_" + fileNameMax);
557.     }
558. 
559. Scan scan = new Scan(Bytes.toBytes(seqId), tail);
560.     scan.setFilter(HosUtil.OBJ_META_SCAN_FILTER);
561. ResultScanner scanner = HBaseService
562.         .scanner(connection, HosUtil.getObjTableName(bucket), scan);
563. Result result = null;
564. while ((result = scanner.next()) != null) {
565. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
566.       keys.add(summary);
567.     }
568. if (scanner != null) {
569.       scanner.close();
570.     }
571.   }
572. 
573. private HosObjectSummary resultToObjectSummary(Result result, String bucket, String dir)
574. throws IOException {
575. HosObjectSummary summary = new HosObjectSummary();
576. long timestamp = result.rawCells()[0].getTimestamp();
577.     summary.setLastModifyTime(timestamp);
578. String id = new String(result.getRow());
579.     summary.setId(id);
580. String name = id.split("_", 2)[1];
581. String key = dir + name;
582.     summary.setKey(key);
583.     summary.setName(name);
584.     summary.setBucket(bucket);
585. String s = Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
586.         HosUtil.OBJ_PROPS_QUALIFIER));
587. if (s != null) {
588.       summary.setAttrs(JsonUtil.fromJson(Map.class, s));
589.     }
590.     summary.setLength(Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
591.         HosUtil.OBJ_LEN_QUALIFIER)));
592.     summary
593.         .setMediaType(Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
594.             HosUtil.OBJ_MEDIATYPE_QUALIFIER)));
595. 
596. return summary;
597.   }
598. 
599. private HosObjectSummary dirObjectToSummary(Result result, String bucket, String dir) {
600. HosObjectSummary summary = new HosObjectSummary();
601. String id = Bytes.toString(result.getRow());
602.     summary.setId(id);
603.     summary.setAttrs(new HashMap<>(0));
604. if (dir.length() > 1) {
605.       summary.setName(dir.substring(dir.lastIndexOf("/") + 1));
606.     } else {
607.       summary.setName("");
608.     }
609.     summary.setBucket(bucket);
610.     summary.setKey(dir);
611.     summary.setLastModifyTime(result.rawCells()[0].getTimestamp());
612.     summary.setLength(0);
613.     summary.setMediaType("");
614. return summary;
615.   }
616. 
617. 
618. private short getBucketReplication(String bucket) {
619. return 2;
620.   }
621. 
622. private String putDir(String bucket, String dir) throws Exception {
623. if (dirExist(bucket, dir)) {
624. return null;
625.     }
626. InterProcessMutex lock = null;
627. try {
628. String lockey = dir.replaceAll("/", "_");
629.       lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
630.       lock.acquire();
631. String dir1 = dir.substring(0, dir.lastIndexOf("/"));
632. String name = dir1.substring(dir1.lastIndexOf("/") + 1);
633. if (name.length() > 0) {
634. String parent = dir.substring(0, dir1.lastIndexOf("/") + 1);
635. if (!this.dirExist(bucket, parent)) {
636. this.putDir(bucket, parent);
637.         }
638. Put put = new Put(Bytes.toBytes(parent));
639.         put.addColumn(HosUtil.DIR_SUBDIR_CF_BYTES, Bytes.toBytes(name),
640.             Bytes.toBytes('1'));
641.         HBaseService.putRow(connection, HosUtil.getDirTableName(bucket), put);
642.       }
643. String seqId = this.getDirSeqId(bucket, dir);
644. String hash = seqId == null ? makeDirSeqId(bucket) : seqId;
645. Put hashPut = new Put(dir.getBytes());
646.       hashPut.addColumn(HosUtil.DIR_META_CF_BYTES,
647.           HosUtil.DIR_SEQID_QUALIFIER, Bytes.toBytes(hash));
648.       HBaseService.putRow(connection, HosUtil.getDirTableName(bucket), hashPut);
649. return hash;
650.     } finally {
651. if (lock != null) {
652.         lock.release();
653.       }
654.     }
655.   }
656. 
657. private String makeDirSeqId(String bucket) throws IOException {
658. long v = HBaseService
659.         .incrementColumnValue(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, bucket,
660.             HosUtil.BUCKET_DIR_SEQ_CF_BYTES, HosUtil.BUCKET_DIR_SEQ_QUALIFIER,
661. 1);
662. return String.format("%da%d", v % 64, v);
663.   }
664. 
665. private boolean dirExist(String bucket, String dir) throws IOException {
666. return HBaseService.existsRow(connection, HosUtil.getDirTableName(bucket), dir);
667.   }
668. }

1. 用户上传或删除文件(或目录)时,需要基于 ZooKeeper(Curator 的 InterProcessMutex,锁路径为 /mos/<bucket>/<key>)加分布式锁,防止多个节点并发修改同一目录或同一对象。

2. 核心设计思想需要认真反复体会:目录表按目录存储序列号 seqId,文件行键采用 "seqId_文件名" 使同一目录下的文件在 HBase 中连续存放;小文件内容直接内联在 HBase 行中,超过阈值的大文件内容存入 HDFS。

HBase 的调优

完整代码

https://download.csdn.net/download/JiShuiSanQianLi/16276861

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
18天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
48 4
|
19天前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
37 3
|
1月前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
106 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
26天前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
46 6
|
1月前
|
XML Java 数据库连接
SpringBoot集成Flowable:打造强大的工作流管理系统
在企业级应用开发中,工作流管理是一个核心组件,它能够帮助我们定义、执行和管理业务流程。Flowable是一个开源的工作流和业务流程管理(BPM)平台,它提供了强大的工作流引擎和建模工具。结合SpringBoot,我们可以快速构建一个高效、灵活的工作流管理系统。本文将探讨如何将Flowable集成到SpringBoot应用中,并展示其强大的功能。
192 1
|
24天前
|
机器学习/深度学习 人工智能 分布式计算
【AI系统】分布式通信与 NVLink
进入大模型时代后,AI的核心转向大模型发展,训练这类模型需克服大量GPU资源及长时间的需求。面对单个GPU内存限制,跨多个GPU的分布式训练成为必要,这涉及到分布式通信和NVLink技术的应用。分布式通信允许多个节点协作完成任务,而NVLink则是一种高速、低延迟的通信技术,用于连接GPU或GPU与其它设备,以实现高性能计算。随着大模型的参数、数据规模扩大及算力需求增长,分布式并行策略,如数据并行和模型并行,变得至关重要。这些策略通过将模型或数据分割在多个GPU上处理,提高了训练效率。此外,NVLink和NVSwitch技术的持续演进,为GPU间的高效通信提供了更强的支持,推动了大模型训练的快
38 0
|
1月前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
2月前
|
存储 安全 Java
打造智能合同管理系统:SpringBoot与电子签章的完美融合
【10月更文挑战第7天】 在数字化转型的浪潮中,电子合同管理系统因其高效、环保和安全的特点,正逐渐成为企业合同管理的新宠。本文将分享如何利用SpringBoot框架实现一个集电子文件签字与合同管理于一体的智能系统,探索技术如何助力合同管理的现代化。
113 4
|
2月前
|
前端开发 Java Apache
SpringBoot实现电子文件签字+合同系统!
【10月更文挑战第15天】 在现代企业运营中,合同管理和电子文件签字成为了日常活动中不可或缺的一部分。随着技术的发展,电子合同系统因其高效性、安全性和环保性,逐渐取代了传统的纸质合同。本文将详细介绍如何使用SpringBoot框架实现一个电子文件签字和合同管理系统。
104 1
|
2月前
|
文字识别 安全 Java
SpringBoot3.x和OCR构建车牌识别系统
本文介绍了一个基于Java SpringBoot3.x框架的车牌识别系统,详细阐述了系统的设计目标、需求分析及其实现过程。利用Tesseract OCR库和OpenCV库,实现了车牌图片的识别与处理,确保系统的高准确性和稳定性。文中还提供了具体的代码示例,展示了如何构建和优化车牌识别服务,以及如何处理特殊和异常车牌。通过实际应用案例,帮助读者理解和应用这一解决方案。
下一篇
DataWorks