MapReduce之小文件问题

简介: 针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然储了很多个文件,但是文件的体积并不大,这样就没有意义了。

小文件问题


针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然储了很多个文件,但是文件的体积并不大,这样就没有意义了。


针对MapReduce而言,每一个小文件都是一个Block,都会产生一个InputSplit,最终每一个小文件都产生一个map任务,这样会导致同时启动太多的Map任务,Map任务的启动是非常消耗性能的,但是启动了以后执行了很短时间就停止了,因为小文件的数据量太小了,这样就会造成任务执行消耗的时间还没有有启动任务消耗的时间多,这样也会影响MapReduce执行的效率。


HDFS提供了两种类型的容器,分别是SequenceFile 和 MapFile

SequenceFile


SequeceFile是Hadoop 提供的一种二进制文件,这种二进制文件直接将<key, value>对序列化到文件中。


一般对小文件可以使用这种文件合并,即将小文件的文件名作为key,文件内容作为value序列化到大文件中。


但是这个文件有一个缺点,就是它需要一个合并文件的过程,最终合并的文件会比较大,并且合并后的文件查看起来不方便,必须通过遍历才能查看里面的每一个小文件


所以这个SequenceFile 其实可以理解为把很多小文件压缩成一个大的压缩包了。

生成SequenceFile的java代码


public class SmallFileSeq {
    public static void main(String[] args) throws Exception{
        //生成SequenceFile文件
        write("D:\\smallFile","/seqFile");
        //读取SequenceFile文件
        read("/seqFile");
    }
    /**
     * 生成SequenceFile文件
     * @param inputDir 输入目录-windows目录
     * @param outputFile 输出文件-hdfs文件
     * @throws Exception
     */
    private static void write(String inputDir,String outputFile) throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS","hdfs://node01:9000");
        //获取操作HDFD的对象
        FileSystem fileSystem = FileSystem.get(conf);
        //删除HDFS上的输出文件
        fileSystem.delete(new Path(outputFile),true);
        //构造opts数组,有三个元素
        /*
        第一个是输出路径【文件】
        第二个是key的类型
        第三个是value的类型
         */
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                SequenceFile.Writer.file(new Path(outputFile)),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class)
        };
        //创建了一个writer实例
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
        //指定需要压缩的文件的目录
        File inputDirPath = new File(inputDir);
        if(inputDirPath.isDirectory()){
            //获取目录中的文件
            File[] files = inputDirPath.listFiles();
            //迭代文件
            for (File file: files) {
                //获取文件的全部内容
                String content = FileUtils.readFileToString(file, "UTF-8");
                //获取文件名
                String fileName = file.getName();
                Text key = new Text(fileName);
                Text value = new Text(content);
                //向SequenceFile中写入数据
                writer.append(key,value);
            }
        }
        writer.close();
    }
    /**
     * 读取SequenceFile文件
     * @param inputFile SequenceFile文件路径
     * @throws Exception
     */
    private static void read(String inputFile) throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS","hdfs://node01:9000");
        //创建阅读器
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
        Text key = new Text();
        Text value = new Text();
        //循环读取数据
        while(reader.next(key,value)){
            //输出文件名称
            System.out.print("文件名:"+key.toString()+",");
            //输出文件内容
            System.out.println("文件内容:"+value.toString()+"");
        }
        reader.close();
    }
}

MapFile


MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data,index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。


在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件的位置。


因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存index数据。

MapFile的java代码


public class SmallFileMap {
    public static void main(String[] args) throws Exception{
        //生成MapFile文件
        write("D:\\smallFile","/mapFile");
        //读取MapFile文件
        read("/mapFile");
    }
    /**
     * 生成MapFile文件
     * @param inputDir 输入目录-windows目录
     * @param outputDir 输出目录-hdfs目录
     * @throws Exception
     */
    private static void write(String inputDir,String outputDir) throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS","hdfs://bigdata01:9000");
        //获取操作HDFD的对象
        FileSystem fileSystem = FileSystem.get(conf);
        //删除HDFS上的输出文件
        fileSystem.delete(new Path(outputDir),true);
        //构造opts数组,有两个元素
        /*
        第一个是key的类型
        第二个是value的类型
         */
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                MapFile.Writer.keyClass(Text.class),
                MapFile.Writer.valueClass(Text.class)
        };
        //创建了一个writer实例
        MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts);
        //指定需要压缩的文件的目录
        File inputDirPath = new File(inputDir);
        if(inputDirPath.isDirectory()){
            //获取目录中的文件
            File[] files = inputDirPath.listFiles();
            //迭代文件
            for (File file: files) {
                //获取文件的全部内容
                String content = FileUtils.readFileToString(file, "UTF-8");
                //获取文件名
                String fileName = file.getName();
                Text key = new Text(fileName);
                Text value = new Text(content);
                //向SequenceFile中写入数据
                writer.append(key,value);
            }
        }
        writer.close();
    }
    /**
     * 读取MapFile文件
     * @param inputDir MapFile文件路径
     * @throws Exception
     */
    private static void read(String inputDir)throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS","hdfs://bigdata01:9000");
        //创建阅读器
        MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf);
        Text key = new Text();
        Text value = new Text();
        //循环读取数据
        while(reader.next(key,value)){
            //输出文件名称
            System.out.print("文件名:"+key.toString()+",");
            //输出文件内容
            System.out.println("文件内容:"+value.toString()+"");
        }
        reader.close();
    }
}

