16 HDFS的java操作

简介: 16 HDFS的java操作

hdfs 在生产应用中主要是客户端的开发,其核心步骤是从hdfs提供的api中构造一个HDFS的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS上的文件。

1.搭建开发环境

1.引入依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.1</version>
</dependency>

2、window下开发的说明

建议在linux下进行hadoop应用的开发,不会存在兼容性问题。如在window上做客户端应用开发,需要设置以下环境:

1. 在windows的某个目录下解压一个hadoop的安装包
2. 将安装包下的lib和bin目录用对应windows版本平台编译的本地库替换
3. 在window系统中配置HADOOP_HOME指向你解压的安装包
4. 在windows系统的path变量中加入hadoop的bin目录
2.获取api中的客户端对象

在java中操作hdfs,首先要获得一个客户端实例

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

而我们的操作目标是HDFS,所以获取到的fs对象应该是DistributedFileSystem的实例;

get方法是从何处判断具体实例化那种客户端类呢?

  • conf中的一个参数 fs.defaultFS的配置值判断;

如果我们的代码中没有指定 fs.defaultFS ,并且工程classpath下也没有给定相应的配置,conf中的默认值就来自于hadoop的jar包中的core-default.xml,默认值为: file:///,则获取的将不是一个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象。

3.DistributedFileSystem实例对象所具备的方法

4.HDFS客户端操作数据代码示例

1.文件的增删改查:

public class HdfsClient {
  FileSystem fs = null;
  @Before
  public void init() throws Exception {
    // 构造一个配置参数对象,设置一个参数:我们要访问的hdfs的URI
    // 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址
    // new Configuration();的时候,它就会去加载jar包中的hdfs-default.xml
    // 然后再加载classpath下的hdfs-site.xml
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
    /**
     * 参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
     */
    conf.set("dfs.replication", "3");
    // 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例
    // fs = FileSystem.get(conf);
    // 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户
    fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
  }
  /**
   * 往hdfs上传文件
   * 
   * @throws Exception
   */
  @Test
  public void testAddFileToHdfs() throws Exception {
    // 要上传的文件所在的本地路径
    Path src = new Path("g:/redis-recommend.zip");
    // 要上传到hdfs的目标路径
    Path dst = new Path("/aaa");
    fs.copyFromLocalFile(src, dst);
    fs.close();
  }
  /**
   * 从hdfs中复制文件到本地文件系统
   * 
   * @throws IOException
   * @throws IllegalArgumentException
   */
  @Test
  public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
    fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
    fs.close();
  }
  @Test
  public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
    // 创建目录
    fs.mkdirs(new Path("/a1/b1/c1"));
    // 删除文件夹 ,如果是非空文件夹,参数2必须给值true
    fs.delete(new Path("/aaa"), true);
    // 重命名文件或文件夹
    fs.rename(new Path("/a1"), new Path("/a2"));
  }
  /**
   * 查看目录信息,只显示文件
   * 
   * @throws IOException
   * @throws IllegalArgumentException
   * @throws FileNotFoundException
   */
  @Test
  public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
    // 思考:为什么返回迭代器,而不是List之类的容器
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
    while (listFiles.hasNext()) {
      LocatedFileStatus fileStatus = listFiles.next();
      System.out.println(fileStatus.getPath().getName());
      System.out.println(fileStatus.getBlockSize());
      System.out.println(fileStatus.getPermission());
      System.out.println(fileStatus.getLen());
      BlockLocation[] blockLocations = fileStatus.getBlockLocations();
      for (BlockLocation bl : blockLocations) {
        System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
        String[] hosts = bl.getHosts();
        for (String host : hosts) {
          System.out.println(host);
        }
      }
      System.out.println("--------------为angelababy打印的分割线--------------");
    }
  }
  /**
   * 查看文件及文件夹信息
   * 
   * @throws IOException
   * @throws IllegalArgumentException
   * @throws FileNotFoundException
   */
  @Test
  public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    String flag = "d--             ";
    for (FileStatus fstatus : listStatus) {
      if (fstatus.isFile())  flag = "f--         ";
      System.out.println(flag + fstatus.getPath().getName());
    }
  }
}

