《Hadoop海量数据处理:技术详解与项目实战》一3.3 如何访问HDFS

简介:

本节书摘来异步社区《Hadoop海量数据处理:技术详解与项目实战》一书中的第3章,第3.3节,作者: 范东来 责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。

3.3 如何访问HDFS

Hadoop海量数据处理:技术详解与项目实战
HDFS提供给HDFS客户端访问的方式多种多样,用户可以根据不同的情况选择不同的方式。

3.3.1 命令行接口

Hadoop自带一组命令行工具,而其中有关HDFS的命令是其工具集的一个子集。命令行工具虽然是最基础的文件操作方式,但却是最常用的。作为一名合格的Hadoop开发人员和运维人员,熟练掌握是非常有必要的。

执行hadoop dfs命令可以显示基本的使用信息。

[hadoop@master bin]$ hadoop dfs
Usage: java FsShell
         [-ls <path>]
         [-lsr <path>]
         [-df [<path>]]
         [-du <path>]
         [-dus <path>]
         [-count[-q] <path>]
         [-mv <src> <dst>]
         [-cp <src> <dst>]
         [-rm [-skipTrash] <path>]
         [-rmr [-skipTrash] <path>]
         [-expunge]
         [-put <localsrc> ... <dst>]
         [-copyFromLocal <localsrc> ... <dst>]
         [-moveFromLocal <localsrc> ... <dst>]
         [-get [-ignoreCrc] [-crc] <src> <localdst>]
         [-getmerge <src> <localdst> [addnl]]
         [-cat <src>]
         [-text <src>]
         [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>]
         [-moveToLocal [-crc] <src> <localdst>]
         [-mkdir <path>]
         [-setrep [-R] [-w] <rep> <path/file>]
         [-touchz <path>]
         [-test -[ezd] <path>]
         [-stat [format] <path>]
         [-tail [-f] <file>]
         [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
         [-chown [-R] [OWNER][:[GROUP]] PATH...]
         [-chgrp [-R] GROUP PATH...]
         [-help [cmd]]

表3-2列出了hadoop命令行接口,并用例子说明各自的功能。


b3_2

3.3.2 Java API

本地访问HDFS最主要的方式是HDFS提供的Java应用程序接口,其他的访问方式都建立在这些应用程序接口之上。为了访问HDFS,HDFS客户端必须拥有一份HDFS的配置文件,也就是hdfs-site.xml文件,以获取NameNode的相关信息,每个应用程序也必须能访问Hadoop程序库JAR文件,也就是在$HADOOP_HOME、$HADOOP_HOME/lib下面的jar文件。

Hadoop是由Java编写的,所以通过Java API可以调用所有的HDFS的交互操作接口,最常用的是FileSystem类,它也是命令行hadoop fs的实现,其他接口在这一节也会有 介绍。

1.java.net.URL
先来看一个例子,如代码清单3-1所示。

代码清单3-1 java.net.URL示例

package com.hdfsclient;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class MyCat {

static{
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws MalformedURLException, IOException{
        InputStream in = null;
    try {
        in = new URL(args[0]).openStream();
        IOUtils.copyBytes(in, System.out, 4096,false);
    } finally{
        IOUtils.closeStream(in);
    }
  }
}

编译代码清单3-1所示的代码,导出为xx.jar文件,执行命令:

hadoop jar xx.jar hdfs://master:9000/user/hadoop/test

执行完成后,屏幕上输出HDFS的文件/user/hadoop/test的文件内容。

该程序是从HDFS读取文件的最简单的方式,即用java.net.URL对象打开数据流。代码中,静态代码块的作用是让Java程序识别Hadoop的HDFS url。

2.org.apache.hadoop.fs.FileSystem
虽然上面的方式是最简单的方式,但是在实际开发中,访问HDFS最常用的类还是FileSystem类。

(1)读取文件

读取文件的示例如代码清单3-2所示。

代码清单3-2 读取文件示例

package com.hdfsclient;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {

    public static void main(String[] args) throws IOException {
        String uri = "hdfs://master:9000/user/hadoop/test";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096,false);
        } finally{
            IOUtils.closeStream(in);
        }
    }
}

编译代码3-2,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.FileSystemCat

执行完成后控制台会输出文件内容。

FileSystem类的实例获取是通过工厂方法:

public static FileSystem get(URI uri,Configuration conf) throws IOException

其中Configuration对象封装了HDFS客户端或者HDFS集群的配置,该方法通过给定的URI方案和权限来确定要使用的文件系统。得到FileSystem实例之后,调用open()函数获得文件的输入流:

Public FSDataInputStream open(Path f) throws IOException

