Hadoop HDFS编程 API入门系列之合并小文件到HDFS(三)

简介:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 代码版本1

复制代码
  1 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs7;
  2 
  3 import java.io.IOException;
  4 import java.net.URI;
  5 import java.net.URISyntaxException;
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.fs.FSDataInputStream;
  8 import org.apache.hadoop.fs.FSDataOutputStream;
  9 import org.apache.hadoop.fs.FileStatus;
 10 import org.apache.hadoop.fs.FileSystem;
 11 import org.apache.hadoop.fs.FileUtil;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.fs.PathFilter;
 14 import org.apache.hadoop.io.IOUtils;
 15 /**
 16  * function 合并小文件至 HDFS 
 17  * 
 18  *
 19  */
 20 public class MergeSmallFilesToHDFS 
 21 {
 22     private static FileSystem fs = null;  //定义文件系统对象,是HDFS上的
 23     private static FileSystem local = null; //定义文件系统对象,是本地上的
 24     
 25     /**
 26      * @function main 
 27      * @param args
 28      * @throws IOException
 29      * @throws URISyntaxException
 30      */
 31     
 32     public static void main(String[] args) throws IOException,URISyntaxException{
 33     
 34         list();
 35     }
 36 
 37     /**
 38      * 
 39      * @throws IOException
 40      * @throws URISyntaxException
 41      */
 42     public static void list() throws IOException, URISyntaxException{
 43         // 读取hadoop配置文件
 44         Configuration conf = new Configuration();
 45         // 文件系统访问接口和创建FileSystem对象,在本地上运行模式
 46         URI uri = new URI("hdfs://HadoopMaster:9000");
 47         fs = FileSystem.get(uri, conf);
 48         // 获得本地文件系统
 49         local = FileSystem.getLocal(conf);
 50         // 过滤目录下的 svn 文件
 51         FileStatus[] dirstatus = local.globStatus(new Path("./data/mergeSmallFilesToHDFS/73/*"),new RegexExcludePathFilter("^.*svn$"));
 52 //    FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));
 53         //获取D:\Data\tvdata目录下的所有文件路径
 54         Path[] dirs = FileUtil.stat2Paths(dirstatus);
 55         FSDataOutputStream out = null;
 56         FSDataInputStream in = null;
 57         for (Path dir : dirs) 
 58         {//比如拿2012-09-17为例
 59             //将文件夹名称2012-09-17的-去掉,直接,得到20120901文件夹名称
 60             String fileName = dir.getName().replace("-", "");//文件名称
 61             //只接受20120917日期目录下的.txt文件
 62             FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
 63             // 获得20120917日期目录下的所有文件
 64             Path[] listedPaths = FileUtil.stat2Paths(localStatus);
 65             // 输出路径
 66             Path block = new Path("hdfs://HadoopMaster:9000/middle/tv/"+ fileName + ".txt");
 67             System.out.println("合并后的文件名称:"+fileName+".txt");
 68             // 打开输出流
 69             out = fs.create(block);    
 70             //循环20120917日期目录下的所有文件
 71             for (Path p : listedPaths){//这是星型for循环,即listedPaths的值传给Path p
 72                 in = local.open(p);// 打开输入流
 73                 IOUtils.copyBytes(in, out, 4096, false); // 复制数据
 74                 // 关闭输入流
 75                 in.close();
 76             }
 77             if (out != null){
 78                 // 关闭输出流
 79                 out.close();
 80             }
 81             //当循环完20120917日期目录下的所有文件之后,接着依次20120918,20120919,,,
 82         }
 83     }
 84 
 85     /**
 86      * 
 87      * @function 过滤 regex 格式的文件
 88      *
 89      */
 90     public static class RegexExcludePathFilter implements PathFilter{
 91         private final String regex;
 92 
 93         public RegexExcludePathFilter(String regex){
 94             this.regex = regex;
 95         }
 96 
 97         
 98         public boolean accept(Path path){
 99             // TODO Auto-generated method stub
100             boolean flag = path.toString().matches(regex);
101             return !flag;
102         }
103 
104     }
105 
106     /**
107      * 
108      * @function 接受 regex 格式的文件
109      *
110      */
111     public static class RegexAcceptPathFilter implements PathFilter{
112         private final String regex;
113 
114         public RegexAcceptPathFilter(String regex){
115             this.regex = regex;
116         }
117 
118     
119         public boolean accept(Path path){
120             // TODO Auto-generated method stub
121             boolean flag = path.toString().matches(regex);
122             return flag;
123         }
124 
125     }
126 }
复制代码

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码版本2

