java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下

简介: java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下

1.jpg

java实现下载hdfs文件及文件夹

说明:java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下

 <!--阿里 FastJson依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>

==相关类引入jar包,代码上方查看对照即可==

1.下载xxx文件

“下载文件” 执行流程说明:
            1.构建hdfs连接,初始化Configuration
            2.获取文件输入流FSDataInputStream,调用downloadFile()
            3.方法内部先设置header请求头,格式以文件名(convertFileName(fileName))输出文件,然后输出流内部信息以流的形式输出
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import util.ExportUtil;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
     *  下载文件
     * @author liudz
     * @date 2020/6/9
     * @return 执行结果
     **/
    @RequestMapping(value = "/down", method = RequestMethod.GET)
    public ResponseEntity<InputStreamResource> Test01() throws URISyntaxException, IOException {
   
        //下面两行,初始化hdfs配置连接
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://172.16.1.9:8020"), conf);
        FSDataInputStream inputStream = fs.open(new Path("hdfs://172.16.1.9:8020/spark/testLog.txt"));
        ResponseEntity<InputStreamResource> result = ExportUtil.downloadFile(inputStream, "testLog.txt");
        return result;
    }
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

import lombok.extern.slf4j.Slf4j;

import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
/**
     * 文件以流的形式读取
     * 
     * @param in 字符输入流
     * @param fileName 文件名字
     * @return 返回结果
     */
    public static ResponseEntity<InputStreamResource> downloadFile(InputStream in, String fileName) {
   

        try {
   
            byte[] testBytes = new byte[in.available()];
            HttpHeaders headers = new HttpHeaders();
            headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
            headers.add("Content-Disposition", String.format("attachment; filename=\"%s\"", convertFileName(fileName)));
            headers.add("Pragma", "no-cache");
            headers.add("Expires", "0");
            headers.add("Content-Language", "UTF-8");
            //最终这句,让文件内容以流的形式输出
            return ResponseEntity.ok().headers(headers).contentLength(testBytes.length)
                .contentType(MediaType.parseMediaType("application/octet-stream")).body(new InputStreamResource(in));
        } catch (IOException e) {
   
            log.info("downfile is error" + e.getMessage());
        }
        log.info("file is null" + fileName);
        return null;
    }

2.下载xx文件夹

“下载文件夹及内部文件” 执行流程说明:
    1.初始化header请求头信息,格式以xx.zip输出文件夹,调用down2()
    2.构建hdfs连接,初始化Configuration
    3.调用迭代器compress,传入参数(文件夹整体路径 + ZipOutputStream实例 + FileSystem实例)
    4.迭代器执行思路:
            遍历对应子目录:1)如果为文件夹,zip写入一个文件进入点(路径末尾单词 + “/”)
                          2)如果为文件,zip写入文件(目录文件的整体路径)

----------------------------------------------------------------------------------------                      
******注意:容易出错2行代码:******
压缩文件:zipOutputStream.putNextEntry(new ZipEntry(name.substring(1)));
压缩文件夹:zipOutputStream.putNextEntry(new ZipEntry(fileStatulist[i].getPath().getName() + "/"));
**name属性用于zip创建文件,fileStatulist[i].getPath().getName()用于zip创建文件夹**
-----------------------------------------------------------------------------------------
举例说明:
    假设文件夹spark-warehouse路径下有2文件夹data1和data2,文件夹下各一个a.txt文本文件
    第一步:获取路径“C:/Users/liudz/Desktop/spark-warehouse”下的目录,也就是(C:/Users/liudz/Desktop/spark-warehouse/data1、C:/Users/liudz/Desktop/spark-warehouse/data2)
    lastName=spark-warehouse
    name=/spark-warehouse/data1
    判断“C:/Users/liudz/Desktop/spark-warehouse/data1”为目录,zip写入“data1/”文件夹
    第二步:获取路径“C:/Users/liudz/Desktop/spark-warehouse/data1”下的目录,也就是(C:/Users/liudz/Desktop/spark-warehouse/data1/a.txt)
    lastName=data1
    name=/data1/a.txt
    判断“C:/Users/liudz/Desktop/spark-warehouse/data1/a.txt”为文件,zip写入“data1/a。txt”文件
    。
    。
    。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import util.ExportUtil;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
     *  下载文件夹
     * @param businessId 业务ID
     * @author liudz
     * @date 2020/6/9
     * @return 执行结果
     **/
    @RequestMapping(value = "/downloadFolder", method = RequestMethod.GET)
    public ResponseEntity<byte[]> downloadFolder(Long businessId) throws IOException {
   
        ResponseEntity<byte[]> response = null;
        HttpHeaders headers = new HttpHeaders();
        headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
        headers.add("Content-Disposition", "attachment; filename=spark-warehouse.zip");
        headers.add("Pragma", "no-cache");
        headers.add("Expires", "0");
        headers.add("Content-Language", "UTF-8");
        ByteArrayOutputStream zos =
                (ByteArrayOutputStream) hdfsClientService.down2("hdfs://172.16.1.9:8020/spark/spark-warehouse");
        byte[] out = zos.toByteArray();
        zos.close();
        response = new ResponseEntity<>(out, headers, HttpStatus.OK);

        return response;
    }
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.stereotype.Service;
/**
     * 多文件
     * 
     * @param cloudPath
     *            cloudPath
     * @author liudz
     * @date 2020/6/8
     * @return 执行结果
     **/
    public OutputStream down2(String cloudPath) {
   
        // 1获取对象
        ByteArrayOutputStream out = null;
        try {
   
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.16.1.9:8020"), conf);
            out = new ByteArrayOutputStream();
            ZipOutputStream zos = new ZipOutputStream(out);
            compress(cloudPath, zos, fs);
            zos.close();
        } catch (IOException e) {
   
            log.info("----error:{}----" + e.getMessage());
        } catch (URISyntaxException e) {
   
            log.info("----error:{}----" + e.getMessage());
        }
        return out;
    }
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.stereotype.Service;
/**
     * compress
     * 
     * @param baseDir
     *            baseDir
     * @param zipOutputStream
     *            zipOutputStream
     * @param fs
     *            fs
     * @author liudz
     * @date 2020/6/8
     **/
    public void compress(String baseDir, ZipOutputStream zipOutputStream, FileSystem fs) throws IOException {
   

        try {
   
            FileStatus[] fileStatulist = fs.listStatus(new Path(baseDir));
            log.info("basedir = " + baseDir);
            String[] strs = baseDir.split("/");
            //lastName代表路径最后的单词
            String lastName = strs[strs.length - 1];

            for (int i = 0; i < fileStatulist.length; i++) {
   

                String name = fileStatulist[i].getPath().toString();
                name = name.substring(name.indexOf("/" + lastName));

                if (fileStatulist[i].isFile()) {
   
                    Path path = fileStatulist[i].getPath();
                    FSDataInputStream inputStream = fs.open(path);
                    zipOutputStream.putNextEntry(new ZipEntry(name.substring(1)));
                    IOUtils.copyBytes(inputStream, zipOutputStream, Integer.parseInt("1024"));
                    inputStream.close();
                } else {
   
                    zipOutputStream.putNextEntry(new ZipEntry(fileStatulist[i].getPath().getName() + "/"));
                    log.info("fileStatulist[i].getPath().toString() = " + fileStatulist[i].getPath().toString());
                    compress(fileStatulist[i].getPath().toString(), zipOutputStream, fs);
                }
            }
        } catch (IOException e) {
   
            log.info("----error:{}----" + e.getMessage());
        }
    }
