一、实验目的
熟练采用JAVA API访问 HDFS。
二、实验原理
HDFS是hadoop平台的核心组成之一。熟悉使用hadoop平台需要熟练访问HDFS。HDFS的访问方式有多种。可通过web访问,也可通过shell方式或者API方式访问。基本操作有对文件的读、写、追加、删除等。新建文件夹、删除文件夹等。还可显示文件及文件夹的属性。
HDFS主要用到了FileSystem类,相关的接口可以在这里查到:
http://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/FileSystem.html
或通过IDEA,Ctrl+点击 FileSystem类,也可以看到源码。
下面列了FileSystem的常用接口:
三、实验环境
hadoop2.7.3
Java IDE:IDEA
四、实验内容
打开桌面terminal,在家目录下,下载项目,并解压缩
wget http://i9000.net:8888/sgn/HUP/HadoopDeployPro/cloud-disk.zip unzip cloud-disk.zip
检查是否已启动Hadoop
用jps查看是否启动成功,如果有进程未启动,可以尝试再次启动Hadoop
1.打开IDEA,导入项目“clouddisk”。
选择clouddisk项目的pom.xml
2.修改配置
application.properties文件中修改下面的IP为虚拟机的IP,此时我们写localhost即可
hadoop.namenode.rpc.url=hdfs://localhost:8020
3.修改用户代码观察网盘效果
将如图文件的,将所有的hadoop用户改为ubuntu,涉及创建目录、删除目录等,总共5处需要修改。
4.右键运行
当看到如下截图时:
访问页面:
http://localhost:9090/ (链接到外部网站。)链接到外部网站。
可以看到实验一、实验二在hdfs上的目录,我们还可以自己创建,删除,重命名
无论删除还是增加都会有日志:
NOTE:每一次修改代码都需要重新运行代码,点击如图Rerun图标,重新运行代码,当tomcat与项目代码都启动时,打开浏览器再进行操作
5.开发,实现上传、下载
类名:com.mypro.clouddisk.hdfs.FileSystemImpl
实现upload 、download的代码。
@Override public void upload(InputStream is, String dstHDFSFile) throws Exception { //TODO //代码待实现 System.out.println( "Upload Successfully!" ); } @Override public void download(String file, OutputStream os) throws Exception { //TODO //代码待实现 System.out.println( "Download Successfully!" ); }
五、代码附录
IndexController
package com.mypro.clouddisk.controller; import com.mypro.clouddisk.hdfs.IFileSystem; import com.mypro.clouddisk.model.FileIndex; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; import org.springframework.web.servlet.mvc.support.RedirectAttributes; import javax.servlet.http.HttpServletResponse; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; @Controller public class IndexController { @Autowired private IFileSystem fileSystem = null; @RequestMapping("/") public String index(String path,Model model) { FileIndex fileIndex = new FileIndex(); path = (path==null || path.trim().isEmpty()) ? "/": path.trim(); fileIndex.setPath(path); String fileName = fileSystem.getFileName(path); fileIndex.setName(fileName); model.addAttribute("rootDir",fileIndex); List<FileIndex> list = new ArrayList<FileIndex>(); try { list = fileSystem.ls(path); } catch (Exception e) { e.printStackTrace(); } model.addAttribute("rootFiles",list); return "index"; } @GetMapping("/download") public String download(HttpServletResponse response, @RequestParam String file) throws Exception { // String filename="xxx.txt"; File hFile = new File(file); String filename = hFile.getName(); response.setHeader("Content-Disposition", "attachment;fileName=" + filename); fileSystem.download(file,response.getOutputStream()); return null; } /** * * @param path //??????? * @return * @throws Exception */ @RequestMapping("/delete") public String delete(@RequestParam String path) throws Exception { String parentPath = fileSystem.rm(path); //?????? return "redirect:./?path=" + parentPath; } @ResponseBody @RequestMapping(value="/checkMD5",method=RequestMethod.POST) public String checkMD5(String md5code){ //??????????MD5??md5code??? return "no"; } @PostMapping("/uploadFile") public String singleFileUpload(@RequestParam("file") MultipartFile file, String parentPath,RedirectAttributes redirectAttributes) throws IOException { String fileName = getOriginalFilename(file.getOriginalFilename()); InputStream is = file.getInputStream(); Long size = file.getSize(); if (file.isEmpty()) { redirectAttributes.addFlashAttribute("message", "Please select a file to upload"); return "redirect:uploadStatus"; } try { String dstPath = parentPath.endsWith("/")? (parentPath+fileName) : (parentPath +"/"+fileName); fileSystem.upload(is,dstPath); redirectAttributes.addFlashAttribute("message", "You successfully uploaded '" + file.getOriginalFilename() + "'"); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return "redirect:./?path="+parentPath; } @PostMapping("/mkdir") public String mkdir(String directName,String parentPath) throws Exception { String newDir = parentPath.endsWith("/")?(parentPath+directName):(parentPath+"/"+directName); fileSystem.mkdir(newDir); return "redirect:./?path="+parentPath; } @RequestMapping("renameForm") public String renameForm(String directName,String isRoot,String path) throws Exception { //??fileindex?name?path //???????path String[] arr= fileSystem.rename(path,directName); //????????????????????????????????????????????????????????? if(isRoot!=null && isRoot.equals("yes")){ return "redirect:./?path=" + arr[1]; //mypath }else{ return "redirect:./?path=" + arr[0];//parent } } @RequestMapping("/getOptionalPath") @ResponseBody public List getOptionalPath(String path){ //?????fileIndexId????????????????????????????? List result = fileSystem.getOptionTranPath(path); return result; } @RequestMapping("/searchFiles") public String searchFiles(String keyWord,@RequestParam(value="pageNum",defaultValue="1")int pageNum,Model model) throws Exception { int pageSize = 3; List<FileIndex> result = fileSystem.searchFileByPage(keyWord,pageSize,pageNum); com.github.pagehelper.Page<FileIndex> page =new com.github.pagehelper.Page<FileIndex>(); page.addAll(result); model.addAttribute("result", page); model.addAttribute("keyWord",keyWord); return "searchResult"; } @RequestMapping("/stasticFiles") public String stasticFiles(Model model){ // List staticResult = fileSystem.getStaticNums(); List<Map> staticResult = new ArrayList<Map>(); Map map = new HashMap(); map.put("doc_number",100); map.put("video_number",87); map.put("pic_number",66); map.put("code_number",44); map.put("other_number",23); staticResult.add(map); model.addAttribute("staticResult", staticResult); return "stasticfiles"; } private String getOriginalFilename(String originalFilename){ if(originalFilename == null) { return ""; } if(originalFilename.contains("/") || originalFilename.contains("\\")){ File file = new File(originalFilename); return file.getName(); } return originalFilename; } } }
FileSystemImpl
package com.mypro.clouddisk.hdfs; import com.mypro.clouddisk.model.FileIndex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.*; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Date; import java.util.List; @Component public class FileSystemImpl implements IFileSystem { Logger logger = LoggerFactory.getLogger(FileSystemImpl.class); @Value("${hadoop.namenode.rpc.url}") private String namenodeRpcUrl; // private static String NAMENODE_RPC="hdfs://192.168.72.128:8020"; @Override public List<FileIndex> ls(String dir) throws Exception{ Configuration conf=new Configuration(); //??NameNode?? URI uri=new URI(namenodeRpcUrl); //?????,??FileSystem?? FileSystem fs=FileSystem.get(uri,conf,"ubuntu"); Path path = new Path(dir); //???????? if(! fs.exists(path)){ logger.error("dir:"+dir+" not exists!"); throw new RuntimeException("dir:"+dir+" not exists!"); } List<FileIndex> list = new ArrayList<FileIndex>(); FileStatus[] filesStatus = fs.listStatus(path); for(FileStatus f:filesStatus){ FileIndex fileIndex = new FileIndex(); fileIndex.setIsFile(f.isDirectory()?"0":"1"); fileIndex.setName(f.getPath().getName()); fileIndex.setPath(f.getPath().toUri().getPath()); fileIndex.setCreateTime(new Date()); list.add(fileIndex); } //??????FileSystem???? fs.close(); return list; } @Override public void mkdir(String dir) throws Exception{ Configuration conf=new Configuration(); //??NameNode?? URI uri=new URI(namenodeRpcUrl); //?????,??FileSystem?? FileSystem fs=FileSystem.get(uri,conf,"ubuntu"); fs.mkdirs(new Path(dir)); //??????FileSystem????client fs.close(); System.out.println( "mkdir "+dir+" Successfully!" ); } @Override /** * ??????? */ public String rm(String path) throws Exception { Configuration conf=new Configuration(); //??NameNode?? URI uri=new URI(namenodeRpcUrl); //?????,??FileSystem?? FileSystem fs=FileSystem.get(uri,conf,"ubuntu"); Path filePath = new Path(path); fs.delete(filePath,true); //??????FileSystem????client fs.close(); System.out.println( "Delete "+path+" Successfully!" ); return filePath.getParent().toUri().getPath(); } @Override public void upload(InputStream is, String dstHDFSFile) throws Exception { //TODO //??? System.out.println( "Upload Successfully!" ); } @Override public void download(String file, OutputStream os) throws Exception { //TODO //??? System.out.println( "Download Successfully!" ); } @Override public void mv() { } @Override public String[] rename(String path, String dirName) throws Exception { Configuration conf=new Configuration(); //??NameNode?? URI uri=new URI(namenodeRpcUrl); //?????,??FileSystem?? FileSystem fs=FileSystem.get(uri,conf,"ubuntu"); Path oldPath = new Path(path); if(oldPath.getParent() ==null){ String[] arr = new String[2]; arr[0]="/"; arr[1]="/"; return arr; } String parentPath = oldPath.getParent().toUri().getPath(); String newPathStr = parentPath.endsWith("/")?(parentPath+dirName):(parentPath + "/" +dirName); Path newPath = new Path(newPathStr ); fs.rename(oldPath,newPath); //??????FileSystem????client fs.close(); String[] arr = new String[2]; arr[0]=parentPath; arr[1]=newPathStr; System.out.println( "rename Successfully!" ); return arr; } @Override public String getFileName(String path) { Path filePath = new Path(path); return filePath.getName(); } @Override public List getOptionTranPath(String path) { return new ArrayList(); } @Override public List<FileIndex> searchFileByPage(String keyWord, int pageSize, int pageNum) throws Exception{ Configuration conf=new Configuration(); //??NameNode?? URI uri=new URI(namenodeRpcUrl); //?????,??FileSystem?? FileSystem fs=FileSystem.get(uri,conf,"ubuntu"); ArrayList<FileStatus> results = null; PathFilter filter = new PathFilter() { @Override public boolean accept(Path path) { if(keyWord == null || keyWord.trim().isEmpty()){ return false; } if(path.getName().contains(keyWord)){ return true; } return false; } }; List<FileIndex> list = new ArrayList<FileIndex>(); FileStatus[] fileStatusArr = fs.listStatus(new Path("/"),filter); for(FileStatus status :fileStatusArr){ FileIndex fileIndex = new FileIndex(); fileIndex.setPath(status.getPath().toUri().getPath()); fileIndex.setName(status.getPath().getName()); fileIndex.setIsFile(status.isFile()?"1":"0"); list.add(fileIndex); } //??????FileSystem????client fs.close(); System.out.println( "Search Successfully!" ); return list; } @Override public List getStaticNums() { return null; } }
FileSystem
package com.mypro.clouddisk.hdfs; import com.mypro.clouddisk.model.FileIndex; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URISyntaxException; import java.util.List; public interface IFileSystem { /** * ls ???????? * @return */ List<FileIndex> ls(String dir) throws Exception; /** * ???? * @return */ void mkdir(String dir)throws Exception; /** * ??????? * ?????? */ String rm(String path) throws Exception; /** * ?? */ void upload(InputStream is, String dstHDFSFile) throws Exception; /** * ?? */ void download(String file, OutputStream os) throws Exception; /** * ??? */ void mv(); /** * ??? */ String[] rename(String path,String dirName) throws Exception; /** * ????? * @param path * @return */ String getFileName(String path); //?????fileIndexId????????????????????????????? List getOptionTranPath(String path); List<FileIndex> searchFileByPage(String keyWord, int pageSize, int pageNum) throws Exception; List getStaticNums(); }