MapReduce shuffle过程详解!

简介: 笔记

一、MR的shuffle过程


MR的shuffle过程:

input ->  map  ->   shuffle  ->  reduce  ->output

MR的原理图:

8.jpg



二、Map shuffle


9.png

1.map()的数据会写入到内存(环形缓冲区:默认大小:100mb),当数据达到缓冲区总容量的80%(阈值)时,会将我们的数据spill到本地磁盘


1)分区(partitioner):分区决定map输出的数据将会被哪个reduce任务进行处理

2)排序:会对我们分区中的数据进行排序

3)spill写磁盘:将内存中的数据写入到本地磁盘(hadoop.tmp.dir)

2.当Map阶段数据处理完成之后,我们会将spill到磁盘的数据进行一次合并


1)将各个分区的数据合并在一起

2)分区合并之后再次排序

   eg:
      file -1(分区1 + 分区2)
      file -2(分区1 + 分区2)
      file -3(分区1 + 分区2)
      合并成一个文件:
      allFile(合并分区1 + 合并分区2)
      最后会形成一个文件,待reduce任务获取

map shuffle总结成自己话:当map以<key,value>对输出之后,会写入到内存中(环形缓冲区),环形缓冲区默认大小是100MB,当数据达到缓冲区总容量的80%(阈值)时,会将我们的数据spill到本地磁盘。数据在环形缓冲区中spill到本地磁盘之前会做分区,排序,之后再spill到本地磁盘。因为map对数据不断的进行处理,数据会不断spill到本地磁盘,也就在磁盘上生成很多小文件。由于最终reduce只会拿map输出的一个结果,所以会将spill到磁盘的数据进行一次合并,将各个分区的数据合并在一起,分区合并之后再次排序。最后会形成一个文件,待reduce任务获取


三、Reduce shuffle


1.当map阶段数据处理完成之后,各个reduce 任务主动到已经完成的map 任务的本次磁盘中,去拉取属于自己要处理的数据,最后会形成一个文件


1)reduce对应partition的数据写入内存,根据reduce端数据内存的阈值,spill到reduce本地磁盘,形成一个个小文件

2)对文件进行合并

3)排序

4)分组(将相同key的value放在一起)

  <hadoop,1>
  <hadoop,1>    ->  <hadoop,List(1,1,1)>
  <hadoop,1>
  <spark,1>
  <spark,1>

5)reduce()

reduce shuffle总结成自己话:当map阶段数据处理完成之后,各个reduce任务主动到已经完成的map 任务的本次磁盘中,去拉取属于自己要处理的数据,最后会形成一个文件,reduce 对应partition的数据写入内存,根据reduce端数据内存的阈值,spill到reduce本地磁盘,形成一个个小文件,然后对一个个小文件进行合并,排序,分组(将相同key的value放在一起),最后形成reduce输入端,传递给reduce()函数,得出结果。


四、MapReduce shuffle阶段配置详解


//1.分区
job.setPartitionerClass();
//2.排序
job.setSortComparatorClass();
//3.combiner  -可选项
//combiner其实就是map端reduce
job.setCombinerClass(WordCountCombiner.class);
//4.compress  -可配置
//5.分组
job.setGroupingComparatorClass();

(1)Combiner配置详解

(1)Combiner概述

·Mapreduce中的mapper阶段将输入的数据转换成一个个键值对的形式<key,value>,然后在网络节点间对其进行shuffle机制对数据进行整理,最后reducer阶段处理数据并输出结果。这是一个mr程序一般的处理流程。如果存在这样一个实际的场景:如果有10个数据文件,Mapper会生成10亿个<k2,v2>的键值对在网络间进行传输,但如果我们只是对数据求最大值,显然Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。

combiner本质就是map端reduce:我们可以把合并(combiner)看作是一个在每个单独节点上先做一次Reduce的操作,其输入及输出的参数和Reduce是一样的。


(2)Combiner案例

以wordcount为例,对比有和没有Combiner的差异:

10.png

若要引入Combiner,则只需要在wordcount程序的main方法中,增加下面的语句即可:

