《深入理解大数据:大数据处理与编程实践》一一3.5 HDFS基本编程接口与示例

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

本节书摘来自华章计算机《深入理解大数据:大数据处理与编程实践》一书中的第3章,第3.5节,作者 主 编:黄宜华(南京大学)副主编:苗凯翔(英特尔公司),更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.5 HDFS基本编程接口与示例

除了上一节提到的命令之外,Hadoop提供了可用于读写、操作文件的API,这样可以让程序员通过编程实现自己的HDFS文件操作。
Hadoop提供的大部分文件操作API都位于org.apache.hadoop.fs这个包中。基本的文件操作包括打开、读取、写入、关闭等。为了保证能跨文件系统交换数据,Hadoop的API也可以对部分非HDFS的文件系统提供支持;也就是说,用这些API来操作本地文件系统的文件也是可行的。
3.5.1 HDFS编程基础知识
在Hadoop中,基本上所有的文件API都来自FileSystem类。FileSystem是一个用来与文件系统交互的抽象类,可以通过实现FileSystem的子类来处理具体的文件系统,比如HDFS或者其他文件系统。通过factory方法FileSystem.get(Configuration conf),可以获得所需的文件系统实例(factory方法是软件开发的一种设计模式,指:基类定义接口,但是由子类实例化之;在这里FileSystem定义get接口,但是由FileSytem的子类(比如FilterFileSystem)实现)。Configuration类比较特殊,这个类通过键值对的方式保存了一些配置参数。这些配置默认情况下来自对应文件系统的资源配置。我们可以通过如下方式获得具体的FileSystem实例:
Conf?iguration conf = new Conf?iguration();
FileSystem hdfs = FileSystem.get(conf);
如果要获得本地文件系统对应的FileSystem实例,则可以通过factory方法FileSystem.getLocal(Configuration conf)实现:
FileSystem local = FileSystem.getLocal(conf);
Hadoop中,使用Path类的对象来编码目录或者文件的路径,使用后面会提到的FileStatus类来存放目录和文件的信息。在Java的文件API中,文件名都是String类型的字符串,在这里则是Path类型的对象。
3.5.2 HDFS基本文件操作API
接下来看一下具体的文件操作。我们按照“创建、打开、获取文件信息、获取目录信息、读取、写入、关闭、删除”的顺序讲解Hadoop提供的文件操作的API。
以下接口的实际内容可以在Hadoop API和Hadoop源代码中进一步了解。
1.?创建文件
FileSystem.create方法有很多种定义形式,参数最多的一个是:

public abstract FSDataOutputStream create(Path?f,
              FsPermission?permission,
              boolean overwrite,
              int bufferSize,
              short?replication,
              long?blockSize,
              Progressable?progress)
              Throws IOException

