在这里记录下学习hadoop 的过程,并对重要内容记录下来,以备以后查漏补缺。
要从Hadoop文件系统中读取文件,一般有两种方式:
1.使用java.net.URL对象
package com.ytu.chapter3; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class URLCat { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) { InputStream input = null; try { input = new URL("hdfs://localhost:9000/user/liujiacai/build.xml").openStream(); IOUtils.copyBytes(input, System.out, 4096, false); } catch (MalformedURLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeStream(input); } } }
这种方式需要让Java识别Hadoop文件系统的URL方案,就是通过一个FsUrlStreamHandlerFactory实例来调用URL中的setURLStreamHandlerFactory方法。这种方法在一个java虚拟机中只被调用一次,所以一般放在static块中。这个限制意味着如果程序的其他部件设置了一个URLStreamHandlerFactory,我们便无法再从Hadoop中读取数据了。
这需要是我第二种方法。
2.使用FileSystemAPI读取数据
在命令行中,我们可以和使用linux系统命令一样来操作hdfs系统。
hadoop fs -ls /
这个命令可以查看根目录下的文件,如果想要递归查看,参数改为 -lsr 即可
如果想知道更多的帮助可以用以下命令:
hadoop fs -help ls
可以得到ls的用法提示。
这里重点讲解用hadoop api操作hdfs文件系统。
通过调用FileSystem.get(Configuration conf)工厂方法可以得到FileSystem的实例。
Configuration class is a special class for holding key/value configuration parameters.
Configuration对象封装了一个客户端或者服务器的配置,这是用从类路径对去而来的配置文件(如conf/core-site.xml)来设置。
public static FileSystem get(Configuraion conf) throws IOException
这个静态方法返回的是默认文件系统(在conf/core-site.xml中设置,如果没有设置过,则是默认的本地文件系统)。
我们可以这样得到HDFS文件系统:
Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf);
我们可以这样得到本地文件系统
FileSystem local = FileSystem.getLocal(conf);
在hadoop文件api中,我们用Path对象来编码文件名和文件夹名,用FileStatus对象来存储文件与文件夹的元信息(metadata)
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class PutMerge { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); FileSystem local = FileSystem.getLocal(conf); Path inputDir = new Path(args[0]); Path outputDir = new Path(args[1]); FileStatus[] inputFiles = local.listStatus(inputDir); //FSDataOutputStream是Java标准库java.io.DataOutputSteam的子类,同时增加了随机访问的功能 FSDataOutputStream out = hdfs.create(outputDir); for (int i = 0; i < inputFiles.length; i++) { System.out.println(inputFiles[i].getPath().getName()); FSDataInputStream in = local.open(inputFiles[i].getPath());//默认4K为缓冲区大小 byte[] buffer = new byte[256]; int bytesRead = 0; while((bytesRead = in.read(buffer))>0) { out.write(buffer,0,bytesRead); } in.close(); } out.close(); } }
上面这个完整的程序完成的功能是:把本地的一个文件夹中的文件再上传到hdfs文件系统时将它们合并。
除此之外FileSystem类也有诸如delete(),exists(),mkdirs(),rename()等方法。
FSDataInputStream不是标准的java.io类,这个类是java.io.DataInputStream的一个子类,支持随机访问,这样就可以从流的任意位置读取数据了。
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, Closeable
上面是它的签名,Seekable接口允许在文件中定位,并提供一个查询方式,用于查询当前位置相对于文件开始位置的偏移量。
public interface Seekable { /** * Seek to the given offset from the start of the file. * The next read() will be from that location. Can't * seek past the end of the file. */ void seek(long pos) throws IOException; /** * Return the current offset from the start of the file */ long getPos() throws IOException; /** * Seeks a different copy of the data. Returns true if * found a new source, false otherwise. */ boolean seekToNewSource(long targetPos) throws IOException; }
调用seek方法来定位大于文件长度的位置会导致IOException异常。与java.io.InputStream中的skip()方法不同,seek()并没有指出数据流当前位置之后的一点,他可以转移到文件中任何一个位置。
应用程序员并不常用seekToNewSource方法。此方法一般倾向于切换到数据的另一个副本并在新的副本中寻找targetPos指定的位置。HDFS内部就采用这种方式在数据节点故障时为客户端提供可靠的数据流。
FsDataInputStream也实现了PositionedReadable接口
public interface PositionedReadable { /** * Read upto the specified number of bytes, from a given * position within a file, and return the number of bytes read. This does not * change the current offset of a file, and is thread-safe. */ public int read(long position, byte[] buffer, int offset, int length) throws IOException; /** * Read the specified number of bytes, from a given * position within a file. This does not * change the current offset of a file, and is thread-safe. */ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException; /** * Read number of bytes equalt to the length of the buffer, from a given * position within a file. This does not * change the current offset of a file, and is thread-safe. */ public void readFully(long position, byte[] buffer) throws IOException; }这个接口内的方法都会保留文件当前位置并且是线程安全的,因此他们提供了在读取文件的主要部分时访问其他部分的便利方法。
最后务必记住:seek方法是一个相对高开销的操作,需要慎重使用。