HDFS的API调用,创建Maven工程,创建一个非Maven工程,HDFS客户端操作数据代码示例,文件方式操作和流式操作

简介: 1. HDFS的java操作hdfs在生产应用中主要是客户端的开发,其核心步骤是从hdfs提供的api中构造一个HDFS的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS上的文件1.1 搭建开发环境                   1.1.1创建Maven工程快速创建一个Maven工程和目录结构的方式是执行下面的命令:mvn archetype:generate

1. HDFS的java操作

hdfs在生产应用中主要是客户端的开发,其核心步骤是从hdfs提供的api中构造一个HDFS的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS上的文件

1.1 搭建开发环境                  

1.1.1创建Maven工程

快速创建一个Maven工程和目录结构的方式是执行下面的命令:

mvn archetype:generate -DgroupId=com.toto.hadoop -DartifactId=hadoop -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

接着执行下面的命令:

mvn -Pall eclipse:eclipse

mvn clean

mvn compile -Dmaven.test.skip=true

mvn install -Dmaven.test.skip=true

mvn package -Dmaven.test.skip=true

最后将工程导入到Eclipse中:

 

1、配置pom文件,引入下面的jar之后,其它相关的maven依赖的jar包,它会自动引入:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.toto.hadoop</groupId>

  <artifactId>hadoop</artifactId>

  <packaging>jar</packaging>

  <version>1.0-SNAPSHOT</version>

  <name>hadoop</name>

  <url>http://maven.apache.org</url>

  <dependencies>

        <dependency> 

            <groupId>org.apache.hadoop</groupId> 

            <artifactId>hadoop-common</artifactId> 

            <version>2.5.1</version> 

        </dependency> 

        <dependency> 

            <groupId>org.apache.hadoop</groupId> 

            <artifactId>hadoop-hdfs</artifactId> 

            <version>2.5.1</version> 

        </dependency> 

        <dependency> 

            <groupId>org.apache.hadoop</groupId> 

            <artifactId>hadoop-client</artifactId> 

            <version>2.5.1</version> 

        </dependency>

       

        <dependency>

           <groupId>junit</groupId>

           <artifactId>junit</artifactId>

           <version>4.12</version>

           <scope>test</scope>

       </dependency>

  </dependencies>

</project>

 

1.1.2创建一个非Maven工程

注:如需手动引入jar包,hdfs的jar包----hadoop的安装目录的share下,他们分别是:

hadoop-2.8.0\share\hadoop\common\hadoop-common-2.8.0.jar

hadoop-2.8.0\share\hadoop\common\lib

hadoop-2.8.0\share\hadoop\hdfs\hadoop-hdfs-2.8.0.jar

hadoop-2.8.0\share\hadoop\hdfs\lib

如果是创建非Maven工程,步骤如下:

第一步:创建一个User Library

第二步:添加library


第三步:添加common中的所有的jar


第四步:添加hadoop-hdfs-2.8.0.jar


第五步:添加hdfs依赖的jar


最后到工程里面查看引包情况


接着上面的配置,可以进行hdfs客户端调用的代码的编写了。

 

2、window下开发的说明

建议在linux下进行hadoop应用的开发,不会存在兼容性问题。如在window上做客户端应用开发,需要设置以下环境:

A、用老师给的windows平台下编译的hadoop安装包解压一份到windows的任意一个目录下

B、在window系统中配置HADOOP_HOME指向你解压的安装包目录

C、在windows系统的path变量中加入HADOOP_HOME的bin目录


1.2 获取api中的客户端对象

在java中操作hdfs,首先要获得一个客户端实例

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(conf)

 

而我们的操作目标是HDFS,所以获取到的fs对象应该是DistributedFileSystem的实例;

get方法是从何处判断具体实例化那种客户端类呢?

——从conf中的一个参数 fs.defaultFS的配置值判断;

 

