hadoop之操作window下HDFS API编程(8)

简介: hadoop之操作window下HDFS API编程(8)

代码操作HDFS


前面都是hdfs的命令行概念,概念这东西一旦会了,会手痒,所以我们可以通过java程序代码操作hdfs。


1.准备:window10下配置hadoop环境


配置环境变量


准备hadoop-2.8.4.tar.gz,解压到一个不是c盘的地方,配置环境变量,我的如下

1dc618a0ed9580ce8bfa6facb208c08f.png

5d4c6812c8535adbb050f4ddf2e1bce8.png


拷贝2个文件到HADOOP_HOME/bin下


46a9d80a6e05e4e3b19d57a0ee70bcdf.png

文件地址可以在https://github.com/hufanglei/comon-tool/tree/hadoop这里找下,

另外,一些资料中还说吧 hadoop.dll放在C:\Windows\System32下

66ba272a0bfc97be54a5fa679e3d5482.png


2.pom相关的依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hfl.tz.bigdata12</groupId>
    <artifactId>hdfs-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>


3.通过API操作HDFS


1) HDFS获取文件系统


@Test
    public void initHDFS() throws IOException {
        //创建配置信息
        Configuration config = new Configuration();
        //获取文件系统 
        FileSystem fs = FileSystem.get(config);
        //打印文件系统
        System.out.println(fs.toString());
    }


2)HDFS文件上传


@Test
    public void putHDFS() throws IOException, URISyntaxException, InterruptedException {
        //创建配置信息
        Configuration conf = new Configuration();
        //设置部分的临时参数
        conf.set("dfs.replication", "2");
        //获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //要上传的本地文件上传的路径
//        Path input = new Path("F:\\input\\word.txt");
        Path input = new Path("F:\\soft\\hadoop-2.8.4.tar.gz");
        //输出的路径,iput复制到output
        Path output = new Path("hdfs://bigdata121:9000/");
        //以拷贝的方式
        fs.copyFromLocalFile(input, output);
        fs.close();
        System.out.println("上传成功!!");
    }


3)HDFS文件下载


@Test
    public void getHDFS() throws URISyntaxException, IOException, InterruptedException {
        //创建配置信息
        Configuration conf = new Configuration();
        //获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //下载文件
        //boolean delSrc:是否将原文件删除
        //Path src :要下载的路径
        //Path dst :要下载到哪
        //boolean useRawLocalFileSystem :是否校验文件
        fs.copyToLocalFile(false,
                new Path("hdfs://bigdata121:9000/word.txt"),
                new Path("F:\\output"),
                true);
        fs.close();
        System.out.println("下载成功!!");
    }


4)HDFS目录创建


@Test
    public void mkmdirHDFS() throws Exception {
        //1.创新配置信息对象
        Configuration configuration = new Configuration();
        //2.链接文件系统
        //final URI uri  地址
        //final Configuration conf  配置
        //String user   Linux用户
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), configuration, "root");
        //3.创建目录
        fs.mkdirs(new Path("hdfs://bigdata121:9000/idea/aa"));
        //4.关闭
        fs.close();
        System.out.println("创建文件夹成功");
    }


5)HDFS文件夹删除


@Test
    public void deleteHDFS() throws Exception {
        //1.创建配置对象
        Configuration conf = new Configuration();
        //2.链接文件系统
        //final URI uri, final Configuration conf, String user
        //final URI uri  地址
        //final Configuration conf  配置
        //String user   Linux用户
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //Path var1   : HDFS地址
        //boolean var2 : 是否递归删除
        fs.delete(new Path("hdfs://bigdata121:9000/idea/bb"), true);
        fs.close();
        System.out.println("删除成功啦");
    }


6)HDFS文件名更改


@Test
    public void renameAtHDFS() throws Exception {
        // 1 创建配置信息对象
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), configuration, "root");
        //2 重命名文件或文件夹
        fs.rename(new Path("hdfs://bigdata121:9000/idea/aa"), new Path("hdfs://bigdata121:9000/idea/cc"));
        fs.close();
        System.out.println("重命名成功啦");
    }


7)HDFS文件详情查看


