大数据Hadoop小文件问题与企业级解决方案

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Hadoop小文件问题与企业级解决方案

1 MapReduce性能优化

现在大家已经掌握了MapReduce程序的开发步骤,注意了,针对MapReduce的案例我们并没有讲太多,主要是因为在实际工作中真正需要我们去写MapReduce代码的场景已经是凤毛麟角了,因为后面我们会学习一个大数据框架Hive,Hive支持SQL,这个Hive底层会把SQL转化为MapReduce执行,不需要 我们写一行代码,所以说工作中的大部分需求我们都使用SQL去实现了,谁还苦巴巴的来写代码啊,一行SQL能抵你写的几十行代码,你还想去写MapReduce代码吗,肯定不想了。

但是MapReduce代码的开发毕竟是基本功,所以前面我们也详细的讲解了它的开发流程。

虽然现在MapReduce代码写的很少了,但是针对MapReduce程序的性能优化是少不了的,面试也是经

常会问到的,所以下面我们就来分析一下MapReduce中典型的性能优化场景

第一个场景是:小文件问题

第二个场景是:数据倾斜问题

2 小文件问题

先一个一个来,不要着急,我们先看小文件问题

咱们前面分析过,Hadoop的HDFS和MapReduce都是针对大数据文件来设计的,在小文件的处理上不但

效率低下,而且十分消耗内存资源针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然存储了很多个文件,但是文件的体积并不大,这样就没有意义了。

针对MapReduce而言,每一个小文件都是一个Block,都会产生一个InputSplit,最终每一个小文件都会 产生一个map任务,这样会导致同时启动太多的Map任务,Map任务的启动是非常消耗性能的,但是启动了以后执行了很短时间就停止了,因为小文件的数据量太小了,这样就会造成任务执行消耗的时间还没有启动任务消耗的时间多,这样也会影响MapReduce执行的效率。

针对这个问题,解决办法通常是选择一个容器,将这些小文件组织起来统一存储,HDFS提供了两种类型的容器,分别是SequenceFile 和 MapFileSequeceFile是Hadoop 提供的一种二进制文件,这种二进制文件直接将<key, value>对序列化到文件中。

一般对小文件可以使用这种文件合并,即将小文件的文件名作为key,文件内容作为value序列化到大文

件中但是这个文件有一个缺点,就是它需要一个合并文件的过程,最终合并的文件会比较大,并且合并后的文件查看起来不方便,必须通过遍历才能查看里面的每一个小文件所以这个SequenceFile 其实可以理解为把很多小文件压缩成一个大的压缩包了。

下面我们来具体看一下如何生成SequenceFile

生成SequenceFile需要开发代码

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import java.io.File;
/**
 * 小文件解决方案之SequenceFile
 */
public class SmallFileSeq {
    public static void main(String[] args) throws Exception {
        //生成SequenceFile文件
        write("D:\\smallFile", "/seqFile");
        //读取SequenceFile文件
        read("/seqFile");
    }
    /**
     * 生成SequenceFile文件
     *
     * @param inputDir   输入目录-windows目录
     * @param outputFile 输出文件-hdfs文件
     * @throws Exception
     */
    private static void write(String inputDir, String outputFile)
            throws Exception {
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
        //获取操作HDFS的对象
        FileSystem fileSystem = FileSystem.get(conf);
        //删除输出文件
        fileSystem.delete(new Path(outputFile), true);
        //构造opts数组,有三个元素
 /*
 第一个是输出路径
 第二个是key类型
 第三个是value类型
 */
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                SequenceFile.Writer.file(new Path(outputFile)),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class)};
        //创建一个writer实例
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
        //指定要压缩的文件的目录
        File inputDirPath = new File(inputDir);
        if (inputDirPath.isDirectory()) {
            File[] files = inputDirPath.listFiles();
            for (File file : files) {
                //获取文件全部内容
                String content = FileUtils.readFileToString(file, "UTF-8");
                //文件名作为key
                Text key = new Text(file.getName());
                //文件内容作为value
                Text value = new Text(content);
                writer.append(key, value);
            }
        }
        writer.close();
    }
    *
    @param
    inputFile SequenceFile文件路径
 *@throws Exception
 */
    private static void read(String inputFile)
            throws Exception {
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
        //创建阅读器
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFi
                Text key = new Text();
        Text value = new Text();
        //循环读取数据
        while (reader.next(key, value)) {
            //输出文件名称
            System.out.print("文件名:" + key.toString() + ",");
            //输出文件的内容
            System.out.println("文件内容:" + value.toString());
        }
        reader.close();
    }
}

执行代码中的write方法,可以看到在HDFS上会产生一个/seqFile文件,这个文件就是最终生成的大文件

执行代码中的read方法,可以输出小文件的名称和内容

接下来我们来看一下MapFile

MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data

index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。

MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件 位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。

代码实现如下:

package com.oldlu.mr;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
/**
 * 小文件解决方案之MapFile
 */
