java实现下载hdfs文件及文件夹
说明:使用java实现从HDFS上下载文件及文件夹的功能,内容以流的形式输出,便于用户自行选择保存到任意路径下
<!--Hadoop(hadoop-common / hadoop-hdfs / hadoop-client)依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.1</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.1</version>
</dependency>
==各段代码需要引入的类,见每段代码上方的import列表,对照查看即可==
1.下载xxx文件
“下载文件” 执行流程说明:
1.构建hdfs连接,初始化Configuration
2.获取文件输入流FSDataInputStream,调用downloadFile()
3.方法内部先设置响应头(header),以convertFileName(fileName)处理后的文件名作为下载文件名,然后将输入流中的内容以流的形式输出
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import util.ExportUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * Downloads the HDFS file {@code /spark/testLog.txt} and returns it as an attachment.
 *
 * <p>The opened HDFS stream is passed to {@code ExportUtil.downloadFile}, which builds the
 * response; the stream is not closed in this method.
 *
 * @author liudz
 * @date 2020/6/9
 * @return the response produced by {@code ExportUtil.downloadFile} for the opened stream
 * @throws URISyntaxException if the HDFS address is not a valid URI
 * @throws IOException if the file system cannot be obtained or the file cannot be opened
 **/
@RequestMapping(value = "/down", method = RequestMethod.GET)
public ResponseEntity<InputStreamResource> Test01() throws URISyntaxException, IOException {
    // Set up the HDFS connection from a default configuration
    Configuration hadoopConf = new Configuration();
    URI nameNodeUri = new URI("hdfs://172.16.1.9:8020");
    FileSystem hdfs = FileSystem.get(nameNodeUri, hadoopConf);

    // Open the remote file and delegate the response building to the export helper
    Path remoteFile = new Path("hdfs://172.16.1.9:8020/spark/testLog.txt");
    FSDataInputStream fileContent = hdfs.open(remoteFile);
    return ExportUtil.downloadFile(fileContent, "testLog.txt");
}
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
/**
 * Wraps an input stream in a download response so that its content is streamed to the client.
 *
 * <p>The response carries no-cache headers and a {@code Content-Disposition: attachment}
 * header whose file name is the result of {@code convertFileName(fileName)}. The stream is
 * handed to the response body unread and is not closed by this method.
 *
 * @param in byte input stream holding the file content
 * @param fileName name offered to the client for the downloaded file
 * @return the download response, or {@code null} if querying the stream length failed
 */
public static ResponseEntity<InputStreamResource> downloadFile(InputStream in, String fileName) {
    try {
        // Only the byte count is needed for Content-Length; allocating a buffer of that
        // size (as was done before) wastes memory proportional to the file size.
        // NOTE(review): available() is only an estimate for arbitrary InputStreams —
        // confirm that the streams passed here report their full remaining length.
        long contentLength = in.available();

        HttpHeaders headers = new HttpHeaders();
        headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
        headers.add("Content-Disposition", String.format("attachment; filename=\"%s\"", convertFileName(fileName)));
        headers.add("Pragma", "no-cache");
        headers.add("Expires", "0");
        headers.add("Content-Language", "UTF-8");

        // The InputStreamResource body lets the file content be written out as a stream
        return ResponseEntity.ok().headers(headers).contentLength(contentLength)
                .contentType(MediaType.parseMediaType("application/octet-stream")).body(new InputStreamResource(in));
    } catch (IOException e) {
        // Log at error level and keep the stack trace instead of only the message
        log.error("downfile is error, fileName={}", fileName, e);
    }
    return null;
}
2.下载xx文件夹
“下载文件夹及内部文件” 执行流程说明:
1.初始化响应头(header)信息,以xx.zip作为下载文件名输出文件夹,然后调用down2()
2.构建hdfs连接,初始化Configuration
3.调用递归方法compress,传入参数(文件夹整体路径 + ZipOutputStream实例 + FileSystem实例)
4.递归方法执行思路:
遍历对应子目录:1)如果为文件夹,zip写入一个文件进入点(路径末尾单词 + “/”)
2)如果为文件,zip写入文件(目录文件的整体路径)
----------------------------------------------------------------------------------------
******注意:容易出错2行代码:******
压缩文件:zipOutputStream.putNextEntry(new ZipEntry(name.substring(1)));
压缩文件夹:zipOutputStream.putNextEntry(new ZipEntry(fileStatulist[i].getPath().getName() + "/"));
**name属性用于zip创建文件,fileStatulist[i].getPath().getName()用于zip创建文件夹**
-----------------------------------------------------------------------------------------
举例说明:
假设文件夹spark-warehouse路径下有2文件夹data1和data2,文件夹下各一个a.txt文本文件
第一步:获取路径“C:/Users/liudz/Desktop/spark-warehouse”下的目录,也就是(C:/Users/liudz/Desktop/spark-warehouse/data1、C:/Users/liudz/Desktop/spark-warehouse/data2)
lastName=spark-warehouse
name=/spark-warehouse/data1
判断“C:/Users/liudz/Desktop/spark-warehouse/data1”为目录,zip写入“data1/”文件夹
第二步:获取路径“C:/Users/liudz/Desktop/spark-warehouse/data1”下的目录,也就是(C:/Users/liudz/Desktop/spark-warehouse/data1/a.txt)
lastName=data1
name=/data1/a.txt
判断“C:/Users/liudz/Desktop/spark-warehouse/data1/a.txt”为文件,zip写入“data1/a.txt”文件
。
。
。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import util.ExportUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * Downloads the HDFS folder {@code /spark/spark-warehouse} as the zip attachment
 * {@code spark-warehouse.zip}.
 *
 * @param businessId business ID; not used by this method
 * @author liudz
 * @date 2020/6/9
 * @return 200 with the zip bytes, or 500 with an empty body when no archive buffer was produced
 * @throws IOException if closing the archive buffer fails
 **/
