在实际开发中,我们肯定希望提高MapReduce的工作效率,其实提高MapReduce的效率,无非就是提高Map阶段和Reduce阶段的效率。
Map阶段优化之小文件问题
我们知道Map阶段中的MapTask个数是与InputSplit的个数有关的,一般一个InputSplit切片对应一个,而且InputSplit的个数我们一般也无法控制,应为默认就是128MB,但是往往我们的文件并不是这样,而是大小不一,有的可能300MB,一个可能只有10KB,尤其是为一群几十KB的小文件一个划分一个InputSplit切片,实在浪费资源。
而且针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然存储了很多个文件,但是文件的体积并不大,这样就没有意义了。
这就想到我之前的文章【自定义InputFormat】,我们可以这样来将多个小文件都合并到一个文件当中,进入map()方法后仍然需要我们进行处理才能使用,但这并不是标准的Sequence文件,它的输出键是一个文件名,值是我们文件的字节码,所以如果我们要实现一个WordCount,还需要进一步需要使用的话还需要针对每一行的值将字节码转为Stering,在做分词处理。而下面是真正生成一个Sequence文件的代码:
生成Sequence文件
我们需要将三个小文件合并成一个Sequence文件。
import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.DefaultCodec; import java.io.File; /** * 小文件解决方案之SequenceFile */ public class SmallFileSeq { public static void main(String[] args) throws Exception{ //在hdfs中生成SequenceFile文件到临时目录下 // write("D:\\smallFile","/tmp/seqFile"); //读取SequenceFile文件 // read("/tmp/seqFile"); //在windows端生成SequenceFile文件 // write("D:\\MapReduce_Data_Test\\myinputformat\\input","D:\\MapReduce_Data_Test\\myinputformat\\outputSeq"); write("D:\\MapReduce_Data_Test\\sequence\\word\\","D:\\MapReduce_Data_Test\\sequence\\wordSeq"); //读取SequenceFile文件 注意是文件地址 不是目录地址 // read("D:\\MapReduce_Data_Test\\myinputformat\\outputSeq"); } /** * 生成SequenceFile文件 * @param inputDir 输入目录-windows目录 * @param outputFile 输出文件-hdfs文件 * @throws Exception */ private static void write(String inputDir,String outputFile) throws Exception{ //创建一个配置对象 Configuration conf = new Configuration(); //指定HDFS的地址 // conf.set("fs.defaultFS","hdfs://hadoop102:9000"); //获取操作文件系统对象 FileSystem fileSystem = FileSystem.get(conf); //删除输出文件 fileSystem.delete(new Path(outputFile),true); //构造opts数组,有三个元素 /* 第一个是输出路径 第二个是key类型 第三个是value类型 */ SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{ SequenceFile.Writer.file(new Path(outputFile)), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)}; //创建一个writer实例 SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts); //指定要压缩的文件的目录 File inputDirPath = new File(inputDir); if(inputDirPath.isDirectory()){ File[] files = inputDirPath.listFiles(); for (File file : files) { //获取文件全部内容 String content = FileUtils.readFileToString(file, "UTF-8"); //文件名作为key Text key = new Text(file.getName()); //文件内容作为value Text value = new Text(content); writer.append(key,value); } } writer.close(); } /** * 读取SequenceFile文件 * @param inputFile SequenceFile文件路径 注意是文件地址不是目录地址 * @throws Exception */ private static void read(String inputFile) throws Exception{ //创建一个配置对象 Configuration conf = new Configuration(); //指定HDFS的地址 // conf.set("fs.defaultFS","hdfs://hadoop102:9000"); // 指定windows端的文件路径,注意在Windows系统中,文件系统没有端口号的概念。因此,只需要使用file://协议指定文件路径即可连接本地文件系统。 conf.set("fs.defaultFS","file:///"); //创建阅读器 SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile))); Text key = new Text(); Text value = new Text(); //循环读取数据 while(reader.next(key,value)){ //输出文件名称 System.out.print("文件名:"+key.toString()+","); //输出文件的内容 System.out.println("文件内容:\n"+value.toString()); } reader.close(); } }
输入:
输出:
我们可以看到一共会输出两个文件,下面的 outputSeq是真正的Sequence文件 ,文件名是我们自己指定的,而上面的.outputSeq.crc则是一个校验码文件。
接下来要做的就是如何写一个MapReduce程序将我们的小文件内容从这个合并后的Sequence文件中读取出来
Mapper类
我们需要在Job中指定输入格式InputFormat为SequenceFileInputFormat,我们读取Sequence文件只需要指定键和值都为Text即可。
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class SeqMapper extends Mapper<Text, Text,Text,LongWritable> { private Text OUT_KEY = new Text(); private LongWritable OUT_VALUE = new LongWritable(1); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); System.out.println(line); String[] words = StringUtils.split(line,'\t'); for (String word : words) { System.out.println("key:" + word); OUT_KEY.set(word); context.write(OUT_KEY,OUT_VALUE); } } }
Reducer类
依旧和之前的WordCount没什么两样。
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SeqReducer extends Reducer<Text, LongWritable,Text,LongWritable> { private LongWritable OUT_VALUE = new LongWritable(); @Override public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { sum += val.get(); } OUT_VALUE.set(sum); context.write(key,OUT_VALUE); } }
Runner类
与之前不同的是修改输入格式为SequenceFileInputFormat,而且我们读取Sequence文件时,需要指定键和值的类型都为Text类型。
这个值并不是BytesWritable类型,如果设置为BytesWritable会报错:org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.BytesWritable。这就说明Sequence文件读取时的值类型为Text类型。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SeqRunner extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(),new SeqRunner(),args); } @Override public int run(String[] args) throws Exception { //1.获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "sequence file word count"); //2.配置jar包路径 job.setJarByClass(SeqRunner.class); //3.关联mapper和reducer job.setMapperClass(SeqMapper.class); job.setReducerClass(SeqReducer.class); //设置输入格式为 Sequence文件格式 job.setInputFormatClass(SequenceFileInputFormat.class); //4.设置map、reduce输出的k、v类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径 FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\sequence\\input")); //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径 FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\sequence\\output")); return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息 } }