案例: SequenceFile实现小文件的存储和计算


有十个小文件,使用sequenceFile,写一个mapreduce程序观察提交到yarn上的map任务有几个?

public class WordCountJobSeq {
    /**
     * Map阶段
     */
    public static class MyMapper extends Mapper<Text, Text,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        /**
         * 需要实现map函数
         * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(Text k1, Text v1, Context context)
                throws IOException, InterruptedException {
            //输出k1,v1的值
            System.out.println("<k1,v1>=<"+k1.toString()+","+v1.toString()+">");
            //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
            //对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            //迭代切割出来的单词数据
            for (String word : words) {
                //把迭代出来的单词封装成<k2,v2>的形式
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                //把<k2,v2>写出去
                context.write(k2,v2);
            }
        }
    }
    /**
     * Reduce阶段
     */
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        /**
         * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            //创建一个sum变量,保存v2s的和
            long sum = 0L;
            //对v2s中的数据进行累加求和
            for(LongWritable v2: v2s){
                //输出k2,v2的值
                //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
            }
            //组装k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            //输出k3,v3的值
            //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            // 把结果写出去
            context.write(k3,v3);
        }
    }
    /**
     * 组装Job=Map+Reduce
     */
    public static void main(String[] args) {
        try{
            if(args.length!=2){
                //如果传递的参数不够,程序直接退出
                System.exit(100);
            }
            //指定Job需要的配置参数
            Configuration conf = new Configuration();
            //创建一个Job
            Job job = Job.getInstance(conf);
            //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
            job.setJarByClass(WordCountJobSeq.class);
            //指定输入路径(可以是文件,也可以是目录)
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);
            //设置输入数据处理类
            job.setInputFormatClass(SequenceFileInputFormat.class);
            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            //指定k3的类型
            job.setOutputKeyClass(Text.class);
            //指定v3的类型
            job.setOutputValueClass(LongWritable.class);
            //提交job
            job.waitForCompletion(true);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}
目录
相关文章
|
5月前
|
存储 分布式计算 算法
MapReduce 处理压缩文件的能力
【8月更文挑战第12天】
58 4
|
7月前
|
数据采集 SQL 分布式计算
|
8月前
|
分布式计算
如何在MapReduce中处理多个输入文件?
如何在MapReduce中处理多个输入文件?
193 0
|
8月前
|
存储 分布式计算 自然语言处理
MapReduce【小文件的优化-Sequence文件】
MapReduce【小文件的优化-Sequence文件】
|
SQL 分布式计算 Hadoop
MapReduce - 读取 ORC, RcFile 文件
一.引言 MR 任务处理相关 hive 表数据时格式为 orc 和 rcFile,下面记录两种处理方法。 二.偷懒版读取 ORC,RcFile 文件 最初不太熟悉 mr,只会 textFormat 一种输入模式,于是遇到 orc 和 rcFile 形式的 hive 数据需要在 mr 读取时,都是先通过 INSERTOVERWRITEDIRECTORY 将 hive 表重新输出一份 hdfs 的 text 数据,随后用 mr 读取该 text 文件,该方法适合偷懒且原始 hive 数据不大,..
396 0
MapReduce - 读取 ORC, RcFile 文件
|
分布式计算
有一个日志文件visitlog.txt,其中记录了用户访问网站的日期和访问的网站地址信息,每行一条记录。要求编写mapreduce程序完成以下功能: 1、 将不同访问日期的访问记录分配给不同的red
有一个日志文件visitlog.txt,其中记录了用户访问网站的日期和访问的网站地址信息,每行一条记录。要求编写mapreduce程序完成以下功能: 1、 将不同访问日期的访问记录分配给不同的red
151 0
|
分布式计算 Hadoop
Hadoop学习:MapReduce实现文件的解压缩
Hadoop学习:MapReduce实现文件的解压缩
142 0
|
分布式计算 自然语言处理 Java
MapReduce实现与自定义词典文件基于hanLP的中文分词详解
文本分类任务的第1步,就是对语料进行分词。在单机模式下,可以选择python jieba分词,使用起来较方便。但是如果希望在Hadoop集群上通过mapreduce程序来进行分词,则hanLP更加胜任。
2739 0
|
分布式计算 DataWorks Java
[MaxCompute MapReduce实践]通过简单瘦身,解决Dataworks 10M文件限制问题
用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。 解决方案: jar -resources test_mr.
3099 0