2.通过流的方式访问hdfs:

/**
 * 相对那些封装好的方法而言的更底层一些的操作方式
 * 上层那些mapreduce   spark等运算框架,去hdfs中获取数据的时候,就是调的这种底层的api
 * @author
 *
 */
public class StreamAccess {
  FileSystem fs = null;
  @Before
  public void init() throws Exception {
    Configuration conf = new Configuration();
    fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
  }
  @Test
  public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
    //先获取一个文件的输入流----针对hdfs上的
    FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
    //再构造一个文件的输出流----针对本地的
    FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
    //再将输入流中数据传输到输出流
    IOUtils.copyBytes(in, out, 4096);
  }
  /**
   * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
   * 用于上层分布式运算框架并发处理数据
   * @throws IllegalArgumentException
   * @throws IOException
   */
  @Test
  public void testRandomAccess() throws IllegalArgumentException, IOException{
    //先获取一个文件的输入流----针对hdfs上的
    FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
    //可以将流的起始偏移量进行自定义
    in.seek(22);
    //再构造一个文件的输出流----针对本地的
    FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
    IOUtils.copyBytes(in,out,19L,true);
  }
  /**
   * 显示hdfs上文件的内容
   * @throws IOException 
   * @throws IllegalArgumentException 
   */
  @Test
  public void testCat() throws IllegalArgumentException, IOException{
    FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
    IOUtils.copyBytes(in, System.out, 1024);
  }
}

2.场景编程:

在mapreduce 、spark等运算框架中,有一个核心思想就是将运算移往数据,或者说,就是要在并发计算中尽可能让运算本地化,这就需要获取数据所在位置的信息并进行相应范围读取以下模拟实现:获取一个文件的所有block位置信息,然后读取指定block中的内容

@Test
  public void testCat() throws IllegalArgumentException, IOException{
    FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
    //拿到文件信息
    FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
    //获取这个文件的所有block的信息
    BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
    //第一个block的长度
    long length = fileBlockLocations[0].getLength();
    //第一个block的起始偏移量
    long offset = fileBlockLocations[0].getOffset();
    System.out.println(length);
    System.out.println(offset);
    //获取第一个block写入输出流
//    IOUtils.copyBytes(in, System.out, (int)length);
    byte[] b = new byte[4096];
    FileOutputStream os = new FileOutputStream(new File("d:/block0"));
    while(in.read(offset, b, 0, 4096)!=-1){
      os.write(b);
      offset += 4096;
      if(offset>=length) return;
    };
    os.flush();
    os.close();
    in.close();
  }


目录
相关文章
|
28天前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
86 34
|
3月前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
71 2
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
129 3
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
103 4
|
4月前
|
域名解析 分布式计算 网络协议
java遍历hdfs路径信息,报错EOFException
java遍历hdfs路径信息,报错EOFException
44 3
|
6月前
|
存储 Java 索引
Java ArrayList操作指南:如何移除并返回第一个元素
通过上述方法,你可以方便地从Java的 `ArrayList` 中移除并返回第一个元素。这种操作在日常编程中非常常见,是处理列表时的基本技能之一。希望这篇指南能帮助你更好地理解和运用Java的 `ArrayList`。
69 4
|
6月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之使用ODPS Tunnel Upload功能时,遇到报错:Java 堆内存不足,该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
并行计算 Java 数据挖掘
Java面试题:解释Java中的Stream API及其操作
Java面试题:解释Java中的Stream API及其操作
68 0
|
6月前
|
SQL 缓存 Java
使用MyBatis优化Java持久层操作
使用MyBatis优化Java持久层操作