复制代码
  1 package com.dajiangtai.Hadoop.HDFS;
  2 
  3 import java.io.IOException;
  4 import java.net.URI;
  5 import java.net.URISyntaxException;
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.fs.FSDataInputStream;
  8 import org.apache.hadoop.fs.FSDataOutputStream;
  9 import org.apache.hadoop.fs.FileStatus;
 10 import org.apache.hadoop.fs.FileSystem;
 11 import org.apache.hadoop.fs.FileUtil;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.fs.PathFilter;
 14 import org.apache.hadoop.hdfs.DistributedFileSystem;
 15 import org.apache.hadoop.io.IOUtils;
 16 /**
 17  * function 合并小文件至 HDFS     ,  文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
 18  * @author 小讲
 19  *  我们利用通配符和PathFilter 对象,将本地多种格式的文件上传至 HDFS文件系统,并过滤掉 txt文本格式以外的文件。
 20  */
 21 public class MergeSmallFilesToHDFS {
 22     private static FileSystem fs = null;
 23     private static FileSystem local = null;
 24     /**
 25      * @function main 
 26      * @param args
 27      * @throws IOException
 28      * @throws URISyntaxException
 29      */
 30     public static void main(String[] args) throws IOException,
 31             URISyntaxException {
 32         list();
 33     }
 34 
 35     /**
 36      * 
 37      * @throws IOException
 38      * @throws URISyntaxException
 39      */
 40     public static void list() throws IOException, URISyntaxException {
 41         // 读取hadoop文件系统的配置
 42         Configuration conf = new Configuration();
 43 //        conf=Configuration
 44 //        conf是Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
 45         
 46         //文件系统访问接口
 47         URI uri = new URI("hdfs://djt002:9000");
 48 //        uri=URI
 49 //        uri是hdfs://djt002:9000
 50         
 51 //        URL、URI与Path三者的区别
 52 //        Hadoop文件系统中通过Hadoop Path对象来代表一个文件    
 53 //        URL(相当于绝对路径)    ->   (文件) ->    URI(相当于相对路径,即代表URL前面的那一部分)
 54 //        URI:如hdfs://dajiangtai:9000
 55 //        如,URL.openStream
 56         
 57         
 58         
 59         //获得FileSystem实例,即HDFS
 60         fs = FileSystem.get(uri, conf);
 61 //        fs=DistributedFileSystem
 62 //        fs是DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1814566850_1, ugi=Administrator (auth:SIMPLE)]]
 63         
 64         //获得FileSystem实例,即Local
 65         local = FileSystem.getLocal(conf);
 66 //        local=LocalFileSystem
 67 //        local是org.apache.hadoop.fs.LocalFileSystem@3ce1b8c5
 68 //            为什么要获取到Local呢,因为,我们要把本地D盘下data/73目录下的文件要合并后,上传到HDFS里,所以,我们需先获取到Local,再来做合并工作啦!
 69         
 70         
 71 //        18、列出文件或目录内容(主要是存放文件或目录的元数据,即大小,权限,副本,,,)
 72 //        public FileStatus[] listStatus(Path f) throws IOException
 73 //        public FileStatus[] listStatus(Path f,PathFilter filter) throws IOException
 74 //                PathFilter是路径过滤器
 75 //        public FileStatus[] listStatus(Path[] files) throws IOException
 76 //        public FileStatus[] listStatus(Path[] files,PathFilter filter)
 77 //                传送Path数组和路径过滤器
 78 //                
 79 //                
 80 //        19、FileUtil中的stat2Paths(),将一个FileStatus元数据对象数组转换为一个Path对象数组
 81 //
 82 //        20、(1)使用通配符来匹配多个目录下的多个文件,也是列出文件或目录内容(主要是存放文件或目录的元数据,即大小,权限,副本,,,)
 83 //        public FileStatus[] globStatus(Path pathPattern) throws IOException
 84 //        public FileStatus[] globStatus(Path pathPattern,PathFilter filter) throws IOException
 85 //                    
 86 //          (2)PathFilter对象
 87 //        public interface PathFilter{
 88 //            boolean accpet(Path path);
 89 //        }        
 90         
 91         
 92         
 93         //过滤目录下的 svn 文件,globStatus从第一个参数通配符合到文件,剔除满足第二个参数到结果,因为PathFilter中accept是return!  
 94         FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));//一般这是隐藏文件,所以得排除
 95         //dirstatus=FileStatus[7]
 96 //        dirstatus是[DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17; isDirectory=true; modification_time=1427791478002; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
 97 //        , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18; isDirectory=true; modification_time=1427791505373; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
 98 //        , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-19; isDirectory=true; modification_time=1427791532277; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
 99 //        , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-20; isDirectory=true; modification_time=1427791553035; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
