hadoop之操作window下HDFS API编程(8)

简介: hadoop之操作window下HDFS API编程(8)

代码操作HDFS


前面都是hdfs的命令行概念,概念这东西一旦会了,会手痒,所以我们可以通过java程序代码操作hdfs。


1.准备:window10下配置hadoop环境


配置环境变量


准备hadoop-2.8.4.tar.gz,解压到一个不是c盘的地方,配置环境变量,我的如下

1dc618a0ed9580ce8bfa6facb208c08f.png

5d4c6812c8535adbb050f4ddf2e1bce8.png


拷贝2个文件到HADOOP_HOME/bin下


46a9d80a6e05e4e3b19d57a0ee70bcdf.png

文件地址可以在https://github.com/hufanglei/comon-tool/tree/hadoop这里找下,

另外,一些资料中还说吧 hadoop.dll放在C:\Windows\System32下

66ba272a0bfc97be54a5fa679e3d5482.png


2.pom相关的依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hfl.tz.bigdata12</groupId>
    <artifactId>hdfs-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>


3.通过API操作HDFS


1) HDFS获取文件系统


@Test
    public void initHDFS() throws IOException {
        //创建配置信息
        Configuration config = new Configuration();
        //获取文件系统 
        FileSystem fs = FileSystem.get(config);
        //打印文件系统
        System.out.println(fs.toString());
    }


2)HDFS文件上传


@Test
    public void putHDFS() throws IOException, URISyntaxException, InterruptedException {
        //创建配置信息
        Configuration conf = new Configuration();
        //设置部分的临时参数
        conf.set("dfs.replication", "2");
        //获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //要上传的本地文件上传的路径
//        Path input = new Path("F:\\input\\word.txt");
        Path input = new Path("F:\\soft\\hadoop-2.8.4.tar.gz");
        //输出的路径,iput复制到output
        Path output = new Path("hdfs://bigdata121:9000/");
        //以拷贝的方式
        fs.copyFromLocalFile(input, output);
        fs.close();
        System.out.println("上传成功!!");
    }


3)HDFS文件下载


@Test
    public void getHDFS() throws URISyntaxException, IOException, InterruptedException {
        //创建配置信息
        Configuration conf = new Configuration();
        //获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //下载文件
        //boolean delSrc:是否将原文件删除
        //Path src :要下载的路径
        //Path dst :要下载到哪
        //boolean useRawLocalFileSystem :是否校验文件
        fs.copyToLocalFile(false,
                new Path("hdfs://bigdata121:9000/word.txt"),
                new Path("F:\\output"),
                true);
        fs.close();
        System.out.println("下载成功!!");
    }


4)HDFS目录创建


@Test
    public void mkmdirHDFS() throws Exception {
        //1.创新配置信息对象
        Configuration configuration = new Configuration();
        //2.链接文件系统
        //final URI uri  地址
        //final Configuration conf  配置
        //String user   Linux用户
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), configuration, "root");
        //3.创建目录
        fs.mkdirs(new Path("hdfs://bigdata121:9000/idea/aa"));
        //4.关闭
        fs.close();
        System.out.println("创建文件夹成功");
    }


5)HDFS文件夹删除


@Test
    public void deleteHDFS() throws Exception {
        //1.创建配置对象
        Configuration conf = new Configuration();
        //2.链接文件系统
        //final URI uri, final Configuration conf, String user
        //final URI uri  地址
        //final Configuration conf  配置
        //String user   Linux用户
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //Path var1   : HDFS地址
        //boolean var2 : 是否递归删除
        fs.delete(new Path("hdfs://bigdata121:9000/idea/bb"), true);
        fs.close();
        System.out.println("删除成功啦");
    }


6)HDFS文件名更改


@Test
    public void renameAtHDFS() throws Exception {
        // 1 创建配置信息对象
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), configuration, "root");
        //2 重命名文件或文件夹
        fs.rename(new Path("hdfs://bigdata121:9000/idea/aa"), new Path("hdfs://bigdata121:9000/idea/cc"));
        fs.close();
        System.out.println("重命名成功啦");
    }


7)HDFS文件详情查看


