一、实验目的
- 深入理解HDFS工作原理和编程思想
- 使用HDFS的Java接口进行文件的读写
- 使用HDFS的Java接口进行之上传文件
- 使用HDFS的Java接口进行之删除文件
二、实验内容
- HDFS的Java API接口进行文件的读写操作
- HDFS的Java API接口进行之上传文件操作
- HDFS的Java API接口进行之删除文件操作
三、实验步骤
(一)HDFS-JAVA接口之读取文件
我们要深入探索Hadoop
的FileSystem
类,它是与Hadoop
的某一文件系统进行交互的API
。
为了完成接下来的操作,你需要学习并掌握:
1.FileSystem
对象的使用,2.FSDataInputSteam
对象的使用。
FileSystem对象
要从Hadoop
文件系统中读取文件,最简单的办法是使用java.net.URL
对象打开数据流,从中获取数据。不过这种方法一般要使用FsUrlStreamHandlerFactory
实例调用setURLStreamHandlerFactory()
方法。不过每个Java
虚拟机只能调用一次这个方法,所以如果其他第三方程序声明了这个对象,那我们将无法使用了。 因为有时候我们不能在程序中设置URLStreamHandlerFactory
实例,这个时候咱们就可以使用FileSystem API
来打开一个输入流,进而对HDFS
进行操作。
代码如下:
1. public sattic void main(String[] args){ 2. URI uri = URI.create("hdfs://localhost:9000/user/tmp/test.txt"); 3. Configuration config = new Configuration(); 4. FileSystem fs = FileSystem.get(uri, config); 5. InputStream in = null; 6. try { 7. in = fs.open(new Path(uri)); 8. IOUtils.copyBytes(in, System.out, 2048, false); 9. } catch (Exception e) { 10. IOUtils.closeStream(in); 11. } 12. }
FileSystem
是一个通用的文件系统API
,FileSystem
实例有下列几个静态工厂方法用来构造对象。
1. public static FileSystem get(Configuration conf)throws IOException 2. public static FileSystem get(URI uri,Configuration conf)throws IOException 3. public static FileSystem get(URI uri,Configuration conf,String user)throws IOException
Configuration
对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如:/etc/hadoop/core-site.xml
)。
- 第一个方法返回的默认文件系统是在
core-site.xml
中指定的,如果没有指定,就使用默认的文件系统。 - 第二个方法使用给定的
URI
方案和权限来确定要使用的文件系统,如果给定URI
中没有指定方案,则返回默认文件系统, - 第三个方法作为给定用户来返回文件系统,这个在安全方面来说非常重要。
FSDataInputStream对象
实际上,FileSystem
对象中的open()
方法返回的就是FSDataInputStream
对象,而不是标准的java.io
类对象。这个类是继承了java.io.DataInputStream
的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。
在有了FileSystem
实例之后,我们调用open()
函数来获取文件的输入流。
1. public FSDataInputStream open(Path p)throws IOException 2. public abst\fract FSDataInputStream open(Path f,int bufferSize)throws IOException
了解了这些,我们在来回顾上文代码,就能更好的理解这些方法的作用了:
编写代码实现如下功能:
使用FSDataInputStream
获取HDFS
的/user/hadoop/
目录下的task.txt
的文件内容,并输出;
预期输出: WARN [main] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
怕什么真理无穷,进一寸有一寸的欢喜。
相关代码:
1. 1. //启动hadoop: start-dfs.sh 2. 2. package step2; 3. 3. 4. 4. import java.io.IOException; 5. 5. import java.io.InputStream; 6. 6. import java.net.URI; 7. 7. 8. 8. import org.apache.hadoop.conf.Configuration; 9. 9. import org.apache.hadoop.fs.FileSystem; 10. 10. import org.apache.hadoop.fs.Path; 11. 11. import org.apache.hadoop.io.IOUtils; 12. 12. 13. 13. 14. 14. public class FileSystemCat { 15. 15. 16. 16. public static void main(String[] args) throws IOException { 17. 17. /********* Begin *********/ 18. 18. Configuration config = new Configuration(); 19. 19. URI uri = URI.create("hdfs://localhost:9000/user/hadoop/task.txt"); 20. 20. FileSystem fs = FileSystem.get(uri, config); 21. 21. InputStream in = null; 22. 22. try { 23. 23. in = fs.open(new Path(uri)); 24. 24. IOUtils.copyBytes(in, System.out, 2048, false); 25. 25. } catch (Exception e) { 26. 26. IOUtils.closeStream(in); 27. 27. } 28. 28. /********* End *********/ 29. 29. } 30. 30. 31. 31. }
(二)HDFS-JAVA接口之上传文件
FSDataOutputStream对象
我们知道在Java
中要将数据输出到终端,需要文件输出流,HDFS
的JavaAPI
中也有类似的对象。 FileSystem
类有一系列新建文件的方法,最简单的方法是给准备新建的文件制定一个path
对象,然后返回一个用于写入数据的输出流:
public FSDataOutputStream create(Path p)throws IOException
该方法有很多重载方法,允许我们指定是否需要强制覆盖现有文件,文件备份数量,写入文件时所用缓冲区大小,文件块大小以及文件权限。
注意:create()
方法能够为需要写入且当前不存在的目录创建父目录,即就算传入的路径是不存在的,该方法也会为你创建一个目录,而不会报错。如果有时候我们并不希望它这么做,可以先用exists()
方法先判断目录是否存在。
我们在写入数据的时候经常想要知道当前的进度,API
也提供了一个Progressable
用于传递回调接口,这样我们就可以很方便的将写入datanode
的进度通知给应用了。
1. package org.apache.hadoop.util; 2. public interface Progressable{ 3. public void progress(); 4. }
编写代码与脚本实现如下功能:
在/develop/input/
目录下创建hello.txt
文件,并输入如下数据: 迢迢牵牛星,皎皎河汉女。
纤纤擢素手,札札弄机杼。
终日不成章,泣涕零如雨。
河汉清且浅,相去复几许?
盈盈一水间,脉脉不得语。
《迢迢牵牛星》
使用FSDataOutputStream
对象将文件上传至HDFS
的/user/tmp/
目录下,并打印进度。
预期输出:
相关代码:
shell指令:
1. 1. mkdir /develop 2. 2. mkdir /develop/input 3. 3. cd /develop/input 4. 4. touch hello.txt 5. 5. vim hello.txt 插入数据 wq 保存退出 6. 6. start-dfs.sh
1. 1. package step3; 2. 2. 3. 3. 4. 4. import java.io.BufferedInputStream; 5. 5. import java.io.FileInputStream; 6. 6. import java.io.FileNotFoundException; 7. 7. import java.io.IOException; 8. 8. import java.io.InputStream; 9. 9. import java.net.URI; 10. 10. import java.io.File; 11. 11. 12. 12. import org.apache.hadoop.conf.Configuration; 13. 13. import org.apache.hadoop.fs.FSDataOutputStream; 14. 14. import org.apache.hadoop.fs.FileSystem; 15. 15. import org.apache.hadoop.fs.Path; 16. 16. import org.apache.hadoop.io.IOUtils; 17. 17. import org.apache.hadoop.util.Progressable; 18. 18. 19. 19. 20. 20. public class FileSystemUpload { 21. 21. 22. 22. public static void main(String[] args) throws IOException { 23. 23. /********* Begin *********/ 24. 24. File localPath = new File("/develop/input/hello.txt"); 25. 25. String hdfsPath = "hdfs://localhost:9000/user/tmp/hello.txt"; 26. 26. 27. 27. InputStream in = new BufferedInputStream(new FileInputStream(localPath));// 获取输入流对象 28. 28. 29. 29. Configuration config = new Configuration(); 30. 30. 31. 31. FileSystem fs = FileSystem.get(URI.create(hdfsPath), config); 32. 32. 33. 33. long fileSize = localPath.length() > 65536 ? localPath.length() / 65536 : 1; // 待上传文件大小 34. 34. 35. 35. FSDataOutputStream out = fs.create(new Path(hdfsPath), new Progressable() { 36. 36. // 方法在每次上传了64KB字节大小的文件之后会自动调用一次 37. 37. long fileCount = 0; 38. 38. 39. 39. public void progress() { 40. 40. System.out.println("总进度" + (fileCount / fileSize) * 100 + "%"); 41. 41. fileCount++; 42. 42. } 43. 43. }); 44. 44. 45. 45. IOUtils.copyBytes(in, out, 2048, true);// 最后一个参数的意思是使用完之后是否关闭流 46. 46. /********* End *********/ 47. 47. } 48. 48. }
(三)HDFS-JAVA接口之删除文件
我们在开发或者维护系统时,经常会需要列出目录的内容,在HDFS
的API
中就提供了listStatus()
方法来实现该功能。
1. public FileStatus[] listStatus(Path f)throws IOException 2. public FileStatus[] listStatus(Path f,PathFilter filter)throws IOException 3. public FileStatus listStatus(Path[] files)throws IOException 4. public FileStatus() listStatus(Path[] files,PathFilter filter)throws IOException
当传入参数是一个文件时,他会简单的转变成以数组方式返回长度为1
的FileStatus
对象,当传入参数是一个目录时,则返回0
或多个FileStatus
对象,表示此目录中包含的文件和目录。
删除文件
使用FileSystem
的delete()
方法可以永久性删除文件或目录。
public boolean delete(Path f,boolean recursive)throws IOException
如果f
是一个文件或者空目录,那么recursive
的值可以忽略,当recursize
的值为true
,并且p
是一个非空目录时,非空目录及其内容才会被删除(否则将会抛出IOException
异常)。
编写代码实现如下功能:
- 删除
HDFS
的/user/hadoop/
目录(空目录); - 删除
HDFS
的/tmp/test/
目录(非空目录); - 列出
HDFS
根目录下所有的文件和文件夹; - 列出
HDFS
下/tmp/
的所有文件和文件夹。
预期输出:
相关代码:
1. 1. package step4; 2. 2. import java.io.IOException; 3. 3. import java.net.URI; 4. 4. import org.apache.hadoop.conf.Configuration; 5. 5. import org.apache.hadoop.fs.FileStatus; 6. 6. import org.apache.hadoop.fs.FileSystem; 7. 7. import org.apache.hadoop.fs.FileUtil; 8. 8. import org.apache.hadoop.fs.Path; 9. 9. 10. 10. public class FileSystemDelete { 11. 11. 12. 12. public static void main(String[] args) throws IOException { 13. 13. /********* Begin *********/ 14. 14. String root = "hdfs://localhost:9000/";//根目录 15. 15. String path = "hdfs://localhost:9000/tmp"; //要列出的目录 16. 16. //待删除的两个目录 17. 17. String del1 = "hdfs://localhost:9000/user/hadoop"; 18. 18. String del2 = "hdfs://localhost:9000/tmp/test"; 19. 19. 20. 20. Configuration config = new Configuration(); 21. 21. FileSystem fs = FileSystem.get(URI.create(root),config); 22. 22. fs.delete(new Path(del1),true); 23. 23. fs.delete(new Path(del2),true); 24. 24. Path[] paths = {new Path(root),new Path(path)}; 25. 25. FileStatus[] status = fs.listStatus(paths); 26. 26. Path[] listPaths = FileUtil.stat2Paths(status); 27. 27. 28. 28. for(Path path1 : listPaths){ 29. 29. System.out.println(path1); 30. 30. } 31. 31. /********* End *********/ 32. 32. } 33. 33. }
四、实验心得
会使用HDFS的Java接口进行文件的读写
会使用HDFS的Java接口进行之上传文件
会使用HDFS的Java接口进行之删除文件