100 //        , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-21; isDirectory=true; modification_time=1427791577709; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
101 //        , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-22; isDirectory=true; modification_time=1427791602770; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}
102 //        , DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-23; isDirectory=true; modification_time=1427791647177; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false}]
103         
104                         
105                         //        ^表示匹配我们字符串开始的位置               *代表0到多个字符                        $代表字符串结束的位置
106 //        RegexExcludePathFilter来只排除我们不需要的,即svn格式
107 //        RegexExcludePathFilter这个方法我们自己写
108         
109 //        但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
110         
111         //获取73目录下的所有文件路径,注意FIleUtil中stat2Paths()的使用,它将一个FileStatus对象数组转换为Path对象数组。
112         Path[] dirs = FileUtil.stat2Paths(dirstatus);//dirstatus是FileStatus数组类型
113 //        dirs=Path[7]
114 //        dirs是    [file:/D:/data/73/2012-09-17
115 //                 , file:/D:/data/73/2012-09-18
116 //                 , file:/D:/data/73/2012-09-19
117 //                 , file:/D:/data/73/2012-09-20
118 //                 , file:/D:/data/73/2012-09-21
119 //                 , file:/D:/data/73/2012-09-22
120 //                 , file:/D:/data/73/2012-09-23]        
121                 
122         
123         FSDataOutputStream out = null;//输出流
124 //        out=HdfsDaDataOutputStream
125 //        out是org.apache.hadoop.hdfs.client.HdfsDataOutputStream@2b11624e
126         
127         FSDataInputStream in = null;//输入流
128 //        in=ChecksumFileSystem&FSDataBoundedInputStream
129 //        in是org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream@526d542f
130         
131 //        很多人搞不清输入流和输出流,!!!!
132 //        其实啊,输入流、输出流都是针对内存的
133 //        往内存里写,是输入流。
134 //        内存往文件里写,是输出Luis。
135 //        
136 //        比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
137 //           =>   则文件A写到内存里,叫输入流。
138 //           =>    则内存里写到文件B,叫输出流    
139         
140         
141         for (Path dir : dirs) {//for星型循环,即将dirs是Path对象数组,一一传给Path dir
142 //            dirs=Path[7]
143 //            dirs是[file:/D:/data/73/2012-09-17
144 //                  , file:/D:/data/73/2012-09-18
145 //                  , file:/D:/data/73/2012-09-19
146 //                  , file:/D:/data/73/2012-09-20
147 //                  , file:/D:/data/73/2012-09-21
148 //                  , file:/D:/data/73/2012-09-22
149 //                  , file:/D:/data/73/2012-09-23]    
150             
151 //        dir= Path    
152 //        先传,dir是file:/D:/data/73/2012-09-17
153 //        再传,file:/D:/data/73/2012-09-18           
154 //        再传,file:/D:/data/73/2012-09-19     
155 //        再传,file:/D:/data/73/2012-09-20       
156 //        再传,file:/D:/data/73/2012-09-21       
157 //        再传,file:/D:/data/73/2012-09-22       
158 //        再传,file:/D:/data/73/2012-09-23       
159             
160             String fileName = dir.getName().replace("-", "");//文件名称
161 //                        先获取到如2012-09-17,然后经过replace("-", ""),得到20120917
162 //                                                                再获取,20120918
163 //                                                                再获取,20120919
164 //                                                                再获取,20120920
165 //                                                                再获取,20120921
166 //                                                                再获取,20120922
167 //                                                                再获取,20120923            
168             
169             //只接受日期目录下的.txt文件,^匹配输入字符串的开始位置,$匹配输入字符串的结束位置,*匹配0个或多个字符。
170             FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
171 //            先获取到,localStatus=FileStatus[23]
172 //                   localStatus是[DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917000000.txt; isDirectory=false; length=1111961; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917001500.txt; isDirectory=false; length=782533; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917003000.txt; isDirectory=false; length=593507; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917004500.txt; isDirectory=false; length=839019; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917010000.txt; isDirectory=false; length=866393; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917011500.txt; isDirectory=false; length=678491; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917013000.txt; isDirectory=false; length=593292; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917014500.txt; isDirectory=false; length=688620; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917020000.txt; isDirectory=false; length=674864; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917021500.txt; isDirectory=false; length=635052; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917023000.txt; isDirectory=false; length=547324; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917024500.txt; isDirectory=false; length=598814; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917030000.txt; isDirectory=false; length=542600; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917031500.txt; isDirectory=false; length=535446; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917033000.txt; isDirectory=false; length=592780; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917034500.txt; isDirectory=false; length=619410; replication=1; blocksize=33554432; modification_time=1398669216000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917040000.txt; isDirectory=false; length=590326; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917041500.txt; isDirectory=false; length=428487; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917043000.txt; isDirectory=false; length=598048; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917044500.txt; isDirectory=false; length=598792; replication=1; blocksize=33554432; modification_time=1398669216000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917050000.txt; isDirectory=false; length=575613; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917051500.txt; isDirectory=false; length=619080; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-17/ars10767@20120917053000.txt; isDirectory=false; length=587763; replication=1; blocksize=33554432; modification_time=1398669214000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}]
173 //                   再获取到,localStatus=FileStatus[23]
174 //            localStatus是[DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918131500.txt; isDirectory=false; length=1722797; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918133000.txt; isDirectory=false; length=1922955; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918134500.txt; isDirectory=false; length=1388036; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918140000.txt; isDirectory=false; length=1888871; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918141500.txt; isDirectory=false; length=1685719; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918143000.txt; isDirectory=false; length=1541381; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918144500.txt; isDirectory=false; length=1723638; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918150000.txt; isDirectory=false; length=1629322; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918151500.txt; isDirectory=false; length=1658684; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918153000.txt; isDirectory=false; length=1548216; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918154500.txt; isDirectory=false; length=1510965; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918160000.txt; isDirectory=false; length=1559078; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918161500.txt; isDirectory=false; length=1752005; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918163000.txt; isDirectory=false; length=1901994; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918164500.txt; isDirectory=false; length=2234304; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918170000.txt; isDirectory=false; length=1912051; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918171500.txt; isDirectory=false; length=1711317; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918173000.txt; isDirectory=false; length=1799747; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918174500.txt; isDirectory=false; length=2038653; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918180000.txt; isDirectory=false; length=2341515; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918181500.txt; isDirectory=false; length=2396977; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918183000.txt; isDirectory=false; length=2382769; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}, DeprecatedRawLocalFileStatus{path=file:/D:/data/73/2012-09-18/ars10767@20120918184500.txt; isDirectory=false; length=2709048; replication=1; blocksize=33554432; modification_time=1398669244000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}]
175 //            再获取到,,,,不多赘述。
176             
177             
178 //            FileStatus[] localStatus = local.listStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));//试试,看有什么区别?
179 
180 //            如果不设置过滤器,FileInputFormat 会使用一个默认的过滤器来排除隐藏文件。 
181 //            如果通过调用 setInputPathFilter()设置了过滤器,它会在默认过滤器的基础上进行过滤。换句话说,自定义的过滤器只能看到非隐藏文件。
182             
183             
184                     //RegexAcceptPathFilter这个方法,我们自己写
185 //            RegexAcceptPathFilter来只接收我们需要,即txt格式
186 //            这里,我们还可以只接收别的格式,自己去改,一定要锻炼学会改别人的代码
187             
188             
189             // 获得如2012-09-17日期目录下的所有文件
190             Path[] listedPaths = FileUtil.stat2Paths(localStatus);
191 //            同样,但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
192             
193 //            先获取,listedPaths=Path[23]
194 //            先获取2012-09-17下的所有,这个不多赘述啦!
195             
196 //            再获取,listedPaths=Path[23]
197 //            listedPaths是[file:/D:/data/73/2012-09-18/ars10767@20120918131500.txt
198 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918133000.txt
199 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918134500.txt
200 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918140000.txt
201 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918141500.txt
202 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918143000.txt
203 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918144500.txt
204 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918150000.txt
205 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918151500.txt
206 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918153000.txt
207 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918154500.txt
208 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918160000.txt
209 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918161500.txt
210 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918163000.txt
211 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918164500.txt
212 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918170000.txt
213 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918171500.txt
214 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918173000.txt
215 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918174500.txt
216 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918180000.txt
217 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918181500.txt
218 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918183000.txt
219 //                         , file:/D:/data/73/2012-09-18/ars10767@20120918184500.txt]
220             
221             //输出路径
222             Path block = new Path("hdfs://djt002:9000/outData/MergeSmallFilesToHDFS/"+ fileName + ".txt");
223             //fileName是"fileName"
224 //            block=Path
225 //            block是hdfs://djt002:9000/outData/MergeSmallFilesToHDFS/20120918.txt
226             
227             // 打开输出流
228             out = fs.create(block);//因为,合并小文件之后,比如这是,合并2012-09-17日期目录下的所有小文件,之后,要上传到HDFS里。
229 //                类似于,文件A写到内存里,再内存里写到文件B。而这行代码out = fs.create(block);是相当于是,内存里写到文件B。所以是输出流,即是从内存里输出的,所以叫输出流。
230 //                            这里,文件A是Local                文件B是HDFS
231 
232 //                                        文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
233             
234 //            很多人搞不清输入流和输出流,!!!!
235 //            其实啊,输入流、输出流都是针对内存的
236 //            往内存里写,是输入流。
237 //            内存往文件里写,是输出Luis。
238 //            
239 //            比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
240 //               =>   则文件A写到内存里,叫输入流。
241 //               =>    则内存里写到文件B,叫输出流    
242             
243             
244             for (Path p : listedPaths) {//for星型循环,即将listedPaths的值一一传给Path p
245             //先获取2012-09-17下的所有,这个不多赘述啦!
246             //现在,获取到2012-09-18下了
247 //            p=Path
248 //            p是file:/D:/data/73/2012-09-18/ars10767@20120918134500.txt
249 //            得一个一个来,这才叫做一一传给Path p
250                 
251                 in = local.open(p);// 打开输入流in
252 //                类似于,文件A写到内存里,再内存里写到文件B。而这行代码in = local.open(p);是相当于是,文件A写到内存里。所以是输如流,即是写到内存里的,所以叫输入流。
253 //                    这里,文件A是Local                文件B是HDFS
254                 
255                 IOUtils.copyBytes(in, out, 4096, false); // 复制数据,IOUtils.copyBytes可以方便地将数据写入到文件,不需要自己去控制缓冲区,也不用自己去循环读取输入源。false表示不自动关闭数据流,那么就手动关闭。
256 //                IOUtils.copyBytes这个方法很重要
257                                 //是否自动关闭输入流和输出流,若是false,就要单独去关闭。则不在这个方法体里关闭输入和输出流了。
258 //                                                     若是true,则在这个方法里关闭输入和输出流。不需单独去关闭了
259                 
260                 
261 //                明白,IOUtils类的copyBytes将hdfs数据流拷贝到标准输出流System.out中,
262 //                copyBytes前两个参数好理解,一个输入,一个输出,第三个是缓存大小,第四个指定拷贝完毕后是否关闭流。
263 //                要设置为false,标准输出流不关闭,我们要手动关闭输入流。即,设置为false表示关闭输入流
264                 
265 //                主要是把最后的这个参数定义好, 就可以了。 定义为true还是false,则决定着是否在这个方法体里关闭
266 //                若定义为true,则在这个方法体里直接关闭输入流、输出流。不需单独去关闭了
267 //                若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了
268                 
269                 
270                 // 关闭输入流
271                 in.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输入流!!!懂了吗
272             }
273             if (out != null) {//这里为什么不为空,空指针,则说明里面还有资源。
274                 // 关闭输出流
275                 out.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输出流!!!懂了吗
276             }
277         }
278         
279     }
280 
281     /**
282      * 
283      * @function 过滤 regex 格式的文件
284      *
285      */
286     public static class RegexExcludePathFilter implements PathFilter {
287         private final String regex;//变量
288 
289         public RegexExcludePathFilter(String regex) {//这个是上面的那个,正在表达式
290             this.regex = regex;//将String regex的值,赋给RegexExcludePathFilter类里的private final String regex的值
291         }
292 
293         public boolean accept(Path path) {//主要是实现accept方法
294             // TODO Auto-generated method stub
295             boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*svn$
296             return !flag;
297         }
298 
299     }
300 
301     /**
302      * 
303      * @function 接受 regex 格式的文件
304      *
305      */
306     public static class RegexAcceptPathFilter implements PathFilter {
307         private final String regex;//变量
308 
309         public RegexAcceptPathFilter(String regex) {//这个是上面的那个,正在表达式
310             this.regex = regex;//将String regex的值,赋给RegexAcceptPathFilter类里的private final String regex的值
311         }
312 
313         public boolean accept(Path path) {//主要是实现accept方法
314             // TODO Auto-generated method stub
315             boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*txt$
316             return flag;
317         }
318 
319     }
320 }
复制代码

 



本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6174553.html,如需转载请自行联系原作者
相关文章
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
133 6
|
1月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
59 3
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
32 4
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
42 2
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
79 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
35 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
44 0
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
62 2
|
9天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
45 2
|
10天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
43 1
下一篇
无影云桌面