//combiner其实就是map端reduce
job.setCombinerClass(WordCountCombiner.class);

Combiner虽好,但并不是所有的案例都适合用combiner,只有操作满足结合律的才可设置combiner。比如求平均值的场景:

11.png

接下来看一个wordcount的combiner的案例:

1.输入文件内容:

java python hadoop
spark spark java hadoop
hive hbase hbase hive
spark java python hadoop
linux unix java python
spark spark storm flink
java flink hue kafka kafka
java python zookeeper zookeeper
spark hive hbase hive

2.先运行之前没有设置Combiner的Wordcount案例,观察控制台显示的信息:

12.png

在我们没有启用Combiner之前,关于Combiner的计数器都为0,查看这个时候的mapper阶段的计数器和reducer阶段的计数器为10和37。


3.需求分析

统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。


4.实现过程

package com.kfk.hadoop.mr;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WordCountUpMRC extends Configured implements Tool {
    /**
     * map
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        // 创建k2,v2对象
        private final Text mapOutputKey = new Text();
        private final IntWritable mapOutputValue = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 输出<k1,v1>,k1就是偏移量,v1就是一行行数据
            System.out.println("Map in == KeyIn: "+key+"   ValueIn: "+value);
            // 每一行数据
            String lineValue = value.toString();
            // 将数据按照空格分开
            String[] strs = lineValue.split(" ");
            // 输出<k2,v2>
            for (String str:strs){
                mapOutputKey.set(str);
                context.write(mapOutputKey,mapOutputValue);
                // 打印出<k2,v2>
                System.out.println("Map out == KeyOut: "+mapOutputKey+"   ValueOut: "+mapOutputValue);
            }
        }
    }
    /**
     * combiner
     */
    public static class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
        private final IntWritable outputValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            List<IntWritable> list = Lists.newArrayList(values);
            // 打印出Combiner输入的kv
            System.out.println("Combiner in keyIn:"+key +"    ValueIn:"+list);
            int sum = 0;
            for (IntWritable value:list){
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key,outputValue);
            // 打印出Combiner输出的kv
            System.out.println("Combiner out keyOut:"+key +"    ValueOut:"+outputValue);
        }
    }
    /**
     * reduce
     */
    public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        // 创建v4对象
        private final IntWritable outputValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 打印出k3,v3集合,用于本机测试
            List<IntWritable> list = Lists.newArrayList(values);
            System.out.println("Reduce in == KeyIn: "+key+"   ValueIn: "+list);
            // 对v3求和
            int sum = 0;
            for (IntWritable value:list){
                sum += value.get();
            }
            outputValue.set(sum);
            // 输出<k4,v4>
            context.write(key,outputValue);
            // 打印出k4,v4的值
            System.out.println("Reduce out == KeyOut: "+key+"   ValueOut: "+outputValue);
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        // job.setPartitionerClass();
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        job.setCombinerClass(WordCountCombiner.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        // job.setGroupingComparatorClass();
    // 6.设置reduce的数量
        // job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new WordCountUpMRC(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5.查看运行结果:

map输出:

13.png

combiner输出:

14.png

reduce输出:

15.png

再次查看控制台显示的信息:

16.png

在我们启用Combiner之后,关于Combiner的计数器为37和14,reduce的计数器为14,对这个结果分析一下,因为输入文件没有变化,所以mapper的计数器没有变化,因为输入文件小于128M,所以只会启用一个map task。


小结:

在实际的Hadoop集群操作中,我们是由多台主机一起进行MapReduce的,如果加入合并操作,每一台主机会在reduce之前进行一次对本机数据的合并,然后在通过集群进行reduce操作,这样就会大大节省reduce的时间,从而加快MapReduce的处理速度。

17.png


(2)MR的数据压缩

(1)数据压缩概述

数据压缩这是mapreduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)


Mapreduce支持将map输出的结果或者reduce输出的结果进行压缩,以减少网络IO或最终输出数据的体积

压缩特性运用得当能提高性能,但运用不当也可能降低性能

压缩使用基本原则:(1)运算密集型的job,少用压缩 (2)IO密集型的job,多用压缩

(2)常见压缩算法比较