@Test
    public void readListFiles() throws Exception {
        //1.创建配置对象
        Configuration conf = new Configuration();
        //2.链接文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.迭代器
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        //4.遍历迭代器
        while (listFiles.hasNext()) {
            //一个一个出
            LocatedFileStatus fileStatus = listFiles.next();
            //名字
            System.out.println("文件名:" + fileStatus.getPath().getName());
            //块大小
            System.out.println("大小:" + fileStatus.getBlockSize());
            //权限
            System.out.println("权限:" + fileStatus.getPermission());
            System.out.println(fileStatus.getLen());
            BlockLocation[] locations = fileStatus.getBlockLocations();
            for (BlockLocation bl : locations) {
                System.out.println("block-offset:" + bl.getOffset());
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("------------------华丽的分割线----------------");
        }
  }


8)HDFS文件和文件夹判断


@Test
    public void judge() throws Exception {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.遍历所有的文件
        //List the statuses of the files/directories in the given path if the path is a directory.
        FileStatus[] liststatus = fs.listStatus(new Path("/"));
        for(FileStatus status :liststatus)
        {
            //判断是否是文件
            if (status.isFile()){
                //ctrl + d:复制一行
                //ctrl + x 是剪切一行,可以用来当作是删除一行
                System.out.println("文件:" + status.getPath().getName());
            } else {
                System.out.println("目录:" + status.getPath().getName());
            }
        }
    }


4.通过IO流操作HDFS


1)HDFS文件上传


@Test
    public void putFileToHDFSIO() throws URISyntaxException, IOException, InterruptedException {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.创建输入流
        FileInputStream fis = new FileInputStream(new File("F:\\input\\hello.txt"));
        //4.输出路径
        //注意:不能/Andy  记得后边写个名 比如:/Andy/Sogou.txt
        Path writePath = new Path("hdfs://bigdata121:9000/hfl/Sogou.txt");
        FSDataOutputStream fos = fs.create(writePath);
        //5.流对接
        //InputStream in    输入
        //OutputStream out  输出
        //int buffSize      缓冲区
        //boolean close     是否关闭流
        try {
            IOUtils.copyBytes(fis,fos,4 * 1024,false);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            IOUtils.closeStream(fos);
            IOUtils.closeStream(fis);
            System.out.println("上传成功啦");
        }
    }


2)HDFS文件下载( IO读取HDFS到控制台)


@Test
    public void getFileToHDFSIO() throws URISyntaxException, IOException, InterruptedException {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.读取路径
        Path readPath = new Path("hdfs://bigdata121:9000/hfl/Sogou.txt");
        //4.输入
        FSDataInputStream fis = fs.open(readPath);
        //5.输出到控制台
        //InputStream in    输入
        //OutputStream out  输出
        //int buffSize      缓冲区
        //boolean close     是否关闭流
        IOUtils.copyBytes(fis,System.out,4 * 1024 ,true);
    }


3)定位文件读取


①IO读取第一块的内容


@Test
    public void  readFlieSeek1() throws Exception {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.输入
        Path path = new Path("hdfs://bigdata121:9000/hadoop-2.8.4.tar.gz");
        FSDataInputStream fis = fs.open(path);
        //4.输出
        FileOutputStream fos = new FileOutputStream("F:\\output\\readFileSeek\\A1");
        //5.流对接
        byte[] buf = new byte[1024];
        for (int i = 0; i < 128 * 1024; i++) {
            fis.read(buf);
            fos.write(buf);
        }
        //6.关闭流
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
    }


②IO读取第二块的内容


@Test
    public void readFlieSeek2() throws Exception {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.输入
        Path path = new Path("hdfs://bigdata121:9000/hadoop-2.8.4.tar.gz");
        FSDataInputStream fis = fs.open(path);
        //4.输出
        FileOutputStream fos = new FileOutputStream("F:\\output\\readFileSeek\\A2");
        //5.定位偏移量/offset/游标/读取进度 (目的:找到第一块的尾巴,第二块的开头)
        fis.seek(128 * 1024 * 1024);
        //6.流对接
        IOUtils.copyBytes(fis, fos, 1024);
        //7.关闭流
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
    }


5.源码地址:


https://github.com/hufanglei/daily-code/blob/bigdata12-hdfs-api/src/test/java/com/hfl/hdfs/HdfsApi.java



相关文章
|
3月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
206 6
|
3月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
75 3
|
3月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
61 4
|
3月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
56 2
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
127 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
59 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
74 0
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
91 2
|
18天前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
54 4
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
138 2