MapReduce【Shuffle-Combiner】

简介: Conbiner在MapReduce的Shuffle阶段起作用,它负责局部数据的聚合,我们可以看到,对于大数据量,如果没有Combiner,将会在磁盘上写入多个文件等待ReduceTask来拉取,但是如果有Combiner组件,我们可以通过Combiner来减小中间结果文件的大小,从而增加传输的效率。

概述


image.png

        Conbiner在MapReduce的Shuffle阶段起作用,它负责局部数据的聚合,我们可以看到,对于大数据量,如果没有Combiner,将会在磁盘上写入多个文件等待ReduceTask来拉取,但是如果有Combiner组件,我们可以通过Combiner来减小中间结果文件的大小,从而增加传输的效率。


       以wordcount为例,从map出来的kv已经经过了排序是有序的,我们可以进行一次Combiner将相同key的value进行一个合并,从而减少数据量。接着再进行一次归并排序,将多个溢写文件合并到一起。如果溢写的文件特别多,一次归并排序不能全部合并(默认一次归并10个溢写文件),可以再进行一次归并。最终只有一个中间结果文件产生。


Combiner是MapReduce程序中Mapper和Reducer之外的一个组件。

Combiner组件的父类就是Reducer

Combiner和Reducer的区别在于运行的位置

Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

比如wordcount案例,我们可以对它增加一个Combiner,因为这样不会影响最终结果。

但是对于求平均值这种案例,比如(3+5+7+2+6)/5 != (3+5+7)/3 + (2+6)/2

实现

Combiner只需要继承Reducer类并重写reduce方法即可,我们只需要在wordcount案例基础之上增加一个类WordCountCombiner并在Runner类中修改job的属性即可。


WordCountCombiner类

public class WordCountCombiner extends Reducer<Text, LongWritable,Text,LongWritable> {
    private LongWritable OUT_KEY = new LongWritable();
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        OUT_KEY.set(sum);
        context.write(key,OUT_KEY);
    }
}


修改job属性

//设置combiner
        job.setCombinerClass(WordCountCombiner.class);

 


相关文章
|
Linux C语言 Python
CentOS7下升级GLIBC2.31
CentOS7下升级GLIBC2.31
4101 0
CentOS7下升级GLIBC2.31
|
SQL 分布式计算 资源调度
线上 hive on spark 作业执行超时问题排查案例分享
线上 hive on spark 作业执行超时问题排查案例分享
|
存储 SQL 运维
一篇文章彻底理解 HDFS 的安全模式
一篇文章彻底理解 HDFS 的安全模式
|
存储 分布式计算 资源调度
2022年最强大数据面试宝典(全文50000字,建议收藏)(一)
复习大数据面试题,看这一套就够了!
2877 0
|
8月前
|
SQL 分布式计算 数据库
【YashanDB 知识库】Hive 命令工具 insert 崖山数据库报错
【YashanDB 知识库】Hive 命令工具 insert 崖山数据库报错
|
10月前
|
缓存 分布式计算 资源调度
Spark 与 MapReduce 的 Shuffle 的区别?
MapReduce 和 Spark 在 Shuffle 过程中有显著区别。MapReduce 采用两阶段模型,中间数据写入磁盘,I/O 开销大;而 Spark 使用基于内存的多阶段执行模型,支持操作合并和内存缓存,减少 I/O。Spark 的 RDD 转换优化减少了 Shuffle 次数,提升了性能。此外,Spark 通过 lineage 实现容错,资源管理更灵活,整体大数据处理效率更高。
|
分布式计算 自然语言处理 监控
大数据Spark对SogouQ日志分析
大数据Spark对SogouQ日志分析
519 0
|
SQL 存储 分布式计算
Hive精选10道面试题
Hive精选10道面试题
833 3
Hive精选10道面试题
|
SQL Oracle 关系型数据库
SQL 面试系列(一)【留存率问题】
SQL 面试系列(一)【留存率问题】
|
SQL 分布式计算 安全
Spark的核心概念:RDD、DataFrame和Dataset
Spark的核心概念:RDD、DataFrame和Dataset