public class SmallFileMap {
    public static void main(String[] args) throws Exception{
        //生成MapFile文件
        write("D:\\smallFile","/mapFile");
        read("/mapFile");
    }
    /**
     * 生成MapFile文件
     * @param inputDir 输入目录-windows目录
     * @param outputDir 输出目录-hdfs目录
     * @throws Exception
     */
    private static void write(String inputDir,String outputDir)
            throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS","hdfs://bigdata01:9000");
        //获取操作HDFS的对象
        FileSystem fileSystem = FileSystem.get(conf);
        //删除输出目录
        fileSystem.delete(new Path(outputDir),true);
        //构造opts数组,有两个元素
 /*
 第一个是key类型
 第二个是value类型
 */
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                MapFile.Writer.keyClass(Text.class),
                MapFile.Writer.valueClass(Text.class)};
        //创建一个writer实例
        MapFile.Writer writer = new MapFile.Writer(conf,new Path(outputDir),o
                //指定要压缩的文件的目录
                File inputDirPath = new File(inputDir);
        if(inputDirPath.isDirectory()){
            File[] files = inputDirPath.listFiles();
            for (File file : files) {
                //获取文件全部内容
                String content = FileUtils.readFileToString(file, "UTF-8");
                //文件名作为key
                Text key = new Text(file.getName());
                //文件内容作为value
                Text value = new Text(content);
                writer.append(key,value);
            }
        }
        writer.close();
    }
    /**
     * 读取MapFile文件
     * @param inputDir MapFile文件路径
     * @throws Exception
     */
    private static void read(String inputDir)
            throws Exception{
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS","hdfs://bigdata01:9000");
        //创建阅读器
        MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf);
        //循环读取数据
        while(reader.next(key,value)){
            //输出文件名称
            System.out.print("文件名:"+key.toString()+",");
            //输出文件的内容
            System.out.println("文件内容:"+value.toString());
        }
        reader.close();
    }
}

执行代码中的write方法,可以看到在HDFS上会产生一个/mapFile目录,这个目录里面有两个文件,一个

index索引文件,一个data数据文件

执行代码中的read方法,可以输出小文件的名称和内容

下面我们来看一个案例

我们来使用SequenceFile实现小文件的存储和计算

小文件的存储刚才我们已经通过代码实现了,接下来我们要实现如何通过MapReduce读取SequenceFile

咱们之前的代码默认只能读取普通文本文件,针对SequenceFile是无法读取的

那该如何设置才能让mapreduce可以读取SequenceFile呢?

很简单,只需要在job中设置输入数据处理类就行了,默认情况下使用的是TextInputFormat

创建一个新的类WordCountJobSeq

注意修改两个地方

  1. 修改job中的设置输入数据处理类
  2. 修改map中k1的数据类型为Text类型
job.setInputFormatClass(SequenceFileInputFormat.class)

创建一个新的类WordCountJobSeq

注意修改两个地方

  1. 修改job中的设置输入数据处理类
  2. 修改map中k1的数据类型为Text类型
    执行成功以后查看结果
package com.oldlu.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * 需求:读取SequenceFile文件
 * Created by xuwei
 */
public class WordCountJobSeq {
    /**
     * public static class MyMapper extends Mapper<Text, Text,Text,LongWritable>
     * Logger logger = LoggerFactory.getLogger(MyMapper.class);
     * /**
     * 需要实现map函数
     * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
     *
     * @param k1
     * @param v1
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(Text k1, Text v1, Context context)
            throws IOException, InterruptedException {
        //输出k1,v1的值
        System.out.println("<k1,v1>=<" + k1.toString() + "," + v1.toString() + ">
                //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
                //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
                //对获取到的每一行数据进行切割,把单词切割出来
                String[]words = v1.toString().split(" ");
        //迭代切割出来的单词数据
        for (String word : words) {
            //把迭代出来的单词封装成<k2,v2>的形式
            Text k2 = new Text(word);
            LongWritable v2 = new LongWritable(1L);
            //把<k2,v2>写出去
            context.write(k2, v2);
        }
    }
}
/**
 * Reduce阶段
 */
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongW
 Logger logger =LoggerFactory.getLogger(MyReducer.class);
/**
 * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
 *
 * @param k2
 * @param v2s
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void reduce(Text k2,Iterable<LongWritable> v2s,Context co
        throws IOException,InterruptedException{
        //创建一个sum变量,保存v2s的和
        long sum=0L;
        //对v2s中的数据进行累加求和
        for(LongWritable v2:v2s){
        //输出k2,v2的值
        //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"
        //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
        sum+=v2.get();
        }
        //组装k3,v3
        Text k3=k2;
        LongWritable v3=new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
        context.write(k3,v3);
        }
        }
/**
 * 组装Job=Map+Reduce
 */
public static void main(String[]args){
        try{
        if(args.length!=2){
        //如果传递的参数不够,程序直接退出
        System.exit(100);
        }
        //指定Job需要的配置参数
        Configuration conf=new Configuration();
        //创建一个Job
        Job job=Job.getInstance(conf);
        //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个
        job.setJarByClass(WordCountJobSeq.class);
        //指定输入路径(可以是文件,也可以是目录)
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //指定输出路径(只能指定一个不存在的目录)
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //指定map相关的代码
        job.setMapperClass(MyMapper.class);
        //指定k2的类型
        job.setMapOutputKeyClass(Text.class);
        //指定v2的类型
        job.setMapOutputValueClass(LongWritable.class);
        //设置输入数据处理类
        job.setInputFormatClass(SequenceFileInputFormat.class);
        //指定reduce相关的代码
        job.setReducerClass(MyReducer.class);
        //指定k3的类型
        job.setOutputKeyClass(Text.class);
        //指定v3的类型
        job.setOutputValueClass(LongWritable.class);
        //提交job
        job.waitForCompletion(true);
        }catch(Exception e){
        e.printStackTrace();
        }
     }
 }

