Hadoop中文件读写(Java)

简介:

前言

    在本文档中,你将了解到如何用Java接口读写Hadoop分布式系统中的文件,以及编码的转换等问题。其中有些细节,在你不知道的时候,是非常容易出错的。 这边读写文件分以下三种情况:

    1. 在非Map Reduce过程中读写分布式文件系统中的文件

    比如说,你想自己遍历一个文件,想截断一个文件,都属于这种方式。一般该过程发生在run函数中,程序员处理Map Reduce产生的中间文件上。

    2. 在map(或reduce)函数中读写一个Record。

对于TextInputFormat,一个Record就是一行。我们会得到一个Text对象,作为一行。要注意的是如果读入的文件不是UTF-8 格式(比如GBK,因为TextInputFormat只能解码UTF-8文件,直接读会产生乱码),我们如何正确转换成Unicode。

    3. 在map(或reduce)函数中,读写文件

比如说,在map函数中,你想通过读入文件初始化一个HashMap。

非Map Reduce过程中读文件

    主要用到FileSystem类,打开一个文件后得到FSDataInputStream,据说这个FSDataInputStream会对数据缓 存,所以没必要包装成一个BufferedReader,但其readLine()方法是被deprecated,所以你要想一行行的读,还是转成 BufferedReader吧。在转换的过程中,还能指定输入文件的编码,比如这边是"UTF-8"。 该读文件的方式无法在map或reduce方法中调用,因为它们处于一个static类中,无法创建一个JobConf。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