方法返回Hadoop独有的FSDataInputStream对象。

(2)写入文件

写入文件的示例如代码清单3-3所示。

代码清单3-3 写入文件示例

package com.hdfsclient;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileCopyFromLocal {
    public static void main(String[] args) throws IOException {
        //本地文件路径
        String source = "/home/hadoop/test";
        String destination = "hdfs://master:9000/user/hadoop/test2";
        InputStream in = new BufferedInputStream(new FileInputStream(source));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(destination),conf);
        OutputStream out = fs.create(new Path(destination));
        IOUtils.copyBytes(in, out, 4096,true);
    }
}

编译代码清单3-3所示的代码,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.FileCopyFromLocal
FileSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类不同的是,FSDataOutputStream不允许在文件中定位,这是因为HDFS只允许一个已打开的文件顺序写入,或在现有文件的末尾追加数据。

(3)创建HDFS的目录

创建HDFS目录的示例如代码清单3-4所示。

代码清单3-4 创建HDFS目录示例

package com.hdfsclient;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateDir {

    public static void main(String[] args){
        String uri = "hdfs://master:9000/user/test";
        Configuration conf = new Configuration();
        try {
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path dfs=new Path("hdfs://master:9000/user/test");
            fs.mkdirs(dfs);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

编译代码清单3-4所示的代码,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.CreateDir

运行完成后可以用命令hadoop dfs -ls验证目录是否创建成功。

(4)删除HDFS上的文件或目录

删除HDFS上的文件或目录的示例如代码清单3-5所示。

代码清单3-5 删除HDFS上的文件或目录示例

package com.hdfsclient;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteFile {
    public static void main(String[] args){
    String uri = "hdfs://master:9000/user/hadoop/test";
        Configuration conf = new Configuration();
        try {
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path delef=new Path("hdfs://master:9000/user/hadoop");
            boolean isDeleted=fs.delete(delef,false);
            //是否递归删除文件夹及文件夹下的文件
            //boolean isDeleted=fs.delete(delef,true);
            System.out.println(isDeleted);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

编译代码清单3-5所示的代码,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.DeleteFile

如果需要递归删除文件夹,则需将fs.delete(arg0, arg1)方法的第二个参数设为true。

(5)查看文件是否存在

查看文件的示例如代码清单3-6所示。

代码清单3-6 查看文件示例

package com.hdfsclient;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckFileIsExist {
    public static void main(String[] args){
     String uri = "hdfs://master:9000/user/hadoop/test";
         Configuration conf = new Configuration();

         try {
             FileSystem fs = FileSystem.get(URI.create(uri), conf);
             Path path=new Path(url);
             boolean isExists=fs.exists(path);
             System.out.println(isExists);
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
}

编译代码清单3-6所示的代码,导出为xx.jar文件,上传至集群任意一台节点,执行命令:

hadoop jar xx.jar com.hdfsclient.CheckFileIsExist

(6)列出目录下的文件或目录名称

列出目录下的文件或目录名称的示例如代码清单3-7所示。

代码清单3-7 列出目录下的文件或目录名称示例

package com.hdfsclient;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListFiles {
    public static void main(String[] args){
        String uri = "hdfs://master:9000/user";
        Configuration conf = new Configuration();
        try {
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path =new Path(uri);
            FileStatus stats[]=fs.listStatus(path);
            for(int i = 0; i < stats.length; ++i){
                System.out.println(stats[i].getPath().toString());
        }
            fs.close();
        }   catch (IOException e) {
            e.printStackTrace();
        }
    }
}

编译代码清单3-7所示的代码,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.ListFiles

运行后,控制台会打印出/user目录下的目录名称或文件名。

(7)文件存储的位置信息

文件存储的位置信息的示例如代码清单3-8所示。

代码清单3-8 文件存储的位置信息示例

package com.hdfsclient;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocationFile {
    public static void main(String[] args){
     String uri = "hdfs://master:9000/user/test/test";
         Configuration conf = new Configuration();
         try {
             FileSystem fs = FileSystem.get(URI.create(uri), conf);
             Path fpath=new Path(uri);
             FileStatus filestatus = fs.getFileStatus(fpath);
             BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
            int blockLen = blkLocations.length;
            for(int i=0;i<blockLen;i++){
               String[] hosts = blkLocations[i].getHosts();
               System.out.println("block_"+i+"_location:"+hosts[0]);
            }
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
}

编译代码清单3-8所示的代码,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.LocationFile

前面提到过,HDFS的存储由DataNode的块完成,执行成功后,控制台会输出:

block_0_location:slave1
block_1_location:slave2
block_2_location:slave3

表示该文件被分为3个数据块存储,存储的位置分别为slave1、slave2、slave3。

3.SequenceFile
SequeceFile是HDFS API提供的一种二进制文件支持,这种二进制文件直接将对序列化到文件中,所以SequenceFile是不能直接查看的,可以通过hadoop dfs -text命令查看,后面跟要查看的SequenceFile的HDFS路径。

(1)写入SequenceFile

写入SequenceFile示例如代码清单3-9所示。

代码清单3-9 写入SequenceFile示例

package com.hdfsclient;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriter {

    private static final String[] text = {
        "两行黄鹂鸣翠柳",
        "一行白鹭上青天",
        "窗含西岭千秋雪",
        "门泊东吴万里船",
    };


public static void main(String[] args) {
        String uri = "hdfs://master:9000/user/hadoop/testseq";
        Configuration conf = new Configuration();
        SequenceFile.Writer writer = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path =new Path(uri);
            IntWritable key = new IntWritable();
            Text value = new Text();
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());

            for (int i = 0;i<100;i++){
                key.set(100-i);
                value.set(text[i%text.length]);
                writer.append(key, value);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}

可以看到SequenceFile.Writer的构造方法需要制定键值对的类型。如果是日志文件,那么时间戳作为key,日志内容是value是非常合适的。

编译代码清单3-9所示的代码,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.SequenceFileWriter

运行完成后,执行命令:

hadoop dfs -text /user/hadoop/testseq

可以看到如下内容:

100 两行黄鹂鸣翠柳
99 一行白鹭上青天
98 窗含西岭千秋雪
97 门泊东吴万里船
……
2 窗含西岭千秋雪
1 门泊东吴万里船

(2)读取SequenceFile

读取SequenceFile示例如代码清单3-10所示。

代码清单3-10 读取SequenceFile示例

package com.hdfsclient;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReader {
    public static void main(String[] args) {
        String uri = "hdfs://master:9000/user/hadoop/testseq";
        Configuration conf = new Configuration();
        SequenceFile.Reader reader = null;

        try {
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path = new Path(uri);
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();   
            while(reader.next(key,value)){
                System.out.printf("[%s]\t%s\n",key,value);
                position = reader.getPosition();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}

编译代码清单3-10所示的代码,导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.SequenceFileReader

运行完成后,控制台会输出:

[100] 两行黄鹂鸣翠柳
[99]  一行白鹭上青天
[98]  窗含西岭千秋雪
[97]  门泊东吴万里船
……
[2]  窗含西岭千秋雪
[1]  门泊东吴万里船
3.3.3 其他常用的接口

1.Thrift
HDFS的底层的接口是通过Java API提供的,所以非Java程序访问HDFS比较麻烦。弥补方法是通过thriftfs功能模块中的Thrift API将HDFS封装成一个Apache Thrift服务。这样任何与Thrift绑定的语言都能与HDFS进行交互。为了使用Thrift API,需要运行提供Thrift服务的服务器,并以代理的方式访问Hadoop。目前支持远程调用Thrift API的语言有C++、Perl、PHP、Python和Ruby。

2.FUSE
用户空间文件系统(Filesystem in Universe,FUSE)允许把按照用户空间实现的文件系统整合成一个Unix文件系统。通过使用Hadoop的Fuse-DFS功能模块,任意一个Hadoop文件系统(如HDFS)均可作为一个标准文件系统进行挂载。随后便可以使用普通Unix命令,如ls、cat等与该文件系统交互,还可以通过任意一种编程语言调用POSIX库来访问文件系统。

其余可以访问HDFS的还有WebDAV、HTTP、ftp接口,不过并不常用。

3.3.4 Web UI

我们还可以通过NameNode的50070端口号访问HDFS的Web UI,HDFS的Web UI包含了非常丰富并且实用的信息,如图3-10所示。通过HDFS的Web UI了解集群的状况是一名合格Hadoop开发和运维人员必备的条件。

我们可以直接在浏览器中输入master:9000(即NameNode的主机名:端口号)便可进入Web UI。如图3-10,点击“Browse the filesystem”可以查看整个HDFS的目录树,点击“Namenode Logs”可以查看所有的NameNode的日志,这对于排查错误十分有帮助。下面的表格展示了整个HDFS大致的信息,如总容量、使用量、剩余量等,其中点击“Live Nodes”选项,可以看到所有DataNode节点的状况,如图3-11所示。


3_10_11

相关文章
|
3月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
201 6
|
3月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
73 3
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
129 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
93 1
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
116 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
54 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
69 0
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
88 2
|
13天前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
49 4
|
3月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
87 1

热门文章

最新文章

相关实验场景

更多