那些参数较少的create只不过是将其中一部分参数用默认值代替,最终还是要调用这个函数。其中各项的含义如下:
f:文件名
overwrite:如果已存在同名文件,overwrite=true覆盖之,否则抛出错误;默认为true。
buffersize:文件缓存大小。默认值:Configuration中io.file.buffer.size的值,如果Configuration中未显式设置该值,则是4096。
replication:创建的副本个数,默认值为1。
blockSize:文件的block大小,默认值:Configuration中fs.local.block.size的值,如果Configuration中未显式设置该值,则是32M。
permission和progress的值与具体文件系统实现有关。
但是大部分情况下,只需要用到最简单的几个版本:
publicFSDataOutputStream create(Path?f);
publicFSDataOutputStream create(Path?f,boolean?overwrite);
publicFSDataOutputStream create(Path?f,boolean?overwrite,int?bufferSize);
2.?打开文件
FileSystem.open方法有2个,参数最多的一个定义如下:
public abstract FSDataInputStream open(Path f, intbufferSize) throws IOException
其中各项的含义如下:
f:文件名
buffersize:文件缓存大小。默认值:Configuration中io.file.buffer.size的值,如果Configur
ation中未显式设置该值,则是4096。
3.?获取文件信息
FileSystem.getFileStatus方法格式如下:
public abstract FileStatus getFileStatus(Path f) throws IOException;
这一函数会返回一个FileStatus对象。通过阅读源代码可知,FileStatus保存了文件的很多信息,包括:
path:文件路径
length:文件长度
isDir:是否为目录
block_replication:数据块副本因子
blockSize:文件长度(数据块数)
modification_time:最近一次修改时间
access_time:最近一次访问时间
owner:文件所属用户
group:文件所属组
如果想了解文件的这些信息,可以在获得文件的FileStatus实例之后,调用相应的getXXX方法(比如,FileStatus.getModificationTime()获得最近修改时间)。
4.?获取目录信息
获取目录信息,不仅是目录本身,还有目录之下的文件和子目录信息,如下所述。FileStatus.listStatus方法格式如下:
public FileStatus[] listStatus(Path f) throws IOException;
如果f是目录,那么将目录之下的每个目录或文件信息保存在FileStatus数组中返回。如果f是文件,和getFileStatus功能一致。
另外,listStatus还有参数为Path[]的版本的接口定义以及参数带路径过滤器PathFilter的接口定义,参数为Path[]的listStatus就是对这个数组中的每个path都调用上面的参数为Path的listStatus。参数中的PathFilter则是一个接口,实现接口的accept方法可以自定义文件过滤规则。
另外,HDFS还可以通过正则表达式匹配文件名来提取需要的文件,这个方法是:
public FileStatus[] globStatus(Path pathPattern) throws IOException;
参数pathPattern中,可以像正则表达式一样,使用通配符来表示匹配规则:
?:表示任意的单个字符。
:表示任意长度的任意字符,可以用来表示前缀后缀,比如.java表示所有java文件。
[abc]:表示匹配a,b,c中的单个字符。
[a-b]:表示匹配a-b范围之间的单个字符。
1:表示匹配除a之外的单个字符。
c:表示取消特殊字符的转义,比如*的结果是*而不是随意匹配。
{ab,cd}:表示匹配ab或者cd中的一个串。
{ab,c{de,fh}}:表示匹配ab或者cde或者cfh中的一个串
5.?读取
3.3.2节提到,调用open打开文件之后,使用了一个FSDataInputStream对象来负责数据的读取。通过FSDataInputStream进行文件读取时,提供的API就是FSDataInputStream.read方法:
public int read(long?position, byte[] buffer, int?offset, int?length) throws IOException
函数的意义是:从文件的指定位置position开始,读取最多length字节的数据,保存到buffer中从offset个元素开始的空间中;返回值为实际读取的字节数。此函数不改变文件当前offset值。不过,使用更多的还有一种简化版本:
public f?inal int read(byte[]?b)throws IOException
从文件当前位置读取最多长度为b.len的数据保存到b中,返回值为实际读取的字节数。
6.?写入
从接口定义可以看出,调用create创建文件以后,使用了一个FSDataOutputStream对象来负责数据的写入。通过FSDataOutputStream进行文件写入时,最常用的API就是write方法:
public void write(byte[]?b,int?off,int?len) throws IOException
函数的意义是:将b中从off开始的最多len个字节的数据写入文件当前位置。返回值为实际写入的字节数。
7.?关闭
关闭为打开的逆过程,FileSystem.close定义如下:

public void close() throws IOException

不需要其他操作而关闭文件。释放所有持有的锁。
8.?删除
删除过程FileSystem.delete定义如下:

public abstract boolean delete(Path f,boolean?recursive) throws IOException

其中各项含义如下:
f:待删除文件名。
recursive:如果recursive为true,并且f是目录,那么会递归删除f下所有文件;如果f是文件,recursive为true还是false无影响。
另外,类似Java中File的接口DeleteOnExit,如果某些文件需要删除,但是当前不能被删;或者说当时删除代价太大,想留到退出时再删除的话,FileSystem中也提供了一个deleteOnExit接口:
Public Boolean deleteOnExit(Path?f) throws IOException
标记文件f,当文件系统关闭时才真正删除此文件。但是这个文件f在文件系统关闭前必须存在。
3.5.3 HDFS基本编程实例
本节介绍使用HDFS的API编程的简单示例。
下面的程序可以实现如下功能:在输入文件目录下的所有文件中,检索某一特定字符串所出现的行,将这些行的内容输出到本地文件系统的输出文件夹中。这一功能在分析MapReduce作业的Reduce输出时很有用。
这个程序假定只有第一层目录下的文件才有效,而且,假定文件都是文本文件。当然,如果输入文件夹是Reduce结果的输出,那么一般情况下,上述条件都能满足。为了防止单个的输出文件过大,这里还加了一个文件最大行数限制,当文件行数达到最大值时,便关闭此文件,创建另外的文件继续保存。保存的结果文件名为1,2,3,4,…,以此类推。
如上所述,这个程序可以用来分析MapReduce的结果,所以称为ResultFilter。
程序:Result Filter
输入参数:此程序接收4个命令行输入参数,参数含义如下:

