MapReduce【小文件的优化-Sequence文件】

简介: MapReduce【小文件的优化-Sequence文件】

在实际开发中,我们肯定希望提高MapReduce的工作效率,其实提高MapReduce的效率,无非就是提高Map阶段和Reduce阶段的效率。

Map阶段优化之小文件问题

我们知道Map阶段中的MapTask个数是与InputSplit的个数有关的,一般一个InputSplit切片对应一个,而且InputSplit的个数我们一般也无法控制,应为默认就是128MB,但是往往我们的文件并不是这样,而是大小不一,有的可能300MB,一个可能只有10KB,尤其是为一群几十KB的小文件一个划分一个InputSplit切片,实在浪费资源。


而且针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然存储了很多个文件,但是文件的体积并不大,这样就没有意义了。


       这就想到我之前的文章【自定义InputFormat】,我们可以这样来将多个小文件都合并到一个文件当中,进入map()方法后仍然需要我们进行处理才能使用,但这并不是标准的Sequence文件,它的输出键是一个文件名,值是我们文件的字节码,所以如果我们要实现一个WordCount,还需要进一步需要使用的话还需要针对每一行的值将字节码转为Stering,在做分词处理。而下面是真正生成一个Sequence文件的代码:

生成Sequence文件

我们需要将三个小文件合并成一个Sequence文件。

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import java.io.File;
/**
 * 小文件解决方案之SequenceFile
 */
public class SmallFileSeq {
    public static void main(String[] args) throws Exception{
        //在hdfs中生成SequenceFile文件到临时目录下
//        write("D:\\smallFile","/tmp/seqFile");
        //读取SequenceFile文件
//        read("/tmp/seqFile");
        //在windows端生成SequenceFile文件
//        write("D:\\MapReduce_Data_Test\\myinputformat\\input","D:\\MapReduce_Data_Test\\myinputformat\\outputSeq");
        write("D:\\MapReduce_Data_Test\\sequence\\word\\","D:\\MapReduce_Data_Test\\sequence\\wordSeq");
        //读取SequenceFile文件 注意是文件地址 不是目录地址
//        read("D:\\MapReduce_Data_Test\\myinputformat\\outputSeq");
    }
    /**
     * 生成SequenceFile文件
     * @param inputDir 输入目录-windows目录
     * @param outputFile 输出文件-hdfs文件
     * @throws Exception
     */
    private static void write(String inputDir,String outputFile)
            throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
//        conf.set("fs.defaultFS","hdfs://hadoop102:9000");
        //获取操作文件系统对象
        FileSystem fileSystem = FileSystem.get(conf);
        //删除输出文件
        fileSystem.delete(new Path(outputFile),true);
        //构造opts数组,有三个元素
        /*
        第一个是输出路径
        第二个是key类型
        第三个是value类型
         */
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                SequenceFile.Writer.file(new Path(outputFile)),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class)};
        //创建一个writer实例
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
        //指定要压缩的文件的目录
        File inputDirPath = new File(inputDir);
        if(inputDirPath.isDirectory()){
            File[] files = inputDirPath.listFiles();
            for (File file : files) {
                //获取文件全部内容
                String content = FileUtils.readFileToString(file, "UTF-8");
                //文件名作为key
                Text key = new Text(file.getName());
                //文件内容作为value
                Text value = new Text(content);
                writer.append(key,value);
            }
        }
        writer.close();
    }
    /**
     * 读取SequenceFile文件
     * @param inputFile SequenceFile文件路径 注意是文件地址不是目录地址
     * @throws Exception
     */
    private static void read(String inputFile)
            throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
//        conf.set("fs.defaultFS","hdfs://hadoop102:9000");
//        指定windows端的文件路径,注意在Windows系统中,文件系统没有端口号的概念。因此,只需要使用file://协议指定文件路径即可连接本地文件系统。
        conf.set("fs.defaultFS","file:///");
        //创建阅读器
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
        Text key = new Text();
        Text value = new Text();
        //循环读取数据
        while(reader.next(key,value)){
            //输出文件名称
            System.out.print("文件名:"+key.toString()+",");
            //输出文件的内容
            System.out.println("文件内容:\n"+value.toString());
        }
        reader.close();
    }
}

输入:

输出:  

       我们可以看到一共会输出两个文件,下面的 outputSeq是真正的Sequence文件 ,文件名是我们自己指定的,而上面的.outputSeq.crc则是一个校验码文件。

接下来要做的就是如何写一个MapReduce程序将我们的小文件内容从这个合并后的Sequence文件中读取出来

Mapper类

我们需要在Job中指定输入格式InputFormat为SequenceFileInputFormat,我们读取Sequence文件只需要指定键和值都为Text即可

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class SeqMapper extends Mapper<Text, Text,Text,LongWritable> {
    private Text OUT_KEY = new Text();
    private LongWritable OUT_VALUE = new LongWritable(1);
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println(line);
        String[] words = StringUtils.split(line,'\t');
        for (String word : words) {
            System.out.println("key:" + word);
            OUT_KEY.set(word);
            context.write(OUT_KEY,OUT_VALUE);
        }
    }
}

Reducer类

依旧和之前的WordCount没什么两样。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SeqReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
    private LongWritable OUT_VALUE = new LongWritable();
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }
        OUT_VALUE.set(sum);
        context.write(key,OUT_VALUE);
    }
}

Runner类

与之前不同的是修改输入格式为SequenceFileInputFormat,而且我们读取Sequence文件时,需要指定键和值的类型都为Text类型。

这个值并不是BytesWritable类型,如果设置为BytesWritable会报错:org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.BytesWritable。这就说明Sequence文件读取时的值类型为Text类型。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SeqRunner extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new SeqRunner(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sequence file word count");
        //2.配置jar包路径
        job.setJarByClass(SeqRunner.class);
        //3.关联mapper和reducer
        job.setMapperClass(SeqMapper.class);
        job.setReducerClass(SeqReducer.class);
        //设置输入格式为 Sequence文件格式
        job.setInputFormatClass(SequenceFileInputFormat.class);
        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\sequence\\input"));
        //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\sequence\\output"));
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

相关文章
|
5月前
|
存储 分布式计算 算法
MapReduce 处理压缩文件的能力
【8月更文挑战第12天】
58 4
|
5月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
713 0
|
7月前
|
数据采集 SQL 分布式计算
|
8月前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
163 2
|
8月前
|
分布式计算
如何在MapReduce中处理多个输入文件?
如何在MapReduce中处理多个输入文件?
192 0
|
8月前
|
分布式计算
MapReduce【数据倾斜的优化】
MapReduce【数据倾斜的优化】
|
分布式计算 Java Hadoop
39 MAPREDUCE参数优化
39 MAPREDUCE参数优化
78 0
|
缓存 分布式计算 调度
MapReduce 优化经验
MapReduce 优化经验
141 0
|
存储 分布式计算 资源调度
MapReduce之小文件问题
针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然储了很多个文件,但是文件的体积并不大,这样就没有意义了。
147 0
|
分布式计算
有一个日志文件visitlog.txt,其中记录了用户访问网站的日期和访问的网站地址信息,每行一条记录。要求编写mapreduce程序完成以下功能: 1、 将不同访问日期的访问记录分配给不同的red
有一个日志文件visitlog.txt,其中记录了用户访问网站的日期和访问的网站地址信息,每行一条记录。要求编写mapreduce程序完成以下功能: 1、 将不同访问日期的访问记录分配给不同的red
151 0