执行成功以后查看结果

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out10/*
hello 10
you 10

此时到yarn的web界面上查看map任务的个数,发现只有1个,说明这样是生效的。

查看map任务的日志,查看打印的k1,v1日志信息

Log Type: stdout
Log Length: 301
<k1,v1>=<file1.txt,hello you>
<k1,v1>=<file10.txt,hello you>
<k1,v1>=<file2.txt,hello you>
<k1,v1>=<file3.txt,hello you>
<k1,v1>=<file4.txt,hello you>
<k1,v1>=<file5.txt,hello you>
<k1,v1>=<file6.txt,hello you>
<k1,v1>=<file7.txt,hello you>
<k1,v1>=<file8.txt,hello you>
<k1,v1>=<file9.txt,hello you>


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
存储 分布式计算 资源调度
Hadoop小文件解决方案
Hadoop小文件解决方案
|
3天前
|
存储 分布式计算 资源调度
两万字长文向你解密大数据组件 Hadoop
两万字长文向你解密大数据组件 Hadoop
25 11
|
2月前
|
图形学 数据可视化 开发者
超实用Unity Shader Graph教程:从零开始打造令人惊叹的游戏视觉特效,让你的作品瞬间高大上,附带示例代码与详细步骤解析!
【8月更文挑战第31天】Unity Shader Graph 是 Unity 引擎中的强大工具,通过可视化编程帮助开发者轻松创建复杂且炫酷的视觉效果。本文将指导你使用 Shader Graph 实现三种效果:彩虹色渐变着色器、动态光效和水波纹效果。首先确保安装最新版 Unity 并启用 Shader Graph。创建新材质和着色器图谱后,利用节点库中的预定义节点,在编辑区连接节点定义着色器行为。
97 0
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
|
2月前
|
存储 SQL 分布式计算
Hadoop生态系统概述:构建大数据处理与分析的基石
【8月更文挑战第25天】Hadoop生态系统为大数据处理和分析提供了强大的基础设施和工具集。通过不断扩展和优化其组件和功能,Hadoop将继续在大数据时代发挥重要作用。
|
2月前
|
分布式计算 大数据 数据处理
【大数据管理新纪元】EMR Delta Lake 与 DLF 深度集成:解锁企业级数据湖的无限潜能!
【8月更文挑战第26天】随着大数据技术的发展,Apache Spark已成为处理大规模数据集的首选工具。亚马逊的EMR服务简化了Spark集群的搭建和运行流程。结合使用Delta Lake(提供ACID事务保证和数据版本控制)与DLF(加强数据访问控制及管理),可以显著提升数据湖的可靠性和性能。本文通过一个电商公司的具体案例展示了如何在EMR上部署集成Delta Lake和DLF的环境,以及这一集成方案带来的几大优势:增强的可靠性、细粒度访问控制、性能优化以及易于管理的特性。这为数据工程师提供了一个高效且灵活的数据湖平台,简化了数据湖的建设和维护工作。
42 1
|
2月前
|
SQL 分布式计算 数据可视化
基于Hadoop的大数据可视化方法
【8月更文第28天】在大数据时代,有效地处理和分析海量数据对于企业来说至关重要。Hadoop作为一个强大的分布式数据处理框架,能够处理PB级别的数据量。然而,仅仅完成数据处理还不够,还需要将这些数据转化为易于理解的信息,这就是数据可视化的重要性所在。本文将详细介绍如何使用Hadoop处理后的数据进行有效的可视化分析,并会涉及一些流行的可视化工具如Tableau、Qlik等。
65 0
|
2月前
|
消息中间件 分布式计算 Hadoop
利用Hadoop进行实时数据分析的挑战与解决方案
【8月更文第28天】随着大数据技术的快速发展,企业和组织面临着越来越复杂的实时数据处理需求。Hadoop 作为一种分布式存储和处理大数据的框架,虽然擅长于批处理任务,但在处理实时数据流时存在一定的局限性。为了克服这些限制,Hadoop 经常与其他实时处理框架(如 Apache Kafka 和 Apache Storm)结合使用。本文将探讨如何利用 Hadoop 结合 Kafka 和 Storm 实现近实时的数据处理,并提供相关的代码示例。
103 0
|
28天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
78 11
|
2月前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
81 1

热门文章

最新文章

下一篇
无影云桌面