JobConf confQ =  new  JobConf(getConf(), XXXX. class );
FileSystem fs= FileSystem.get(confQ);
FSDataInputStream fin = fs.open( new  Path( "filePathXX" ));
BufferedReader in =  null ;
String line;
try  {
     in =  new  BufferedReader( new  InputStreamReader(fin,  "UTF-8" ));
     while  ((line = in.readLine()) !=  null ) {
         //...
     }
 
finally  {
     if  (in !=  null ) {
         in.close();
     }
}

非Map Reduce过程中写文件

该过程和读文件对应,注意在close之前,调用BufferedWriter的flush()方法,不然有数据会没写出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
JobConf confQ =  new  JobConf(getConf(), XXXX. class );
FileSystem fs= FileSystem.get(confQ);
FSDataOutputStream fout = fs.create( new  Path( "要写入的文件全路径" ));
BufferedWriter out =  null ;
try  {
     out =  new  BufferedWriter( new  OutputStreamWriter(fout,  "UTF-8" ));
     out.write( "XXXXXX" );
     out.newLine();
     }
     out.flush();
finally  {
     if  (out !=  null ) {
         out.close();
     }
}

Map或reduce方法中需要读取一个文件的解决方案

    对于很大的文件,目前的方案可能是预先把这个文件部署到每个集群节点上。这边讲两种小文件的处理方法。

方法一:把文件打包到运行的jar包中(可以放在根目录下),然后用以下方式读取(fileName就是在根目录下要读取的文件名):

1
2
3
4
5
6
7
8
9
10
11
12
13
BufferedReader in =  null ;
try  {
     InputStream fstream = Thread.currentThread()
             .getContextClassLoader().getResourceAsStream( "fileName" );
     in =  new  BufferedReader( new  InputStreamReader( new  DataInputStream(
             fstream),  "UTF-8" ));
     String line;
     while  ((line = in.readLine()) !=  null ) {
         //...
     }
finally  {
     in.close();
}

方法二:在Map或Reduce类中,复写public void configure(JobConf jobIn)方法(在基类MapReduceBase中定义)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public  void  configure(JobConf jobIn) {
     super .configure(jobIn);
     try {
         FileSystem fs =  null ;
         fs = FileSystem.get(jobIn);
         FSDataInputStream in;
         BufferedReader bufread;
         String strLine;
         String[] strList;
         IpField ipField;
         Path IpCityPath =  new  Path( "/user/hadoop/dw/dim/ip_cityid.txt" );
         if  (!fs.exists(IpCityPath))
             throw  new  IOException( "Input file not found" );
         if  (!fs.isFile(IpCityPath))
             throw  new  IOException( "Input should be a file" );
         in = fs.open(IpCityPath);
         bufread =  new  BufferedReader( new  InputStreamReader(in));
         while  ((strLine = bufread.readLine()) !=  null ) {
             strList =strLine.split( "\"" );
             if (strList.length <  3 )
                 continue ;
             IpField nodeIp =  new  IpField(strList[ 0 ], strList[ 1 ], strList[ 2 ]);
             CityIpLocal.add(nodeIp);
         }
         in.close();
     }
     catch (IOException e)
     {
        e.printStackTrace();
     }
}

CityIpLocal可以是外部类的一个ArrayList对象

map方法中读GBK文件

    主要是编码转换的问题,我曾经在这个问题上调试蛮久的。我们输入的文件是GBK编码的,map中,调用Text.toString并不会把编码自动转换成Unicode,但Java中的字符串都是当Unicode来处理的,我们需要手动转换,方式如下:

1
2
Text value; //map传入的参数
String line =  new  String(value.getBytes(),  0 , value.getLength(), "GBK" );

    这边指定的GBK代表读入文件的编码,line返回的是解码成Unicode后的结果。 特别注意的是,这边需要指出value.getBytes()得到的byte数组的起始和长度,(0, value.getLength())。 千万不要这样调用:String line = new String(value.getBytes(),"GBK");这样得到的结果,末尾可能会有多余字串。看来内部是这样实现的:Text对象是被复用 的,共用一个byte数组(也可能是char数组)来存东西,下一次只是从头开始覆写这个数组,到后面如果没写到,那么原来的内容还会在。


引用一段镇方的话

    如果以TextInputFormat使用文件作为输入,map的输入value为Text类型,text内部实际维护的是一个byte数组,从输入文件中直接以byte的形式读入,不会对byte做转义:输入文件中的字节流直接进入text的byte数组。

    Text假设内部编码为utf8,调用Text.toString,Text会把内部byte当作utf8处理,如果读入文件的实际编码为gbk

  就会产生乱码;此时可以通过 new String(Text.getBytes(), 0, Text.getLength(), "输入文件实际编码")实现正确转义。

    以TextOutputFormat作为输出时,reduce过程会强制以utf8编码输出。但是这一步有一个小triky,Text的输出会调用它的Text.getBytes方法,而由于Text内部以byte数组形式存储,实际上可以放任意内容。

    注意:

    1. Hadoop的TextInput/TextOutput很系统编码完全无关,是通过代码硬性写入为UTF8的。

    2. Text和String对数据的处理是很不一样的。

Map Reduce输出结果为GBK文件

    这个是竹庄给的解决方法中文问题。基本上就是因为TextOutputFormat把输出编码写死成UTF-8了,自己把这个改掉就可以输出GBK了。

保存成Excel能识别的编码(Unicode)

    有的需求,需要用Excel来打开最后生成的文件。现在的方式是用Tab分割字段,这样就能在Excel中显示为不同的栏了。 困难一些的是保存的编码形式。我研究了下,发现本地的Excel文件是用Unicode编码的,于是在Java程序中注明了Unicode,或者UTF- 16,结果还是不行。再看一下,原来需要的是小端的Unicode,而默认UTF-16保存的是大端的,于是改成了UTF-16LE,指定为小端的。结果 还是失败,能用记事本正确打开,却不能用Excel打开。后来研究了下,原来还需要指定字节序的(BOM,Byte Order Mark)。最后写了如下代码搞定(这个方法把UTF-8的文件转成了Unicode的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private  void  convertToUnicode(FSDataInputStream convert_in,
             FSDataOutputStream convert_out,String head)  throws  IOException {
     BufferedReader in =  null ;
     BufferedWriter out =  null ;
     try  {
         in =  new  BufferedReader( new  InputStreamReader(convert_in,  "UTF-8" ));
         out =  new  BufferedWriter( new  OutputStreamWriter(convert_out,
                 "UTF-16LE" ));
         out.write( "\uFEFF" );
         out.write(head+ "\n" );
         String line;
         while  ((line = in.readLine()) !=  null ) {
             out.write(line);
             out.newLine();
         }
         out.flush();
     finally  {
         if (in!= null )
             in.close();
         if (out!= null )
             out.close();
     }
}

    这边还有个值得注意的现象,out.write("\uFEFF");我们可以查得,Unicode小端的BOM应该是FFFE,这边为什么反过来 了呢?原因在于Java的字节码顺序是大端的,\uFEFF在内存中表示为FF在前(低地址),FE在后,写出后即为FFFE(用UltraEditor 16进制查看,的确如此)。这边的现象应该和机器的大小端无关的。


转自:http://www.cnblogs.com/noures/archive/2012/08/17/2643841.html


相关阅读:使用java api操作Hadoop文件


*** walker ***

本文转自walker snapshot博客51CTO博客, 原文链接http://blog.51cto.com/walkerqt/1432238如需转载请自行联系原作者


RQSLT

相关文章
|
6天前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之在使用MaxCompute的Java SDK创建函数时,出现找不到文件资源的情况,是BUG吗
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
19 0
|
27天前
|
存储 缓存 安全
Java 中 IO 流、File文件
Java 中 IO 流、File文件
|
2月前
|
前端开发 Java
Java压缩20M文件非常厉害
Java压缩20M文件非常厉害
27 1
|
2月前
|
Java BI API
Java如何实现文件批量导入导出(兼容xls,xlsx)
Java如何实现文件批量导入导出(兼容xls,xlsx)
40 0
|
2月前
|
Java
有关Java发送邮件信息(支持附件、html文件模板发送)
有关Java发送邮件信息(支持附件、html文件模板发送)
37 1
|
10天前
|
Java Unix Windows
|
14天前
|
Java 关系型数据库 MySQL
Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
【4月更文挑战第12天】Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
47 3
|
2天前
|
Oracle Java 关系型数据库
windows 下 win11 JDK17安装与环境变量的配置(配置简单详细,包含IJ中java文件如何使用命令运行)
本文介绍了Windows 11中安装JDK 17的步骤,包括从官方网站下载JDK、配置环境变量以及验证安装是否成功。首先,下载JDK 17的安装文件,如果没有Oracle账户,可以直接解压缩文件到指定目录。接着,配置系统环境变量,新建`JAVA_HOME`变量指向JDK安装路径,并在`Path`变量中添加。然后,通过命令行(cmd)验证安装,分别输入`java -version`和`javac -version`检查版本信息。最后,作者分享了如何在任意位置运行Java代码,包括在IntelliJ IDEA(IJ)中创建的Java文件,只需去掉包声明,就可以通过命令行直接运行。
17 0
|
4天前
|
存储 监控 Java
如何在Java中实现等待文件修改后再读取数据的功能?
如何在Java中实现等待文件修改后再读取数据的功能?
10 0
|
4天前
|
存储 Java 数据格式
Java实战:轻松掌握文件重命名与路径提取技巧
Java实战:轻松掌握文件重命名与路径提取技巧
11 0

热门文章

最新文章