<dfs path>:HDFS上的路径
<local path>:本地路径
<match str>:待查找的字符串
<single file lines>:结果每个文件的行数
程序:ResultFilter
import java.util.Scanner;
import java.io.IOException;
import java.io.File;

import org.apache.hadoop.conf.Conf?iguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class resultFilter
{
    public static void main(String[] args) throws IOException {
        Conf?iguration conf = new Conf?iguration();
        // 以下两句中,hdfs和local分别对应HDFS实例和本地文件系统实例
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);

        Path inputDir, localFile;

        FileStatus[] inputFiles;
        FSDataOutputStream out = null;
        FSDataInputStream in = null;
        Scanner scan;
        String str;
        byte[] buf;
        int singleFileLines;
        int numLines, numFiles, i;

        if(args.length!=4)
        {
            // 输入参数数量不够,提示参数格式后终止程序执行
            System.err.println("usage resultFilter <dfs path><local path>" + 
            " <match str><single f?ile lines>");
            return;
        }
        inputDir = new Path(args[0]);
        singleFileLines = Integer.parseInt(args[3]);

        try {
            inputFiles = hdfs.listStatus(inputDir);    // 获得目录信息
            numLines = 0;
            numFiles = 1;                // 输出文件从1开始编号
            localFile = new Path(args[1]);
            if(local.exists(localFile))        // 若目标路径存在,则删除之
                local.delete(localFile, true);
            for (i = 0; i<inputFiles.length; i++) {
                if(inputFiles[i].isDir() == true)    // 忽略子目录
                        continue;
            System.out.println(inputFiles[i].getPath().getName());
            in = hdfs.open(inputFiles[i].getPath());
            scan = new Scanner(in);
            while (scan.hasNext()) {
                str = scan.nextLine();
                if(str.indexOf(args[2])==-1)
                    continue;            // 如果该行没有match字符串,则忽略之
                numLines++;
                if(numLines == 1)            // 如果是1,说明需要新建文件了
                {
                    localFile = new Path(args[1] + File.separator + numFiles);
                    out = local.create(localFile);    // 创建文件
                    numFiles++;
                }
                buf = (str+"\n").getBytes();
                out.write(buf, 0, buf.length);        // 将字符串写入输出流
                if(numLines == singleFileLines)    // 如果已满足相应行数,关闭文件
                {
                    out.close();
                    numLines = 0;            // 行数变为0,重新统计
                }
            }// end of while
                scan.close();
                in.close();
            }// end of for
            if(out != null)
                out.close();
            } // end of try
            catch (IOException e) {
                e.printStackTrace();
            }
        }// end of main
    }// end of resultFilter

程序的编译命令:
javac *.java
运行命令:

hadoop jar resultFilter.jar resultFilter <dfs path>\
          <local path><match str><single f?ile lines>

参数和含义如下:
:HDFS上的路径
:本地路径
:待查找的字符串
:结果的每个文件的行数
上述程序的逻辑很简单,获取该目录下所有文件的信息,对每一个文件,打开文件、循环读取数据、写入目标位置,然后关闭文件,最后关闭输出文件。这里粗体打印的几个函数上面都有介绍,不再赘述。
我们在自己机器上预装的hadoop-1.0.4上简单试验了这个程序,在hadoop源码中拷贝了几个文件,然后上传到HDFS中,文件如下(见图3-17):
image

然后,编译运行一下该示例程序,显示一下目标文件内容,结果如图3-18所示,其中,将出现“java”字符串的每一行都输出到文件中。

image


  1. a
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
5月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
284 6
|
5月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
76 4
|
5月前
|
消息中间件 分布式计算 关系型数据库
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
88 0
|
5月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
114 0
|
3月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
135 2
|
5月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
80 4
|
5月前
|
存储 分布式计算 资源调度
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
137 5
|
5月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
67 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
5月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
80 0
|
5月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
110 0