@RequestMapping(value = "/downloadFolder", method = RequestMethod.GET)
public ResponseEntity<byte[]> downloadFolder(Long businessId) throws IOException {
    HttpHeaders headers = new HttpHeaders();
    headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
    headers.add("Content-Disposition", "attachment; filename=spark-warehouse.zip");
    headers.add("Pragma", "no-cache");
    headers.add("Expires", "0");
    headers.add("Content-Language", "UTF-8");

    // try-with-resources skips a null resource, so the null check inside is safe
    try (ByteArrayOutputStream zipBuffer =
            (ByteArrayOutputStream) hdfsClientService.down2("hdfs://172.16.1.9:8020/spark/spark-warehouse")) {
        if (zipBuffer == null) {
            // No buffer came back from down2: report a server error instead of
            // failing with a NullPointerException
            return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return new ResponseEntity<>(zipBuffer.toByteArray(), headers, HttpStatus.OK);
    }
}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.stereotype.Service;
/**
 * Packs the HDFS directory at {@code cloudPath} into an in-memory zip archive.
 *
 * <p>Connects to the HDFS instance at {@code hdfs://172.16.1.9:8020} and delegates the
 * directory walk to {@code compress}. Failures are logged, not thrown.
 *
 * @param cloudPath
 *            directory to archive; passed unchanged to {@code compress}
 * @author liudz
 * @date 2020/6/8
 * @return the {@code ByteArrayOutputStream} holding the zip bytes, or {@code null} when the
 *         failure happened before the buffer was created (e.g. the file system could not
 *         be obtained)
 **/
public OutputStream down2(String cloudPath) {
    ByteArrayOutputStream out = null;
    try {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://172.16.1.9:8020"), conf);
        out = new ByteArrayOutputStream();
        // Closing the zip stream writes the central directory; try-with-resources makes
        // sure that happens even if compress throws, so the buffer never holds a
        // truncated archive.
        try (ZipOutputStream zos = new ZipOutputStream(out)) {
            compress(cloudPath, zos, fs);
        }
    } catch (IOException | URISyntaxException e) {
        // Pass the message as an argument so the {} placeholder is actually filled,
        // and pass the exception last so the stack trace is kept
        log.error("----error:{}----", e.getMessage(), e);
    }
    return out;
}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.stereotype.Service;
/**
 * Recursively writes the content of an HDFS directory into a zip stream.
 *
 * <p>For every child of {@code baseDir}: a file is stored under the entry name
 * {@code <last segment of baseDir>/<file name>}; a directory is stored as the entry
 * {@code <directory name>/} and then processed recursively. I/O failures are logged and
 * end the processing of the current directory; they are not rethrown.
 *
 * <p>NOTE(review): entry names contain only the immediate parent directory, so directories
 * nested more than one level below the starting directory are not placed under their
 * ancestors in the archive. This naming is kept as-is; confirm whether it is intended.
 *
 * @param baseDir
 *            path of the directory whose children are added
 * @param zipOutputStream
 *            zip stream receiving the entries; it is not closed here
 * @param fs
 *            file system used to list the directory and open the files
 * @throws IOException declared by the signature; the body catches and logs I/O failures
 * @author liudz
 * @date 2020/6/8
 **/
public void compress(String baseDir, ZipOutputStream zipOutputStream, FileSystem fs) throws IOException {
    final int copyBufferSize = 1024;
    try {
        FileStatus[] children = fs.listStatus(new Path(baseDir));
        log.info("basedir = {}", baseDir);

        // lastName is the final segment of baseDir; "/" alone splits into an empty array
        String[] segments = baseDir.split("/");
        String lastName = segments.length == 0 ? "" : segments[segments.length - 1];
        String filePrefix = lastName.isEmpty() ? "" : lastName + "/";

        for (FileStatus child : children) {
            Path childPath = child.getPath();
            if (child.isFile()) {
                // Build the entry name from the parent segment and the file name directly
                // instead of searching the full URI for "/" + lastName, which picks the
                // wrong offset when an earlier path segment starts with the same text.
                String entryName = filePrefix + childPath.getName();
                // try-with-resources closes the HDFS stream even when writing the entry fails
                try (FSDataInputStream inputStream = fs.open(childPath)) {
                    zipOutputStream.putNextEntry(new ZipEntry(entryName));
                    IOUtils.copyBytes(inputStream, zipOutputStream, copyBufferSize);
                    zipOutputStream.closeEntry();
                }
            } else {
                // A trailing "/" marks the entry as a directory
                zipOutputStream.putNextEntry(new ZipEntry(childPath.getName() + "/"));
                zipOutputStream.closeEntry();
                log.info("fileStatulist[i].getPath().toString() = {}", childPath.toString());
                compress(childPath.toString(), zipOutputStream, fs);
            }
        }
    } catch (IOException e) {
        // Fill the {} placeholder properly and keep the stack trace
        log.error("----error:{}----", e.getMessage(), e);
    }
}