Hadoop File System Support Demystified: S3


I. Introduction

  Hadoop ships with support for a number of file systems, but I had never looked closely at how these file systems are actually implemented or what mechanism sits underneath them. Today someone happened to ask me exactly that question: how does Hadoop's S3 support work? This post summarizes the answer. The file systems Hadoop supports are listed below; a short lookup sketch follows the table.

  File system         URI scheme   Hadoop implementation class (under org.apache.hadoop)

  Local               file         fs.LocalFileSystem
  HDFS                hdfs         hdfs.DistributedFileSystem
  HFTP                hftp         hdfs.HftpFileSystem
  HSFTP               hsftp        hdfs.HsftpFileSystem
  HAR                 har          fs.HarFileSystem
  KFS                 kfs          fs.kfs.KosmosFileSystem
  FTP                 ftp          fs.ftp.FTPFileSystem
  S3 (native)         s3n          fs.s3native.NativeS3FileSystem
  S3 (block-based)    s3           fs.s3.S3FileSystem
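  The table above is what makes a URI such as s3://bucket/path resolve to fs.s3.S3FileSystem: FileSystem.get() inspects the URI scheme and loads the class configured for it (in Hadoop 1.x-era releases this mapping comes from core-default.xml properties such as fs.s3.impl). The following is a minimal sketch of that lookup, not from the original post; it assumes the Hadoop jars of that era are on the classpath, and the credentials and bucket name are placeholders.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3SchemeLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Credentials for the block-based "s3" scheme (placeholders, not real keys).
    conf.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY");
    conf.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_KEY");

    // FileSystem.get() reads the URI scheme ("s3") and instantiates the class
    // configured for that scheme; for "s3" the default mapping points at
    // org.apache.hadoop.fs.s3.S3FileSystem.
    FileSystem fs = FileSystem.get(URI.create("s3://my-bucket/"), conf);
    System.out.println(fs.getClass().getName()); // expected: ...fs.s3.S3FileSystem
    fs.close();
  }
}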

II. Two Competing Views

   1. Does Hadoop support S3 by implementing an S3-style file system of its own?

   2. Or does Hadoop support S3 by integrating with S3 through S3's existing service interface?

III. Source Code Analysis

  The class below, org.apache.hadoop.fs.s3.Jets3tFileSystemStore, is the backing store used by the block-based S3FileSystem. Note that every read, write, list and delete is delegated to the JetS3t client library (RestS3Service), which talks to S3's REST interface.

package org.apache.hadoop.fs.s3;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3.INode.FileType;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.security.AWSCredentials;

class Jets3tFileSystemStore implements FileSystemStore {

  private static final String FILE_SYSTEM_NAME = "fs";
  private static final String FILE_SYSTEM_VALUE = "Hadoop";

  private static final String FILE_SYSTEM_TYPE_NAME = "fs-type";
  private static final String FILE_SYSTEM_TYPE_VALUE = "block";

  private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
  private static final String FILE_SYSTEM_VERSION_VALUE = "1";

  private static final Map<String, String> METADATA =
    new HashMap<String, String>();

  static {
    METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE);
    METADATA.put(FILE_SYSTEM_TYPE_NAME, FILE_SYSTEM_TYPE_VALUE);
    METADATA.put(FILE_SYSTEM_VERSION_NAME, FILE_SYSTEM_VERSION_VALUE);
  }

  private static final String PATH_DELIMITER = Path.SEPARATOR;
  private static final String BLOCK_PREFIX = "block_";

  private Configuration conf;

  private S3Service s3Service;

  private S3Bucket bucket;

  private int bufferSize;

  public void initialize(URI uri, Configuration conf) throws IOException {

    this.conf = conf;

    S3Credentials s3Credentials = new S3Credentials();
    s3Credentials.initialize(uri, conf);
    try {
      AWSCredentials awsCredentials =
        new AWSCredentials(s3Credentials.getAccessKey(),
            s3Credentials.getSecretAccessKey());
      this.s3Service = new RestS3Service(awsCredentials);
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
    bucket = new S3Bucket(uri.getHost());

    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
  }

  public String getVersion() throws IOException {
    return FILE_SYSTEM_VERSION_VALUE;
  }

  private void delete(String key) throws IOException {
    try {
      s3Service.deleteObject(bucket, key);
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  public void deleteINode(Path path) throws IOException {
    delete(pathToKey(path));
  }

  public void deleteBlock(Block block) throws IOException {
    delete(blockToKey(block));
  }

  public boolean inodeExists(Path path) throws IOException {
    InputStream in = get(pathToKey(path), true);
    if (in == null) {
      return false;
    }
    in.close();
    return true;
  }

  public boolean blockExists(long blockId) throws IOException {
    InputStream in = get(blockToKey(blockId), false);
    if (in == null) {
      return false;
    }
    in.close();
    return true;
  }

  private InputStream get(String key, boolean checkMetadata)
      throws IOException {

    try {
      S3Object object = s3Service.getObject(bucket, key);
      if (checkMetadata) {
        checkMetadata(object);
      }
      return object.getDataInputStream();
    } catch (S3ServiceException e) {
      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
        return null;
      }
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  private InputStream get(String key, long byteRangeStart) throws IOException {
    try {
      S3Object object = s3Service.getObject(bucket, key, null, null, null,
                                            null, byteRangeStart, null);
      return object.getDataInputStream();
    } catch (S3ServiceException e) {
      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
        return null;
      }
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  private void checkMetadata(S3Object object) throws S3FileSystemException,
      S3ServiceException {

    String name = (String) object.getMetadata(FILE_SYSTEM_NAME);
    if (!FILE_SYSTEM_VALUE.equals(name)) {
      throw new S3FileSystemException("Not a Hadoop S3 file.");
    }
    String type = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME);
    if (!FILE_SYSTEM_TYPE_VALUE.equals(type)) {
      throw new S3FileSystemException("Not a block file.");
    }
    String dataVersion = (String) object.getMetadata(FILE_SYSTEM_VERSION_NAME);
    if (!FILE_SYSTEM_VERSION_VALUE.equals(dataVersion)) {
      throw new VersionMismatchException(FILE_SYSTEM_VERSION_VALUE,
          dataVersion);
    }
  }

  public INode retrieveINode(Path path) throws IOException {
    return INode.deserialize(get(pathToKey(path), true));
  }

  public File retrieveBlock(Block block, long byteRangeStart)
    throws IOException {
    File fileBlock = null;
    InputStream in = null;
    OutputStream out = null;
    try {
      fileBlock = newBackupFile();
      in = get(blockToKey(block), byteRangeStart);
      out = new BufferedOutputStream(new FileOutputStream(fileBlock));
      byte[] buf = new byte[bufferSize];
      int numRead;
      while ((numRead = in.read(buf)) >= 0) {
        out.write(buf, 0, numRead);
      }
      return fileBlock;
    } catch (IOException e) {
      // close output stream to file then delete file
      closeQuietly(out);
      out = null; // to prevent a second close
      if (fileBlock != null) {
        fileBlock.delete();
      }
      throw e;
    } finally {
      closeQuietly(out);
      closeQuietly(in);
    }
  }

  private File newBackupFile() throws IOException {
    File dir = new File(conf.get("fs.s3.buffer.dir"));
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Cannot create S3 buffer directory: " + dir);
    }
    File result = File.createTempFile("input-", ".tmp", dir);
    result.deleteOnExit();
    return result;
  }

  public Set<Path> listSubPaths(Path path) throws IOException {
    try {
      String prefix = pathToKey(path);
      if (!prefix.endsWith(PATH_DELIMITER)) {
        prefix += PATH_DELIMITER;
      }
      S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER);
      Set<Path> prefixes = new TreeSet<Path>();
      for (int i = 0; i < objects.length; i++) {
        prefixes.add(keyToPath(objects[i].getKey()));
      }
      prefixes.remove(path);
      return prefixes;
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  public Set<Path> listDeepSubPaths(Path path) throws IOException {
    try {
      String prefix = pathToKey(path);
      if (!prefix.endsWith(PATH_DELIMITER)) {
        prefix += PATH_DELIMITER;
      }
      S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
      Set<Path> prefixes = new TreeSet<Path>();
      for (int i = 0; i < objects.length; i++) {
        prefixes.add(keyToPath(objects[i].getKey()));
      }
      prefixes.remove(path);
      return prefixes;
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  private void put(String key, InputStream in, long length, boolean storeMetadata)
      throws IOException {

    try {
      S3Object object = new S3Object(key);
      object.setDataInputStream(in);
      object.setContentType("binary/octet-stream");
      object.setContentLength(length);
      if (storeMetadata) {
        object.addAllMetadata(METADATA);
      }
      s3Service.putObject(bucket, object);
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  public void storeINode(Path path, INode inode) throws IOException {
    put(pathToKey(path), inode.serialize(), inode.getSerializedLength(), true);
  }

  public void storeBlock(Block block, File file) throws IOException {
    BufferedInputStream in = null;
    try {
      in = new BufferedInputStream(new FileInputStream(file));
      put(blockToKey(block), in, block.getLength(), false);
    } finally {
      closeQuietly(in);
    }
  }

  private void closeQuietly(Closeable closeable) {
    if (closeable != null) {
      try {
        closeable.close();
      } catch (IOException e) {
        // ignore
      }
    }
  }

  private String pathToKey(Path path) {
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    return path.toUri().getPath();
  }

  private Path keyToPath(String key) {
    return new Path(key);
  }

  private String blockToKey(long blockId) {
    return BLOCK_PREFIX + blockId;
  }

  private String blockToKey(Block block) {
    return blockToKey(block.getId());
  }

  public void purge() throws IOException {
    try {
      S3Object[] objects = s3Service.listObjects(bucket);
      for (int i = 0; i < objects.length; i++) {
        s3Service.deleteObject(bucket, objects[i].getKey());
      }
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  public void dump() throws IOException {
    StringBuilder sb = new StringBuilder("S3 Filesystem, ");
    sb.append(bucket.getName()).append("\n");
    try {
      S3Object[] objects = s3Service.listObjects(bucket, PATH_DELIMITER, null);
      for (int i = 0; i < objects.length; i++) {
        Path path = keyToPath(objects[i].getKey());
        sb.append(path).append("\n");
        INode m = retrieveINode(path);
        sb.append("\t").append(m.getFileType()).append("\n");
        if (m.getFileType() == FileType.DIRECTORY) {
          continue;
        }
        for (int j = 0; j < m.getBlocks().length; j++) {
          sb.append("\t").append(m.getBlocks()[j]).append("\n");
        }
      }
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
    System.out.println(sb);
  }

}
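  To see what this store actually writes to S3, here is a minimal client-side sketch (not from the original post) that writes one file through the s3:// scheme. The bucket name, path, and buffer directory are placeholders, and it assumes valid AWS credentials; the comments describe the object layout that storeINode()/storeBlock() above produce.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3BlockLayoutDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY");     // placeholder
    conf.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_KEY"); // placeholder
    conf.set("fs.s3.buffer.dir", "/tmp/hadoop-s3");          // local staging dir used by the store

    FileSystem fs = FileSystem.get(URI.create("s3://my-bucket/"), conf);

    // Writing through the s3:// scheme does not create an object named
    // "/demo/hello.txt" holding the raw bytes. S3FileSystem asks the store
    // (Jets3tFileSystemStore above) to put:
    //   * one object keyed by the path, containing a serialized INode
    //     (file type + block ids), tagged with the fs/fs-type/fs-version metadata;
    //   * one object per data block, keyed "block_<blockId>".
    FSDataOutputStream out = fs.create(new Path("/demo/hello.txt"));
    out.write("hello, block-based S3 store".getBytes("UTF-8"));
    out.close();
    fs.close();
  }
}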

 

IV. Seeing Is Believing

  [The screenshot from the original post is not reproduced here.]

V. Conclusion

  Hadoop's support for S3 is achieved by integrating with S3 through S3's existing service interface (its REST API, accessed via the JetS3t library), not by re-implementing an S3-like store from scratch. Interested readers can verify this against the source code above.
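  For comparison, the upload/download pattern that Jets3tFileSystemStore wraps can be reproduced directly against JetS3t. This is a hedged sketch with placeholder credentials, bucket, and key; it simply mirrors the private put()/get() methods shown in section III.

import java.io.ByteArrayInputStream;

import org.jets3t.service.S3Service;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.security.AWSCredentials;

public class JetS3tRoundTrip {
  public static void main(String[] args) throws Exception {
    // Same client classes that Jets3tFileSystemStore.initialize() builds.
    S3Service s3 = new RestS3Service(
        new AWSCredentials("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY")); // placeholders
    S3Bucket bucket = new S3Bucket("my-bucket");                   // placeholder

    // Upload: mirrors the store's private put() method.
    byte[] data = "hello from jets3t".getBytes("UTF-8");
    S3Object object = new S3Object("demo-key");
    object.setDataInputStream(new ByteArrayInputStream(data));
    object.setContentType("binary/octet-stream");
    object.setContentLength(data.length);
    s3.putObject(bucket, object);

    // Download: mirrors the store's private get() method.
    S3Object fetched = s3.getObject(bucket, "demo-key");
    System.out.println("size = " + fetched.getContentLength());
    fetched.getDataInputStream().close();
  }
}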


Author: 张子良
Source: http://www.cnblogs.com/hadoopdev
Copyright of this article belongs to the author. Reposting is welcome, but this notice must be kept intact and a prominent link to the original article must be provided; otherwise the author reserves the right to pursue legal liability.