目录
相关文章
|
2月前
|
监控 Java API
Java语言按文件创建日期排序及获取最新文件的技术
这段代码实现了文件创建时间的读取、文件列表的获取与排序以及获取最新文件的需求。它具备良好的效率和可读性,对于绝大多数处理文件属性相关的需求来说足够健壮。在实际应用中,根据具体情况,可能还需要进一步处理如访问权限不足、文件系统不支持某些属性等边界情况。
173 14
|
2月前
|
存储 Java 编译器
深入理解Java虚拟机--类文件结构
本内容介绍了Java虚拟机与Class文件的关系及其内部结构。Class文件是一种与语言无关的二进制格式,包含JVM指令集、符号表等信息。无论使用何种语言,只要能生成符合规范的Class文件,即可在JVM上运行。文章详细解析了Class文件的组成,包括魔数、版本号、常量池、访问标志、类索引、字段表、方法表和属性表等,并说明其在Java编译与运行过程中的作用。
|
2月前
|
XML 人工智能 Java
java通过自定义TraceId实现简单的链路追踪
本文介绍了如何在Spring Boot项目中通过SLF4J的MDC实现日志上下文traceId追踪。内容涵盖依赖配置、拦截器实现、网关与服务间调用的traceId传递、多线程环境下的上下文同步,以及logback日志格式配置。适用于小型微服务架构的链路追踪,便于排查复杂调用场景中的问题。
112 0
|
2月前
|
存储 人工智能 Java
java之通过Http下载文件
本文介绍了使用Java实现通过文件链接下载文件到本地的方法,主要涉及URL、HttpURLConnection及输入输出流的操作。
144 0
|
3月前
|
存储 Java 数据安全/隐私保护
Java技术栈揭秘:Base64加密和解密文件的实战案例
以上就是我们今天关于Java实现Base64编码和解码的实战案例介绍。希望能对你有所帮助。还有更多知识等待你去探索和学习,让我们一同努力,继续前行!
300 5
|
3月前
|
网络协议 安全 Java
实现Java语言的文件断点续传功能的技术方案。
像这样,我们就完成了一项看似高科技、实则亲民的小工程。这样的技术实现不仅具备实用性,也能在面对网络不稳定的挑战时,稳稳地、不失乐趣地完成工作。
221 0
|
6月前
|
XML 存储 分布式计算
【赵渝强老师】史上最详细:Hadoop HDFS的体系架构
HDFS(Hadoop分布式文件系统)由三个核心组件构成:NameNode、DataNode和SecondaryNameNode。NameNode负责管理文件系统的命名空间和客户端请求,维护元数据文件fsimage和edits;DataNode存储实际的数据块,默认大小为128MB;SecondaryNameNode定期合并edits日志到fsimage中,但不作为NameNode的热备份。通过这些组件的协同工作,HDFS实现了高效、可靠的大规模数据存储与管理。
588 70
|
11月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
432 6
|
11月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
166 3
|
11月前
|
存储 分布式计算 资源调度
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
240 5

热门文章

最新文章