1.gzip压缩:

优点:压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便。

缺点:不支持split。


2.lzo压缩:

优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便。

缺点:压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)。


3.snappy压缩:

优点:压缩速度快;支持hadoop native库。

缺点:不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令。

4.bzip2压缩:

优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便。

缺点:压缩/解压速度慢;不支持native。


(3)compress设定方式

MR数据 处理过程中,compress如何设定,有两种方式:

第一种:configuration

// compress -可配置
configuration.set("mapreduce.map.output.compress","true");
// 使用的SnappyCodec压缩算法 
configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

第二种:mapred-site.xml

#支持压缩
<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>
#压缩方式
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property> 


(3)如何设置reduce数量

在缺省情况下,一个mapreduce的job只有一个reducer;在大型集群中,需要使用许多reducer,中间数据都会放到一个reducer中处理,如果reducer数量不够,会成为计算瓶颈。

// 6.设置reduce的数量为2
job.setNumReduceTasks(2);

查看partition分区


一个分区对应一个reducer。

18.png


四、MapReduce的调优


1.Reduce Task Number(设定reduce的数量)


根据业务需求

测试

2.Map task 输出压缩


3.shuffle数据处理过程中的参数


mapreduce.task.io.sort.mb:配置环形缓冲区大小(默认100MB)

mapreduce.map.sort.spill.percent:配置达到的百分比(默认是80%开始spill磁盘)

mapreduce.task.io.sort.factor:配置分区合并文件的数量(默认是10)

mapreduce.map.cpu.vcores:配置CPU核数(默认是1)

mapreduce.reduce.cpu.vcores:配置CPU核数(默认是1)

mapreduce.map.memory.mb:配置内存大小(默认是1024MB)

mapreduce.reduce.memory.mb:配置内存大小(默认是1024MB)


五、完善MR的编程模板


根据wordcount编程,我们可以整理出一套MR的编程模板,根据需求套用模版即可:

package com.kfk.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class MRTemplate extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * reduce
     * TODO
     */
    public static class TemplateReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(TemplateMapper.class);
        //TODO
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        // job.setPartitionerClass();
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        // job.setCombinerClass(WordCountCombiner.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        // job.setGroupingComparatorClass();
        // 6.设置reduce的数量
        job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(TemplateReducer.class);
        //TODO
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new MRTemplate(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


相关文章
|
6月前
|
分布式计算
MapReduce中的Shuffle过程是什么?为什么它在性能上很关键?
MapReduce中的Shuffle过程是什么?为什么它在性能上很关键?
227 0
|
6月前
|
存储 分布式计算 算法
MapReduce计数器,Tash的运行机制,shuffle过程,压缩算法
MapReduce计数器,Tash的运行机制,shuffle过程,压缩算法
60 0
|
缓存 分布式计算
25 MAPREDUCE的shuffle机制
25 MAPREDUCE的shuffle机制
68 0
|
存储 分布式计算 索引
MapReduce 的 shuffle 阶段【重要】
MapReduce 的 shuffle 阶段【重要】
160 0
|
存储 缓存 分布式计算
Hadoop知识点总结——MapReduce的Shuffle
从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程
137 0
|
存储 分布式计算 Hadoop
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
|
分布式计算 大数据 数据处理
大数据||MapReduce的shuffle
mapreduce的数据处理过程中,shuffle出于map和Reduce之间。 Shuffle:洗牌或弄乱。 Collections.shuffle(List):随机地打乱参数list里的元素顺序。
1133 0
|
分布式计算 缓存
mapreduce的shuffle机制(来自学习笔记)
3. MAPREDUCE原理篇(2) 3.1 mapreduce的shuffle机制 3.1.1 概述: v  mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle; v shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存) v 具体来说:就是将maptask输出的处理结果数据,分发给re
3680 0
|
分布式计算 索引 Hadoop
MapReduce优化----Shuffle过程剖析及性能优化
1.    Map端 当Map 开始产生输出时,它并不是简单的把数据写到磁盘,因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先是写到内存中的一个缓冲区,并做了一些预排序,以提升效率。
1386 0