1. package com.imooc.bigdata.hos.server;
2.
3. import java.io.ByteArrayInputStream;
4. import java.io.IOException;
5. import java.io.InputStream;
6. import java.nio.ByteBuffer;
7. import java.util.ArrayList;
8. import java.util.Collections;
9. import java.util.HashMap;
10. import java.util.List;
11. import java.util.Map;
12.
13. import org.apache.curator.framework.CuratorFramework;
14. import org.apache.curator.framework.CuratorFrameworkFactory;
15. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
16. import org.apache.curator.retry.ExponentialBackoffRetry;
17. import org.apache.hadoop.hbase.Cell;
18. import org.apache.hadoop.hbase.CellUtil;
19. import org.apache.hadoop.hbase.TableName;
20. import org.apache.hadoop.hbase.client.Admin;
21. import org.apache.hadoop.hbase.client.Connection;
22. import org.apache.hadoop.hbase.client.Get;
23. import org.apache.hadoop.hbase.client.Put;
24. import org.apache.hadoop.hbase.client.Result;
25. import org.apache.hadoop.hbase.client.ResultScanner;
26. import org.apache.hadoop.hbase.client.Scan;
27. import org.apache.hadoop.hbase.filter.BinaryComparator;
28. import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
29. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
30. import org.apache.hadoop.hbase.filter.FilterList;
31. import org.apache.hadoop.hbase.filter.FilterList.Operator;
32. import org.apache.hadoop.hbase.filter.PageFilter;
33. import org.apache.hadoop.hbase.filter.QualifierFilter;
34. import org.apache.hadoop.hbase.io.ByteBufferInputStream;
35. import org.apache.hadoop.hbase.util.Bytes;
36. import org.apache.log4j.Logger;
37.
38. import com.google.common.base.Strings;
39. import com.imooc.bigdata.hos.common.HosObject;
40. import com.imooc.bigdata.hos.common.HosObjectSummary;
41. import com.imooc.bigdata.hos.common.ObjectListResult;
42. import com.imooc.bigdata.hos.common.ObjectMetaData;
43. import com.imooc.bigdata.hos.common.util.JsonUtil;
44.
45. /**
46. * Created by jixin on 18-3-13.
47. */
48. public class HosStoreServiceImpl implements IHosStoreService {
49.
50. private static Logger logger = Logger.getLogger(HosStoreServiceImpl.class);
51. private Connection connection = null;
52. private IHdfsService fileStore;
53. private String zkUrls;
54. private CuratorFramework zkClient;
55.
56. public HosStoreServiceImpl(Connection connection, IHdfsService fileStore, String zkurls)
57. throws IOException {
58. this.connection = connection;
59. this.fileStore = fileStore;
60. this.zkUrls = zkurls;
61. zkClient = CuratorFrameworkFactory.newClient(zkUrls, new ExponentialBackoffRetry(20, 5));
62. zkClient.start();
63. }
64.
65. @Override
66. public void createSeqTable() throws IOException {
67. Admin admin = connection.getAdmin();
68. TableName tableName = TableName.valueOf(HosUtil.BUCKET_DIR_SEQ_TABLE);
69. if (admin.tableExists(tableName)) {
70. return;
71. }
72. HBaseService.createTable(connection, HosUtil.BUCKET_DIR_SEQ_TABLE,
73. new String[]{HosUtil.BUCKET_DIR_SEQ_CF});
74. }
75.
76. @Override
77. public void put(String bucket, String key, ByteBuffer input, long length, String mediaType,
78. Map<String, String> properties) throws Exception {
79. InterProcessMutex lock = null;
80. try {
81. if (key.endsWith("/")) {
82. //put dir object
83. putDir(bucket, key);
84. return;
85. }
86. //put dir
87. String dir = key.substring(0, key.lastIndexOf("/") + 1);
88. String hash = null;
89. while (hash == null) {
90. if (!dirExist(bucket, dir)) {
91. hash = putDir(bucket, dir);
92. } else {
93. hash = this.getDirSeqId(bucket, dir);
94. }
95. }
96. String lockey = key.replaceAll("/", "_");
97. lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
98. lock.acquire();
99. //put
100. String fileKey = hash + "_" + key.substring(key.lastIndexOf("/") + 1);
101. Put contentPut = new Put(fileKey.getBytes());
102. if (!Strings.isNullOrEmpty(mediaType)) {
103. contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
104. HosUtil.OBJ_MEDIATYPE_QUALIFIER, mediaType.getBytes());
105. }
106. if (properties != null) {
107. String props = JsonUtil.toJson(properties);
108. contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
109. HosUtil.OBJ_PROPS_QUALIFIER, props.getBytes());
110. }
111. contentPut.addColumn(HosUtil.OBJ_META_CF_BYTES,
112. HosUtil.OBJ_LEN_QUALIFIER, Bytes.toBytes((long) length));
113.
114. if (length <= HosUtil.FILE_STORE_THRESHOLD) {
115. ByteBuffer qualifierBuffer = ByteBuffer.wrap(HosUtil.OBJ_CONT_QUALIFIER);
116. contentPut.addColumn(HosUtil.OBJ_CONT_CF_BYTES,
117. qualifierBuffer, System.currentTimeMillis(), input);
118. qualifierBuffer.clear();
119. }
120. HBaseService.putRow(connection, HosUtil.getObjTableName(bucket), contentPut);
121. if (length > HosUtil.FILE_STORE_THRESHOLD) {
122. String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + hash;
123. String name = key.substring(key.lastIndexOf("/") + 1);
124. InputStream inputStream = new ByteBufferInputStream(input);
125. this.fileStore.saveFile(fileDir, name, inputStream, length, getBucketReplication(bucket));
126. }
127. } finally {
128. if (lock != null) {
129. lock.release();
130. }
131. }
132. }
133.
134. @Override
135. public HosObjectSummary getSummary(String bucket, String key) throws IOException {
136. if (key.endsWith("/")) {
137. Result result = HBaseService
138. .getRow(connection, HosUtil.getDirTableName(bucket), key);
139. if (!result.isEmpty()) {
140. return this.dirObjectToSummary(result, bucket, key);
141. } else {
142. return null;
143. }
144. }
145. String dir = key.substring(0, key.lastIndexOf("/") + 1);
146. String seq = this.getDirSeqId(bucket, dir);
147. if (seq == null) {
148. return null;
149. }
150. //序列号_文件名
151. String objKey = seq + "_" + key.substring(key.lastIndexOf("/") + 1);
152. Result result = HBaseService
153. .getRow(connection, HosUtil.getObjTableName(bucket), objKey);
154. if (result.isEmpty()) {
155. return null;
156. }
157. return this.resultToObjectSummary(result, bucket, dir);
158. }
159.
160. @Override
161. public List<HosObjectSummary> list(String bucket, String startKey, String endKey)
162. throws IOException {
163.
164. String dir1 = startKey.substring(0, startKey.lastIndexOf("/") + 1).trim();
165. if (dir1.length() == 0) {
166. dir1 = "/";
167. }
168. String dir2 = endKey.substring(0, startKey.lastIndexOf("/") + 1).trim();
169. if (dir2.length() == 0) {
170. dir2 = "/";
171. }
172. String name1 = startKey.substring(startKey.lastIndexOf("/") + 1);
173. String name2 = endKey.substring(startKey.lastIndexOf("/") + 1);
174. String seqId = this.getDirSeqId(bucket, dir1);
175. //查询dir1中大于name1的全部文件
176. List<HosObjectSummary> keys = new ArrayList<>();
177. if (seqId != null && name1.length() > 0) {
178. byte[] max = Bytes.createMaxByteArray(100);
179. byte[] tail = Bytes.add(Bytes.toBytes(seqId), max);
180. if (dir1.equals(dir2)) {
181. tail = (seqId + "_" + name2).getBytes();
182. }
183. byte[] start = (seqId + "_" + name1).getBytes();
184. ResultScanner scanner1 = HBaseService
185. .scanner(connection, HosUtil.getObjTableName(bucket), start, tail);
186. Result result = null;
187. while ((result = scanner1.next()) != null) {
188. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir1);
189. keys.add(summary);
190. }
191. if (scanner1 != null) {
192. scanner1.close();
193. }
194. }
195. //startkey~endkey之间的全部目录
196. ResultScanner scanner2 = HBaseService
197. .scanner(connection, HosUtil.getDirTableName(bucket), startKey, endKey);
198. Result result = null;
199. while ((result = scanner2.next()) != null) {
200. String seqId2 = Bytes.toString(result.getValue(HosUtil.DIR_META_CF_BYTES,
201. HosUtil.DIR_SEQID_QUALIFIER));
202. if (seqId2 == null) {
203. continue;
204. }
205. String dir = Bytes.toString(result.getRow());
206. keys.add(dirObjectToSummary(result, bucket, dir));
207. getDirAllFiles(bucket, dir, seqId2, keys, endKey);
208. }
209. if (scanner2 != null) {
210. scanner2.close();
211. }
212. Collections.sort(keys);
213. return keys;
214. }
215.
216. @Override
217. public ObjectListResult listDir(String bucket, String dir, String start, int maxCount)
218. throws IOException {
219. if (start == null) {
220. start = "";
221. }
222. //查询目录表
223. Get get = new Get(Bytes.toBytes(dir));
224. get.addFamily(HosUtil.DIR_SUBDIR_CF_BYTES);
225. if (start.length() > 0) {
226. get.setFilter(new QualifierFilter(CompareOp.GREATER_OR_EQUAL,
227. new BinaryComparator(Bytes.toBytes(start))));
228. }
229. int maxCount1 = maxCount + 2;
230. Result dirResult = HBaseService
231. .getRow(connection, HosUtil.getDirTableName(bucket), get);
232. List<HosObjectSummary> subDirs = null;
233. if (!dirResult.isEmpty()) {
234. subDirs = new ArrayList<>();
235. for (Cell cell : dirResult.rawCells()) {
236. HosObjectSummary summary = new HosObjectSummary();
237. byte[] qualifierBytes = new byte[cell.getQualifierLength()];
238. CellUtil.copyQualifierTo(cell, qualifierBytes, 0);
239. String name = Bytes.toString(qualifierBytes);
240. summary.setKey(dir + name + "/");
241. summary.setName(name);
242. summary.setLastModifyTime(cell.getTimestamp());
243. summary.setMediaType("");
244. summary.setBucket(bucket);
245. summary.setLength(0);
246. subDirs.add(summary);
247. if (subDirs.size() >= maxCount1) {
248. break;
249. }
250. }
251. }
252.
253. //查询文件表
254. String dirSeq = this.getDirSeqId(bucket, dir);
255. byte[] objStart = Bytes.toBytes(dirSeq + "_" + start);
256. Scan objScan = new Scan();
257. objScan.setRowPrefixFilter(Bytes.toBytes(dirSeq + "_"));
258. /**
259. * 有这样一个场景,在HBase中需要分页查询,同时根据某一列的值进行过滤。
260. * 不同于RDBMS天然支持分页查询,HBase要进行分页必须由自己实现。据我了解的,目前有两种方案,
261. * 一是《HBase权威指南》中提到的用PageFilter加循环动态设置startRow实现,
262. * 详细见这里。但这种方法效率比较低,且有冗余查询。
263. * 因此京东研发了一种用额外的一张表来保存行序号的方案。 该种方案效率较高,但实现麻烦些,需要维护一张额外的表
264. * */
265. objScan.setFilter(new PageFilter(maxCount + 1));
266. objScan.setStartRow(objStart);
267. objScan.setMaxResultsPerColumnFamily(maxCount1);
268. objScan.addFamily(HosUtil.OBJ_META_CF_BYTES);
269. logger.info("scan start: " + Bytes.toString(objStart) + " - ");
270. ResultScanner objScanner = HBaseService
271. .scanner(connection, HosUtil.getObjTableName(bucket), objScan);
272. List<HosObjectSummary> objectSummaryList = new ArrayList<>();
273. Result result = null;
274. while (objectSummaryList.size() < maxCount1 && (result = objScanner.next()) != null) {
275. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
276. objectSummaryList.add(summary);
277. }
278. if (objScanner != null) {
279. objScanner.close();
280. }
281.
282.
283. //返回给用户maxCount
284. logger.info("scan complete: " + Bytes.toString(objStart) + " - ");
285. if (subDirs != null && subDirs.size() > 0) {
286. objectSummaryList.addAll(subDirs);
287. }
288. Collections.sort(objectSummaryList);
289. ObjectListResult listResult = new ObjectListResult();
290. HosObjectSummary nextMarkerObj =
291. objectSummaryList.size() > maxCount ? objectSummaryList.get(objectSummaryList.size() - 1)
292. : null;
293. if (nextMarkerObj != null) {
294. listResult.setNextMarker(nextMarkerObj.getKey());
295. }
296. if (objectSummaryList.size() > maxCount) {
297. objectSummaryList = objectSummaryList.subList(0, maxCount);
298. }
299. listResult.setMaxKeyNumber(maxCount);
300. if (objectSummaryList.size() > 0) {
301. listResult.setMinKey(objectSummaryList.get(0).getKey());
302. listResult.setMaxKey(objectSummaryList.get(objectSummaryList.size() - 1).getKey());
303. }
304. listResult.setObjectCount(objectSummaryList.size());
305. listResult.setObjectList(objectSummaryList);
306. listResult.setBucket(bucket);
307.
308. return listResult;
309. }
310.
311. @Override
312. public ObjectListResult listByPrefix(String bucket, String dir, String keyPrefix, String start,
313. int maxCount) throws IOException {
314. if (start == null) {
315. start = "";
316. }
317. FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
318. filterList.addFilter(new ColumnPrefixFilter(keyPrefix.getBytes()));
319. if (start.length() > 0) {
320. filterList.addFilter(new QualifierFilter(CompareOp.GREATER_OR_EQUAL,
321. new BinaryComparator(Bytes.toBytes(start))));
322. }
323. int maxCount1 = maxCount + 2;
324. Result dirResult = HBaseService
325. .getRow(connection, HosUtil.getDirTableName(bucket), dir, filterList);
326. List<HosObjectSummary> subDirs = null;
327. if (!dirResult.isEmpty()) {
328. subDirs = new ArrayList<>();
329. for (Cell cell : dirResult.rawCells()) {
330. HosObjectSummary summary = new HosObjectSummary();
331. byte[] qualifierBytes = new byte[cell.getQualifierLength()];
332. CellUtil.copyQualifierTo(cell, qualifierBytes, 0);
333. String name = Bytes.toString(qualifierBytes);
334. summary.setKey(dir + name + "/");
335. summary.setName(name);
336. summary.setLastModifyTime(cell.getTimestamp());
337. summary.setMediaType("");
338. summary.setBucket(bucket);
339. summary.setLength(0);
340. subDirs.add(summary);
341. if (subDirs.size() >= maxCount1) {
342. break;
343. }
344. }
345. }
346.
347. String dirSeq = this.getDirSeqId(bucket, dir);
348. byte[] objStart = Bytes.toBytes(dirSeq + "_" + start);
349. Scan objScan = new Scan();
350. objScan.setRowPrefixFilter(Bytes.toBytes(dirSeq + "_" + keyPrefix));
351. objScan.setFilter(new PageFilter(maxCount + 1));
352. objScan.setStartRow(objStart);
353. objScan.setMaxResultsPerColumnFamily(maxCount1);
354. objScan.addFamily(HosUtil.OBJ_META_CF_BYTES);
355. logger.info("scan start: " + Bytes.toString(objStart) + " - ");
356. ResultScanner objScanner = HBaseService
357. .scanner(connection, HosUtil.getObjTableName(bucket), objScan);
358. List<HosObjectSummary> objectSummaryList = new ArrayList<>();
359. Result result = null;
360. while (objectSummaryList.size() < maxCount1 && (result = objScanner.next()) != null) {
361. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
362. objectSummaryList.add(summary);
363. }
364. if (objScanner != null) {
365. objScanner.close();
366. }
367. logger.info("scan complete: " + Bytes.toString(objStart) + " - ");
368. if (subDirs != null && subDirs.size() > 0) {
369. objectSummaryList.addAll(subDirs);
370. }
371. Collections.sort(objectSummaryList);
372. ObjectListResult listResult = new ObjectListResult();
373. HosObjectSummary nextMarkerObj =
374. objectSummaryList.size() > maxCount ? objectSummaryList.get(objectSummaryList.size() - 1)
375. : null;
376. if (nextMarkerObj != null) {
377. listResult.setNextMarker(nextMarkerObj.getKey());
378. }
379. if (objectSummaryList.size() > maxCount) {
380. objectSummaryList = objectSummaryList.subList(0, maxCount);
381. }
382. listResult.setMaxKeyNumber(maxCount);
383. if (objectSummaryList.size() > 0) {
384. listResult.setMinKey(objectSummaryList.get(0).getKey());
385. listResult.setMaxKey(objectSummaryList.get(objectSummaryList.size() - 1).getKey());
386. }
387. listResult.setObjectCount(objectSummaryList.size());
388. listResult.setObjectList(objectSummaryList);
389. listResult.setBucket(bucket);
390.
391. return listResult;
392. }
393.
394. @Override
395. public HosObject getObject(String bucket, String key) throws IOException {
396. if (key.endsWith("/")) {
397. Result result = HBaseService
398. .getRow(connection, HosUtil.getDirTableName(bucket), key);
399. if (result.isEmpty()) {
400. return null;
401. }
402. ObjectMetaData metaData = new ObjectMetaData();
403. metaData.setBucket(bucket);
404. metaData.setKey(key);
405. metaData.setLastModifyTime(result.rawCells()[0].getTimestamp());
406. metaData.setLength(0);
407. HosObject object = new HosObject();
408. object.setMetaData(metaData);
409. return object;
410. }
411. String dir = key.substring(0, key.lastIndexOf("/") + 1);
412. String name = key.substring(key.lastIndexOf("/") + 1);
413. String seq = this.getDirSeqId(bucket, dir);
414. String objKey = seq + "_" + name;
415. Result result = HBaseService
416. .getRow(connection, HosUtil.getObjTableName(bucket), objKey);
417. if (result.isEmpty()) {
418. return null;
419. }
420. HosObject object = new HosObject();
421. if (result.containsNonEmptyColumn(HosUtil.OBJ_CONT_CF_BYTES,
422. HosUtil.OBJ_CONT_QUALIFIER)) {
423. ByteArrayInputStream bas = new ByteArrayInputStream(
424. result
425. .getValue(HosUtil.OBJ_CONT_CF_BYTES,
426. HosUtil.OBJ_CONT_QUALIFIER));
427. object.setContent(bas);
428. } else {
429. String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + seq;
430. InputStream inputStream = this.fileStore.openFile(fileDir, name);
431. object.setContent(inputStream);
432. }
433. long len = Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
434. HosUtil.OBJ_LEN_QUALIFIER));
435. ObjectMetaData metaData = new ObjectMetaData();
436. metaData.setBucket(bucket);
437. metaData.setKey(key);
438. metaData.setLastModifyTime(result.rawCells()[0].getTimestamp());
439. metaData.setLength(len);
440. metaData.setMediaType(Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
441. HosUtil.OBJ_MEDIATYPE_QUALIFIER)));
442. byte[] b = result
443. .getValue(HosUtil.OBJ_META_CF_BYTES, HosUtil.OBJ_PROPS_QUALIFIER);
444. if (b != null) {
445. metaData.setAttrs(JsonUtil.fromJson(Map.class, Bytes.toString(b)));
446. }
447. object.setMetaData(metaData);
448. return object;
449. }
450.
451. @Override
452. public void deleteObject(String bucket, String key) throws Exception {
453. if (key.endsWith("/")) {
454. //check sub dir and current dir files.
455. if (!isDirEmpty(bucket, key)) {
456. throw new RuntimeException("dir is not empty");
457. }
458. InterProcessMutex lock = null;
459. try {
460. String lockey = key.replaceAll("/", "_");
461. lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
462. lock.acquire();
463. if (!isDirEmpty(bucket, key)) {
464. throw new RuntimeException("dir is not empty");
465. }
466. String dir1 = key.substring(0, key.lastIndexOf("/"));
467. String name = dir1.substring(dir1.lastIndexOf("/") + 1);
468. if (name.length() > 0) {
469. String parent = key.substring(0, key.lastIndexOf(name));
470. HBaseService
471. .deleteQualifier(connection, HosUtil.getDirTableName(bucket), parent,
472. HosUtil.DIR_SUBDIR_CF, name);
473. }
474. HBaseService.delete(connection, HosUtil.getDirTableName(bucket), key);
475. return;
476. } finally {
477. if (lock != null) {
478. lock.release();
479. }
480. }
481. }
482. String dir = key.substring(0, key.lastIndexOf("/") + 1);
483. String name = key.substring(key.lastIndexOf("/") + 1);
484. String seqId = this.getDirSeqId(bucket, dir);
485. String objKey = seqId + "_" + name;
486. Result result = HBaseService
487. .getRow(connection, HosUtil.getObjTableName(bucket), objKey,
488. HosUtil.OBJ_META_CF_BYTES, HosUtil.OBJ_LEN_QUALIFIER);
489. if (result.isEmpty()) {
490. return;
491. }
492. long len = Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
493. HosUtil.OBJ_LEN_QUALIFIER));
494. if (len > HosUtil.FILE_STORE_THRESHOLD) {
495. String fileDir = HosUtil.FILE_STORE_ROOT + "/" + bucket + "/" + seqId;
496. this.fileStore.deleteFile(fileDir, name);
497. }
498. HBaseService.delete(connection, HosUtil.getObjTableName(bucket), objKey);
499. }
500.
501. private boolean isDirEmpty(String bucket, String dir) throws IOException {
502. return listDir(bucket, dir, null, 2).getObjectList().size() == 0;
503. }
504.
505. @Override
506. public void deleteBucketStore(String bucket) throws IOException {
507. HBaseService.deleteTable(connection, HosUtil.getDirTableName(bucket));
508. HBaseService.deleteTable(connection, HosUtil.getObjTableName(bucket));
509.
510. HBaseService.delete(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, bucket);
511. this.fileStore.deleteDir(HosUtil.FILE_STORE_ROOT + "/" + bucket);
512. }
513.
514. @Override
515. public void createBucketStore(String bucket) throws IOException {
516. HBaseService.createTable(connection, HosUtil.getDirTableName(bucket),
517. HosUtil.getDirColumnFamily());
518. HBaseService.createTable(connection, HosUtil.getObjTableName(bucket),
519. HosUtil.getObjColumnFamily(), HosUtil.OBJ_REGIONS);
520.
521. Put put = new Put(Bytes.toBytes(bucket));
522. put.addColumn(HosUtil.BUCKET_DIR_SEQ_CF_BYTES,
523. HosUtil.BUCKET_DIR_SEQ_QUALIFIER, Bytes.toBytes(0L));
524. HBaseService.putRow(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, put);
525. this.fileStore.mikDir(HosUtil.FILE_STORE_ROOT + "/" + bucket);
526. }
527.
528. /**
529. * seqId由bucket和dir唯一确定
530. * @param bucket
531. * @param dir
532. * @return
533. * @throws IOException
534. */
535. private String getDirSeqId(String bucket, String dir) throws IOException {
536. Result result = HBaseService.getRow(connection, HosUtil.getDirTableName(bucket), dir);
537. if (result.isEmpty()) {
538. return null;
539. }
540. String dirSeqId = Bytes.toString(result.getValue(HosUtil.DIR_META_CF_BYTES,
541. HosUtil.DIR_SEQID_QUALIFIER));
542. return dirSeqId;
543. }
544.
545. private void getDirAllFiles(String bucket, String dir, String seqId, List<HosObjectSummary> keys,
546. String endKey) throws IOException {
547.
548. byte[] max = Bytes.createMaxByteArray(100);
549. byte[] tail = Bytes.add(Bytes.toBytes(seqId), max);
550. if (endKey.startsWith(dir)) {
551. String endKeyLeft = endKey.replace(dir, "");
552. String fileNameMax = endKeyLeft;
553. if (endKeyLeft.indexOf("/") > 0) {
554. fileNameMax = endKeyLeft.substring(0, endKeyLeft.indexOf("/"));
555. }
556. tail = Bytes.toBytes(seqId + "_" + fileNameMax);
557. }
558.
559. Scan scan = new Scan(Bytes.toBytes(seqId), tail);
560. scan.setFilter(HosUtil.OBJ_META_SCAN_FILTER);
561. ResultScanner scanner = HBaseService
562. .scanner(connection, HosUtil.getObjTableName(bucket), scan);
563. Result result = null;
564. while ((result = scanner.next()) != null) {
565. HosObjectSummary summary = this.resultToObjectSummary(result, bucket, dir);
566. keys.add(summary);
567. }
568. if (scanner != null) {
569. scanner.close();
570. }
571. }
572.
573. private HosObjectSummary resultToObjectSummary(Result result, String bucket, String dir)
574. throws IOException {
575. HosObjectSummary summary = new HosObjectSummary();
576. long timestamp = result.rawCells()[0].getTimestamp();
577. summary.setLastModifyTime(timestamp);
578. String id = new String(result.getRow());
579. summary.setId(id);
580. String name = id.split("_", 2)[1];
581. String key = dir + name;
582. summary.setKey(key);
583. summary.setName(name);
584. summary.setBucket(bucket);
585. String s = Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
586. HosUtil.OBJ_PROPS_QUALIFIER));
587. if (s != null) {
588. summary.setAttrs(JsonUtil.fromJson(Map.class, s));
589. }
590. summary.setLength(Bytes.toLong(result.getValue(HosUtil.OBJ_META_CF_BYTES,
591. HosUtil.OBJ_LEN_QUALIFIER)));
592. summary
593. .setMediaType(Bytes.toString(result.getValue(HosUtil.OBJ_META_CF_BYTES,
594. HosUtil.OBJ_MEDIATYPE_QUALIFIER)));
595.
596. return summary;
597. }
598.
599. private HosObjectSummary dirObjectToSummary(Result result, String bucket, String dir) {
600. HosObjectSummary summary = new HosObjectSummary();
601. String id = Bytes.toString(result.getRow());
602. summary.setId(id);
603. summary.setAttrs(new HashMap<>(0));
604. if (dir.length() > 1) {
605. summary.setName(dir.substring(dir.lastIndexOf("/") + 1));
606. } else {
607. summary.setName("");
608. }
609. summary.setBucket(bucket);
610. summary.setKey(dir);
611. summary.setLastModifyTime(result.rawCells()[0].getTimestamp());
612. summary.setLength(0);
613. summary.setMediaType("");
614. return summary;
615. }
616.
617.
618. private short getBucketReplication(String bucket) {
619. return 2;
620. }
621.
622. private String putDir(String bucket, String dir) throws Exception {
623. if (dirExist(bucket, dir)) {
624. return null;
625. }
626. InterProcessMutex lock = null;
627. try {
628. String lockey = dir.replaceAll("/", "_");
629. lock = new InterProcessMutex(this.zkClient, "/mos/" + bucket + "/" + lockey);
630. lock.acquire();
631. String dir1 = dir.substring(0, dir.lastIndexOf("/"));
632. String name = dir1.substring(dir1.lastIndexOf("/") + 1);
633. if (name.length() > 0) {
634. String parent = dir.substring(0, dir1.lastIndexOf("/") + 1);
635. if (!this.dirExist(bucket, parent)) {
636. this.putDir(bucket, parent);
637. }
638. Put put = new Put(Bytes.toBytes(parent));
639. put.addColumn(HosUtil.DIR_SUBDIR_CF_BYTES, Bytes.toBytes(name),
640. Bytes.toBytes('1'));
641. HBaseService.putRow(connection, HosUtil.getDirTableName(bucket), put);
642. }
643. String seqId = this.getDirSeqId(bucket, dir);
644. String hash = seqId == null ? makeDirSeqId(bucket) : seqId;
645. Put hashPut = new Put(dir.getBytes());
646. hashPut.addColumn(HosUtil.DIR_META_CF_BYTES,
647. HosUtil.DIR_SEQID_QUALIFIER, Bytes.toBytes(hash));
648. HBaseService.putRow(connection, HosUtil.getDirTableName(bucket), hashPut);
649. return hash;
650. } finally {
651. if (lock != null) {
652. lock.release();
653. }
654. }
655. }
656.
657. private String makeDirSeqId(String bucket) throws IOException {
658. long v = HBaseService
659. .incrementColumnValue(connection, HosUtil.BUCKET_DIR_SEQ_TABLE, bucket,
660. HosUtil.BUCKET_DIR_SEQ_CF_BYTES, HosUtil.BUCKET_DIR_SEQ_QUALIFIER,
661. 1);
662. return String.format("%da%d", v % 64, v);
663. }
664.
665. private boolean dirExist(String bucket, String dir) throws IOException {
666. return HBaseService.existsRow(connection, HosUtil.getDirTableName(bucket), dir);
667. }
668. }