1. Common Shell Commands for HDFS Operations
1.1 Viewing Command Usage
Start Hadoop
start-dfs.sh
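Confirm that the HDFS daemons have started (jps should list NameNode, DataNode and SecondaryNameNode)
jps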
View the available commands
hdfs dfs -help
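View the usage of a single command, for example put
hdfs dfs -help put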
1.2 HDFS Directory Operations
1.2.1 Directory Operations
List the directories and files in HDFS (with no path given, -ls lists the current user's home directory)
hdfs dfs -ls
Create a directory named input_test
hdfs dfs -mkdir input_test
Delete the input_test directory
hdfs dfs -rm -r input_test
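Two related options that are often useful: -ls -R lists a directory tree recursively, and -mkdir -p creates parent directories as needed (for example, the wordcount/input path used below)
hdfs dfs -ls -R
hdfs dfs -mkdir -p wordcount/input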
1.2.2 File Operations
View the contents of the file in0.txt in HDFS
hdfs dfs -cat in0.txt
Download the file in0.txt from HDFS to the local directory /home/zqc/download
hdfs dfs -get in0.txt /home/zqc/download
Upload a local file to the out folder in HDFS
hdfs dfs -put /home/zqc/score.txt out
Copy a file from one HDFS directory to another
hdfs dfs -cp out/score.txt wordcount/input
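Check whether a path already exists before uploading (the shell counterpart of the existence check in Exercise 1 below); -test -e exits with status 0 when the path exists
hdfs dfs -test -e out/score.txt
echo $?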
2. Using the HDFS Web Management Interface
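In a default pseudo-distributed installation, the NameNode web UI is typically reachable at http://localhost:9870 (Hadoop 3.x; Hadoop 2.x uses port 50070). It shows NameNode and DataNode status, and the file system can be browsed under Utilities -> Browse the file system.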
3. HDFS Programming Practice
Create a project in IDEA
Add the required JAR packages to the project (see the note after this list)
Write the Java application
Compile and run the program
Deploy the application
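Note on the JAR packages: in a typical Hadoop installation they are the JARs under $HADOOP_HOME/share/hadoop/common and $HADOOP_HOME/share/hadoop/common/lib, plus $HADOOP_HOME/share/hadoop/hdfs and $HADOOP_HOME/share/hadoop/hdfs/lib; the exact layout depends on the installed Hadoop version.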
3.1 Exercise 1
Write a FileUtils class containing implementations of the file download and upload functions, with the following requirements:
A. The function UploadFile() uploads an arbitrary text file to HDFS; if the specified file already exists in HDFS, the user chooses whether to append to the end of the existing file or to overwrite it;
B. The function DownloadFile() downloads a specified file from HDFS; if a local file with the same name as the file to be downloaded already exists, the downloaded file is automatically renamed;
C. Create a text file localfile.txt in the local Download folder, and write logic in the main function to upload it to the input folder in HDFS;
import java.io.*;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileUtils {

    // Append the contents of the local file to the end of an existing HDFS file.
    public static void appendToFile(Configuration conf, String LocalPath, String UploadPath) {
        Path uploadpath = new Path(UploadPath);
        try (FileSystem fs = FileSystem.get(conf);
             FileInputStream in = new FileInputStream(LocalPath)) {
            FSDataOutputStream out = fs.append(uploadpath);
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = in.read(data)) > 0) {
                out.write(data, 0, read);
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Overwrite the HDFS file with the contents of the local file.
    public static void coverFile(Configuration conf, String LocalPath, String UploadPath) {
        Path uploadpath = new Path(UploadPath);
        try (FileSystem fs = FileSystem.get(conf);
             FileInputStream in = new FileInputStream(LocalPath)) {
            FSDataOutputStream out = fs.create(uploadpath);
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = in.read(data)) > 0) {
                out.write(data, 0, read);
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Upload a local file to HDFS; if the target already exists, let the user
    // choose between appending to it and overwriting it.
    public static void UploadFile(Configuration conf, String LocalPath, String UploadPath) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Path localpath = new Path(LocalPath);
            Path uploadpath = new Path(UploadPath);
            if (fs.exists(uploadpath)) {
                System.out.println("File \"" + UploadPath + "\" exists!");
                System.out.println("1. append\t2. cover");
                Scanner sc = new Scanner(System.in);
                String s = sc.nextLine();
                if (s.equals("1")) {
                    try {
                        appendToFile(conf, LocalPath, UploadPath);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    try {
                        coverFile(conf, LocalPath, UploadPath);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } else {
                System.out.println("File \"" + UploadPath + "\" does not exist!");
                InputStream in = new FileInputStream(LocalPath);
                OutputStream out = fs.create(uploadpath);
                IOUtils.copyBytes(in, out, 4096, true);
                System.out.println("File uploaded successfully!");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Download an HDFS file to the local file system; if a local file with the
    // same name already exists, rename the downloaded copy by appending _0, _1, ...
    public static void DownloadFile(Configuration conf, String LocalPath, String DownloadPath) {
        Path downloadpath = new Path(DownloadPath);
        try (FileSystem fs = FileSystem.get(conf)) {
            File f = new File(LocalPath);
            if (f.exists()) {
                System.out.println(LocalPath + " exists!");
                int i = 0;
                while (true) {
                    f = new File(LocalPath + "_" + i);
                    if (!f.exists()) {
                        LocalPath = LocalPath + "_" + i;
                        break;
                    } else {
                        i++;
                    }
                }
                System.out.println("rename: " + LocalPath);
            }
            Path localpath = new Path(LocalPath);
            fs.copyToLocalFile(downloadpath, localpath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Settings that let append work on a single-DataNode (pseudo-distributed) setup.
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        String LocalPath = "/home/zqc/Downloads/localfile.txt";
        String UploadPath = "/user/zqc/input/localfile.txt";
        // String DownloadPath = "/user/hadoop/input/score.txt";
        UploadFile(conf, LocalPath, UploadPath);
        // DownloadFile(conf, LocalPath, DownloadPath);
        // try {
        //     String CreateDir = "/home/zqc/Downloads/";
        //     String FileName = "localfile.txt";
        //     String HDFSDir = "/user/hadoop/input";
        //     File file = new File(CreateDir, FileName);
        //     if (file.createNewFile()) {
        //         FileSystem hdfs = FileSystem.get(conf);
        //         Path localpath = new Path(CreateDir + FileName);
        //         Path hdfspath = new Path(HDFSDir);
        //         hdfs.copyFromLocalFile(localpath, hdfspath);
        //     }
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }
    }
}
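A note on main(): the two dfs.client.block.write.replace-datanode-on-failure settings keep fs.append() from failing on a pseudo-distributed cluster with a single DataNode, where no replacement DataNode is available when the write pipeline is rebuilt. After the program runs, the upload can be verified with
hdfs dfs -cat /user/zqc/input/localfile.txt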
3.2 Exercise 2
A. Implement a class MyFSDataInputStream that extends org.apache.hadoop.fs.FSDataInputStream, with the following requirement: implement a readLine() method that reads a specified HDFS file line by line, returning null when the end of the file is reached and otherwise returning one line of text from the file.
B. In the main function, write logic that reads the file file.txt in the input folder (see the attachment) line by line and prints every line longer than 15 characters to the console;
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadLine {

    // Stub class required by the exercise; see the sketch after this listing
    // for a possible readLine() implementation.
    public class MyFSDataInputStream extends FSDataInputStream {
        public MyFSDataInputStream(InputStream in) {
            super(in);
        }
    }

    // Open the HDFS file and print it line by line.
    public static String readline(Configuration conf, String filepath) throws IOException {
        Path path = new Path(filepath);
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        FSDataInputStream in = fs.open(path);
        BufferedReader d = new BufferedReader(new InputStreamReader(in));
        String line = null;
        while ((line = d.readLine()) != null) {
            System.out.println(line);
        }
        d.close();
        in.close();
        return null;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        String filepath = "/user/zqc/input/file.txt";
        try {
            Path path = new Path(filepath);
            FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
            FSDataInputStream in = fs.open(path);
            BufferedReader d = new BufferedReader(new InputStreamReader(in));
            String line = null;
            // Print every line that is longer than 15 characters.
            while ((line = d.readLine()) != null) {
                if (line.length() > 15) {
                    System.out.println(line);
                }
            }
            d.close();
            in.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
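The nested MyFSDataInputStream above is only a stub; requirement A asks for a readLine() method on the class itself, while the listing does its line reading through BufferedReader in main. A minimal sketch of such a method (an illustrative assumption, not part of the original solution) reads single bytes until a newline or the end of the file:

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FSDataInputStream;

// Sketch only: assumes the wrapped stream comes from FileSystem.open(), which
// already satisfies the Seekable/PositionedReadable contract FSDataInputStream expects.
class MyFSDataInputStream extends FSDataInputStream {
    public MyFSDataInputStream(InputStream in) {
        super(in);
    }

    // Return one line of text without its trailing '\n', or null at end of file.
    // Byte-by-byte reading is sufficient for ASCII text such as file.txt.
    public String readLine() throws IOException {
        StringBuilder sb = new StringBuilder();
        int b = read();
        if (b == -1) {
            return null;            // end of file reached
        }
        while (b != -1 && b != '\n') {
            sb.append((char) b);
            b = read();
        }
        return sb.toString();
    }
}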