@Test
    public void readListFiles() throws Exception {
        //1.创建配置对象
        Configuration conf = new Configuration();
        //2.链接文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.迭代器
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        //4.遍历迭代器
        while (listFiles.hasNext()) {
            //一个一个出
            LocatedFileStatus fileStatus = listFiles.next();
            //名字
            System.out.println("文件名:" + fileStatus.getPath().getName());
            //块大小
            System.out.println("大小:" + fileStatus.getBlockSize());
            //权限
            System.out.println("权限:" + fileStatus.getPermission());
            System.out.println(fileStatus.getLen());
            BlockLocation[] locations = fileStatus.getBlockLocations();
            for (BlockLocation bl : locations) {
                System.out.println("block-offset:" + bl.getOffset());
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("------------------华丽的分割线----------------");
        }
  }


8)HDFS文件和文件夹判断


@Test
    public void judge() throws Exception {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.遍历所有的文件
        //List the statuses of the files/directories in the given path if the path is a directory.
        FileStatus[] liststatus = fs.listStatus(new Path("/"));
        for(FileStatus status :liststatus)
        {
            //判断是否是文件
            if (status.isFile()){
                //ctrl + d:复制一行
                //ctrl + x 是剪切一行,可以用来当作是删除一行
                System.out.println("文件:" + status.getPath().getName());
            } else {
                System.out.println("目录:" + status.getPath().getName());
            }
        }
    }


4.通过IO流操作HDFS


1)HDFS文件上传


@Test
    public void putFileToHDFSIO() throws URISyntaxException, IOException, InterruptedException {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.创建输入流
        FileInputStream fis = new FileInputStream(new File("F:\\input\\hello.txt"));
        //4.输出路径
        //注意:不能/Andy  记得后边写个名 比如:/Andy/Sogou.txt
        Path writePath = new Path("hdfs://bigdata121:9000/hfl/Sogou.txt");
        FSDataOutputStream fos = fs.create(writePath);
        //5.流对接
        //InputStream in    输入
        //OutputStream out  输出
        //int buffSize      缓冲区
        //boolean close     是否关闭流
        try {
            IOUtils.copyBytes(fis,fos,4 * 1024,false);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            IOUtils.closeStream(fos);
            IOUtils.closeStream(fis);
            System.out.println("上传成功啦");
        }
    }


2)HDFS文件下载( IO读取HDFS到控制台)


@Test
    public void getFileToHDFSIO() throws URISyntaxException, IOException, InterruptedException {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.读取路径
        Path readPath = new Path("hdfs://bigdata121:9000/hfl/Sogou.txt");
        //4.输入
        FSDataInputStream fis = fs.open(readPath);
        //5.输出到控制台
        //InputStream in    输入
        //OutputStream out  输出
        //int buffSize      缓冲区
        //boolean close     是否关闭流
        IOUtils.copyBytes(fis,System.out,4 * 1024 ,true);
    }


3)定位文件读取


①IO读取第一块的内容


@Test
    public void  readFlieSeek1() throws Exception {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.输入
        Path path = new Path("hdfs://bigdata121:9000/hadoop-2.8.4.tar.gz");
        FSDataInputStream fis = fs.open(path);
        //4.输出
        FileOutputStream fos = new FileOutputStream("F:\\output\\readFileSeek\\A1");
        //5.流对接
        byte[] buf = new byte[1024];
        for (int i = 0; i < 128 * 1024; i++) {
            fis.read(buf);
            fos.write(buf);
        }
        //6.关闭流
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
    }


②IO读取第二块的内容


@Test
    public void readFlieSeek2() throws Exception {
        //1.创建配置文件信息
        Configuration conf = new Configuration();
        //2.获取文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf, "root");
        //3.输入
        Path path = new Path("hdfs://bigdata121:9000/hadoop-2.8.4.tar.gz");
        FSDataInputStream fis = fs.open(path);
        //4.输出
        FileOutputStream fos = new FileOutputStream("F:\\output\\readFileSeek\\A2");
        //5.定位偏移量/offset/游标/读取进度 (目的:找到第一块的尾巴,第二块的开头)
        fis.seek(128 * 1024 * 1024);
        //6.流对接
        IOUtils.copyBytes(fis, fos, 1024);
        //7.关闭流
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
    }


5.源码地址:


https://github.com/hufanglei/daily-code/blob/bigdata12-hdfs-api/src/test/java/com/hfl/hdfs/HdfsApi.java



相关文章
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
158 6
|
1月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
62 3
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
44 4
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
46 2
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
87 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
39 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
48 0
|
1月前
|
存储 分布式计算 资源调度
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
76 5
|
1月前
|
资源调度 数据可视化 大数据
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
36 4
|
1月前
|
XML 分布式计算 资源调度
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
155 5