如果我们的代码中没有指定fs.defaultFS,并且工程classpath下也没有给定相应的配置,conf中的默认值就来自于hadoop的jar包中的core-default.xml,默认值为: file:///,则获取的将不是一个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象

 

1.3 DistributedFileSystem实例对象所具备的方法

1.4 HDFS客户端操作数据代码示例:

1.4.1 文件的增删改查

package com.toto.hadooptest;

 

import java.net.URI;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.BlockLocation;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.LocatedFileStatus;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.RemoteIterator;

import org.junit.Before;

import org.junit.Test;

 

public class HdfsClient {

 

    FileSystem fs = null;

   

    @Before

    public void init() throws Exception {

        // 构造一个配置参数对象,设置一个参数:我们要访问的hdfsURI

        // 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址

        // new Configuration();的时候,它就会去加载jar包中的hdfs-default.xml

        // 然后再加载classpath下的hdfs-site.xml

        Configuration conf = new Configuration();

        conf.set("df.defaultFS", "hdfs://hadoop:9000");

        /**

         * 参数优先级:1、客户端代码中设置的值 2classpath下的用户自定义配置文件 3、然后是服务器的默认配置

         */

        conf.set("dfs.replication", "3");

       

        // 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例

        // fs = FileSystem.get(conf);

       

        //如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份识别已经是hadoop用户

        //注意,最后的toto表示的是hadoop集群安装的Linux用户

        fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"toto");

    }

   

    /**

     * \brief hdfs上传文件

     *

     * 执行完成之后,进入Linux,执行一下命令,发现会出现以下列表

     * [toto@hadoop software]$ hadoop fs -ls /toto2

       Found 1 items

       -rw-r--r--   3 toto supergroup    2286088 2017-05-30 13:25 /toto2/ik-analyzer-solr5-master.zip

     */

    @Test

    public void testAddFileToHdfs() {

        try {

            //要上传的所在的本地路径

            Path src = new Path("d:/ik-analyzer-solr5-master.zip");

            //要上传的hdfs的目标路径

            Path dst = new Path("/toto2");

            fs.copyFromLocalFile(src, dst);

            fs.close();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

   

    /**

     * \brief hdfs中复制文件到本地文件系统

     * @attention执行完成之后,进入E:/learnTempFolder这个文件夹,发现在改文件夹下已经有了自己拷贝下来的文件了

     * @author toto

     * @date 2017530

     * @note  begin modify by 涂作权 2017530   原始创建

     */

    @Test

    public void testDownLoadFileToLocal() {

        try {

            fs.copyToLocalFile(new Path("/toto2/findbugs-1.3.9"), new Path("E:/learnTempFolder"));

            fs.close();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

   

    /**

     * 创建目录,删除目录,重命名

     * 执行之前hadoop的文件系统的目录结构如下:

     * [toto@hadoop sbin]$ hadoop fs -ls /

        Found 7 items

        drwxr-xr-x   - toto supergroup          0 2017-05-30 14:06 /aaa

        drwxr-xr-x   - toto supergroup          0 2017-05-29 14:01 /findbugs-1.3.9

        drwxr-xr-x   - toto supergroup          0 2017-05-29 03:23 /hive

        drwx------   - toto supergroup          0 2017-05-29 14:47 /tmp

        drwxr-xr-x   - toto supergroup          0 2017-05-29 23:59 /toto

        drwxr-xr-x   - toto supergroup          0 2017-05-30 13:25 /toto2

        drwxr-xr-x   - toto supergroup          0 2017-05-30 00:18 /user

        [toto@hadoop sbin]$

       

        执行之后的效果:

        [toto@hadoop sbin]$ hadoop fs -ls /

        Found 7 items

        drwxr-xr-x   - toto supergroup          0 2017-05-30 14:11 /a2

        drwxr-xr-x   - toto supergroup          0 2017-05-29 14:01 /findbugs-1.3.9

        drwxr-xr-x   - toto supergroup          0 2017-05-29 03:23 /hive

        drwx------   - toto supergroup          0 2017-05-29 14:47 /tmp

        drwxr-xr-x   - toto supergroup          0 2017-05-29 23:59 /toto

        drwxr-xr-x   - toto supergroup          0 2017-05-30 13:25 /toto2

        drwxr-xr-x   - toto supergroup          0 2017-05-30 00:18 /user

       

        总结:

        1、发现/aaa文件夹被删除了

        2、发现出现了/a2,但是最开始没有,说明是创建的a1,然后又被改成的a2

     */

    @Test

    public void testMkdirDeleteAndRename() {

        try {

            //创建目录

            fs.mkdirs(new Path("/a1/b1/c1"));

           

            //删除文件夹,如果是非空文件夹,参数2必须给值true

            fs.delete(new Path("/aaa"),true);

           

            //重命名文件或文件夹

            fs.rename(new Path("/a1"), new Path("/a2"));

        } catch(Exception e) {

            e.printStackTrace();

        }

    }

   

    /**

     * \brief 查看目录信息,只显示文件 

     */

    @Test

    public void testListFiles() {

        try {

            //思考:为什么返回迭代器,而不是List之类的容器,这里的只有调用hasNext()的时候,才会返回实际的数据信息

            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

           

            while(listFiles.hasNext()) {

                LocatedFileStatus fileStatus = listFiles.next();

                System.out.println(fileStatus.getPath().getName());

                System.out.println(fileStatus.getBlockSize());

                System.out.println(fileStatus.getPermission());

                System.out.println(fileStatus.getLen());

                BlockLocation[] blockLocations = fileStatus.getBlockLocations();

                for(BlockLocation b1 : blockLocations) {

                    System.out.println("block-length:" + b1.getLength() + "--" + "block-offset:" + b1.getOffset());

                    String[] hosts = b1.getHosts();

                    for(String host : hosts) {

                        System.out.println(host);

                    }

                }

                System.out.println("-----------------------------------");

            }

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

   

    /**

     * 查看文件及文件夹信息

     *

     * 显示的内容如下:

     * d--             a2

     * d--             findbugs-1.3.9

     * d--             hive

     * d--             tmp

     * d--             toto

     * d--             toto2

     * d--             user

     */

    @Test

    public void testListAll() {

        try {

            FileStatus[] listStatus = fs.listStatus(new Path("/"));

           

            String flag = "d--             ";

            for(FileStatus fstatus : listStatus) {

                if (fstatus.isFile())  flag = "f--         ";

                System.out.println(flag + fstatus.getPath().getName());

            }

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

 

1.4.2 通过流的方式访问hdfs

package com.toto.hadooptest;

 

import java.io.File;

import java.io.FileInputStream;

import java.io.FileOutputStream;

import java.net.URI;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.BlockLocation;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.junit.Before;

import org.junit.Test;

 

/**

 * 相对那些封装好的方法而言的更底层一些的操作方式

 * 上层那些mapreduce,spark等运算框架,去hdfs中获取数据的时候,就是调用这种底层的api

 */

public class StreamAccess {

    FileSystem fs = null;

 

    @Before

    public void init() throws Exception {

 

       Configuration conf = new Configuration();

       fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "toto");

 

    }

   

       /**

     * 通过流的方式上传文件到hdfs

     * @throws Exception

     */

    @Test

    public void testUpload() throws Exception {

       FSDataOutputStream outputStream = fs.create(new Path("/README.md"), true);

       FileInputStream inputStream = new FileInputStream("E:/learnTempFolder/dubbo-master/README.md");

      

       //注意这里的IOUtilsorg.apache.hadoop.io.IOUtils中的包,否则创建不成功

       IOUtils.copyBytes(inputStream, outputStream, 4096);

    }

 

    /**

     * hdfs上下载资源文件到本地

     */

    @Test

    public void testDownLoadFileToLocal() {

       try{

           //先获取一个文件的输入流---针对hdfs上的

           FSDataInputStream in = fs.open(new Path("/hive/apache-hive-2.0.0-bin.tar.gz"));

          

           //再构造一个文件的输出流---针对本地的

           FileOutputStream out = new FileOutputStream(new File("E:/apache-hive-2.0.0-bin.tar.gz"));

          

           //再将输入流中数据传输到输出流

           IOUtils.copyBytes(in, out, 4096);

       } catch (Exception e) {

           e.printStackTrace();

       }

    }

   

    /**

     * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度

     * 用于上层分布式运算框架并发处理数据

     */

    @Test

    public void testRandomAccess() {

       try {

           //先获取一个文件的输入流---针对hdfs上的

           FSDataInputStream in = fs.open(new Path("/pom.xml"));

          

           //可以将流的起始偏移量进行自定义

           in.seek(200);

          

           //在构造一个文件的输出流----针对本地的

           FileOutputStream out = new FileOutputStream(new File("E:/pom.xml"));

          

           //第三个参数的意思是获取长度为3000的文本内容

           IOUtils.copyBytes(in, out,3000L, true);

       } catch (Exception e) {

           e.printStackTrace();

       }

    }

   

    /**

     * 显示hdfs上文件的内容

     */

    @Test

    public void testCat() {

       try {

           FSDataInputStream in = fs.open(new Path("/pom.xml"));

          

           IOUtils.copyBytes(in, System.out, 1024);

       } catch (Exception e) {

           e.printStackTrace();

       }

    }

   

}

 

1.4.3 场景编程

在mapreduce 、spark等运算框架中,有一个核心思想就是将运算移往数据,或者说,就是要在并发计算中尽可能让运算本地化,这就需要获取数据所在位置的信息并进行相应范围读取以下模拟实现:获取一个文件的所有block位置信息,然后读取指定block中的内容

         @Test

    public void testCat2() {

       try {

           FSDataInputStream in = fs.open(new Path("/toto2/findbugs-1.3.9"));

           //拿到文件信息

           FileStatus[] listStatus = fs.listStatus(new Path("/toto2/findbugs-1.3.9"));

           //获取这个文件的所有的block的信息

           BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0, listStatus[0].getLen());

           //第一个block的长度

           long length = fileBlockLocations[0].getLength();

           //第一个block的起始偏移量

           long offset = fileBlockLocations[0].getOffset();

          

           System.out.println(length);

           System.out.println(offset);

          

           //获取第一个block写入输出流

           IOUtils.copyBytes(in, System.out, (int)length);

           byte[] b = new byte[4096];

          

           @SuppressWarnings("resource")

           FileOutputStream os = new FileOutputStream(new File("e:/block"));

           while(in.read(offset,b,0,4096) != -1) {

              os.write(b);

              offset += 4096;

              if (offset >= length) return;

           }

          

           os.flush();

           os.close();

           in.close();

       }  catch (Exception e) {

           e.printStackTrace();

       }

    }

 

 

 

 

目录
相关文章
|
3月前
|
JSON 安全 Java
什么是用于REST API的JWT Bearer令牌以及如何通过代码和工具进行调试
在Web开发中,保护REST API至关重要,而JSON Web令牌(JWT)特别是JWT Bearer令牌,是一种高效方法。它通过紧凑、自包含的结构实现安全信息交换,提升用户体验。本文探讨JWT Bearer的基本概念、结构与实现,包括在Java中的应用步骤,以及使用Apipost和cURL进行测试的方法。JWT优势明显:无状态、互操作性强,适用于分布式系统。掌握JWT Bearer,可助开发者构建更安全、高效的API解决方案。
|
3月前
|
人工智能 JSON API
0代码将存量 API 适配 MCP 协议
本文主要讲述通过 Nacos+Higress 的方案实现0代码改造将 Agent 连接到存量应用,能够显著降低存量应用的改造成本。
695 44
0代码将存量 API 适配 MCP 协议
|
3月前
|
安全 Java API
什么是用于 REST API 的 Bearer Token以及如何通过代码和工具进行调试
Bearer Token 是一种基于 OAuth 2.0 的身份验证机制,广泛应用于 REST API 的授权访问中。它通过在 HTTP 请求头中传递令牌,确保用户凭据安全传输并验证。本文深入解析了 Bearer Token 的概念、实现步骤及调试方法,包括其无状态特性、灵活性与安全性优势。同时,提供了 Java 实现示例和使用 Apipost、cURL 等工具测试的实践指导,帮助开发者掌握这一核心技能,保障 API 系统的安全与高效运行。
|
4月前
|
JavaScript 前端开发 API
JavaScript中通过array.map()实现数据转换、创建派生数组、异步数据流处理、复杂API请求、DOM操作、搜索和过滤等,array.map()的使用详解(附实际应用代码)
array.map()可以用来数据转换、创建派生数组、应用函数、链式调用、异步数据流处理、复杂API请求梳理、提供DOM操作、用来搜索和过滤等,比for好用太多了,主要是写法简单,并且非常直观,并且能提升代码的可读性,也就提升了Long Term代码的可维护性。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
5月前
|
数据采集 供应链 API
实战指南:通过1688开放平台API获取商品详情数据(附Python代码及避坑指南)
1688作为国内最大的B2B供应链平台,其API为企业提供合法合规的JSON数据源,直接获取批发价、SKU库存等核心数据。相比爬虫方案,官方API避免了反爬严格、数据缺失和法律风险等问题。企业接入1688商品API需完成资质认证、创建应用、签名机制解析及调用接口四步。应用场景包括智能采购系统、供应商评估模型和跨境选品分析。提供高频问题解决方案及安全合规实践,确保数据安全与合法使用。立即访问1688开放平台,解锁B2B数据宝藏!
|
7月前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
222 34
|
6月前
|
JSON Java 数据挖掘
利用 Java 代码获取淘宝关键字 API 接口
在数字化商业时代,精准把握市场动态与消费者需求是企业成功的关键。淘宝作为中国最大的电商平台之一,其海量数据中蕴含丰富的商业洞察。本文介绍如何通过Java代码高效、合规地获取淘宝关键字API接口数据,帮助商家优化产品布局、制定营销策略。主要内容包括: 1. **淘宝关键字API的价值**:洞察用户需求、优化产品标题与详情、制定营销策略。 2. **获取API接口的步骤**:注册账号、申请权限、搭建Java开发环境、编写调用代码、解析响应数据。 3. **注意事项**:遵守法律法规与平台规则,处理API调用限制。 通过这些步骤,商家可以在激烈的市场竞争中脱颖而出。
|
7月前
|
JavaScript API C#
【Azure Developer】Python代码调用Graph API将外部用户添加到组,结果无效,也无错误信息
根据Graph API文档,在单个请求中将多个成员添加到组时,Python代码示例中的`members@odata.bind`被错误写为`members@odata_bind`,导致用户未成功添加。
93 10
|
7月前
|
JSON API 数据安全/隐私保护
淘宝评论API接口操作步骤详解,代码示例参考
淘宝评论API接口是淘宝开放平台提供的一项服务,通过该接口,开发者可以访问商品的用户评价和评论。这些评论通常包括评分、文字描述、图片或视频等内容。商家可以利用这些信息更好地了解消费者的需求和偏好,优化产品和服务。同时,消费者也可以从这些评论中获得准确的购买参考,做出更明智的购买决策。
|
7月前
|
API Python
【Azure Developer】分享一段Python代码调用Graph API创建用户的示例
分享一段Python代码调用Graph API创建用户的示例
101 11

热门文章

最新文章