Hadoop文件系统访问的两种方式

简介: <p>在这里记录下学习hadoop 的过程,并对重要内容记录下来,以备以后查漏补缺。</p> <p>要从Hadoop文件系统中读取文件,一般有两种方式:</p> <p>1.使用java.net.URL对象</p> <p></p> <pre name="code" class="java">package com.ytu.chapter3;import java.io.IOExc

在这里记录下学习hadoop 的过程,并对重要内容记录下来,以备以后查漏补缺。

要从Hadoop文件系统中读取文件,一般有两种方式:

1.使用java.net.URL对象

package com.ytu.chapter3;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class URLCat {
	static {
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
	}
	public static void main(String[] args) {
		InputStream input = null;
		
		try {
			input = new URL("hdfs://localhost:9000/user/liujiacai/build.xml").openStream();
			IOUtils.copyBytes(input, System.out, 4096, false);
		} catch (MalformedURLException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(input);
		}
		
	}
}

这种方式需要让Java识别Hadoop文件系统的URL方案,就是通过一个FsUrlStreamHandlerFactory实例来调用URL中的setURLStreamHandlerFactory方法。这种方法在一个java虚拟机中只被调用一次,所以一般放在static块中。这个限制意味着如果程序的其他部件设置了一个URLStreamHandlerFactory,我们便无法再从Hadoop中读取数据了。

这需要是我第二种方法。

2.使用FileSystemAPI读取数据

在命令行中,我们可以和使用linux系统命令一样来操作hdfs系统。

hadoop fs -ls /

这个命令可以查看根目录下的文件,如果想要递归查看,参数改为  -lsr 即可

如果想知道更多的帮助可以用以下命令:

hadoop fs -help ls

可以得到ls的用法提示。

这里重点讲解用hadoop api操作hdfs文件系统。


通过调用FileSystem.get(Configuration conf)工厂方法可以得到FileSystem的实例。

Configuration class is a special class for holding key/value configuration parameters.

Configuration对象封装了一个客户端或者服务器的配置,这是用从类路径对去而来的配置文件(如conf/core-site.xml)来设置。

public static FileSystem get(Configuraion conf) throws IOException

这个静态方法返回的是默认文件系统(在conf/core-site.xml中设置,如果没有设置过,则是默认的本地文件系统)。

我们可以这样得到HDFS文件系统:

Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);

我们可以这样得到本地文件系统

FileSystem local = FileSystem.getLocal(conf);

在hadoop文件api中,我们用Path对象来编码文件名和文件夹名,用FileStatus对象来存储文件与文件夹的元信息(metadata)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutMerge {

	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FileSystem local = FileSystem.getLocal(conf);

		Path inputDir = new Path(args[0]);
		Path outputDir = new Path(args[1]);

		FileStatus[] inputFiles = local.listStatus(inputDir);

		//FSDataOutputStream是Java标准库java.io.DataOutputSteam的子类,同时增加了随机访问的功能
		FSDataOutputStream out = hdfs.create(outputDir);

		for (int i = 0; i < inputFiles.length; i++) {
			System.out.println(inputFiles[i].getPath().getName());
			FSDataInputStream in = local.open(inputFiles[i].getPath());//默认4K为缓冲区大小
			byte[] buffer = new byte[256];
			int bytesRead = 0;
			while((bytesRead = in.read(buffer))>0) {
				out.write(buffer,0,bytesRead);
			}
			in.close();
		}
		out.close();
	}

}

上面这个完整的程序完成的功能是:把本地的一个文件夹中的文件再上传到hdfs文件系统时将它们合并。

除此之外FileSystem类也有诸如delete(),exists(),mkdirs(),rename()等方法。


FSDataInputStream不是标准的java.io类,这个类是java.io.DataInputStream的一个子类,支持随机访问,这样就可以从流的任意位置读取数据了。

public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable, Closeable 

上面是它的签名,Seekable接口允许在文件中定位,并提供一个查询方式,用于查询当前位置相对于文件开始位置的偏移量。

public interface Seekable {
  /**
   * Seek to the given offset from the start of the file.
   * The next read() will be from that location.  Can't
   * seek past the end of the file.
   */
  void seek(long pos) throws IOException;
  
  /**
   * Return the current offset from the start of the file
   */
  long getPos() throws IOException;

  /**
   * Seeks a different copy of the data.  Returns true if 
   * found a new source, false otherwise.
   */
  boolean seekToNewSource(long targetPos) throws IOException;
}

调用seek方法来定位大于文件长度的位置会导致IOException异常。与java.io.InputStream中的skip()方法不同,seek()并没有指出数据流当前位置之后的一点,他可以转移到文件中任何一个位置。

应用程序员并不常用seekToNewSource方法。此方法一般倾向于切换到数据的另一个副本并在新的副本中寻找targetPos指定的位置。HDFS内部就采用这种方式在数据节点故障时为客户端提供可靠的数据流。





FsDataInputStream也实现了PositionedReadable接口

public interface PositionedReadable {
  /**
   * Read upto the specified number of bytes, from a given
   * position within a file, and return the number of bytes read. This does not
   * change the current offset of a file, and is thread-safe.
   */
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;
  
  /**
   * Read the specified number of bytes, from a given
   * position within a file. This does not
   * change the current offset of a file, and is thread-safe.
   */
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;
  
  /**
   * Read number of bytes equalt to the length of the buffer, from a given
   * position within a file. This does not
   * change the current offset of a file, and is thread-safe.
   */
  public void readFully(long position, byte[] buffer) throws IOException;
}
这个接口内的方法都会保留文件当前位置并且是线程安全的,因此他们提供了在读取文件的主要部分时访问其他部分的便利方法。


最后务必记住:seek方法是一个相对高开销的操作,需要慎重使用。







目录
相关文章
|
6月前
|
分布式计算 Hadoop Linux
Hadoop检查本地文件系统:
【7月更文挑战第24天】
58 6
|
6月前
|
分布式计算 Hadoop
|
存储 SQL 弹性计算
手把手教你使用自建Hadoop访问全托管服务化HDFS(OSS-HDFS服务)
1. 服务介绍OSS-HDFS服务(JindoFS 服务)是一款云原生数据湖3.0存储产品,基于统一的元数据管理能力,在完全兼容 HDFS 文件系统接口的同时,提供充分的 POSIX 能力支持,能更好的满足大数据和 AI 领域丰富多样的数据湖计算场景。通过OSS-HDFS服务,无需对现有的 Hadoop/Spark 大数据分析应用做任何修改,通过简单的配置就可以像在原生HDFS中那样管理和访问数据
手把手教你使用自建Hadoop访问全托管服务化HDFS(OSS-HDFS服务)
|
分布式计算 Hadoop Java
Hadoop/Spark 访问 OSS 加速 | 学习笔记
快速学习Hadoop/Spark 访问 OSS 加速。
476 0
|
机器学习/深度学习 分布式计算 安全
大规模集群下Hadoop NameNode如何承载每秒上千次的高并发访问
大规模集群下Hadoop NameNode如何承载每秒上千次的高并发访问
313 0
大规模集群下Hadoop NameNode如何承载每秒上千次的高并发访问
|
分布式计算 资源调度 数据可视化
|
分布式计算 Hadoop Java
|
分布式计算 Java Hadoop
Java: Hadoop文件系统的读写操作
Java: Hadoop文件系统的读写操作
173 0
|
存储 分布式计算 安全
|
存储 缓存 分布式计算