reduce个数究竟和哪些因素有关

简介:
reduce的数目究竟和哪些因素有关

 

 

1、我们知道map的数量和文件数、文件大小、块大小、以及split大小有关,而reduce的数量跟哪些因素有关呢?

 设置mapred.tasktracker.reduce.tasks.maximum的大小能够决定单个tasktracker一次性启动reduce的数目,可是不能决定总的reduce数目。



  conf.setNumReduceTasks(4);JobConf对象的这种方法能够用来设定总的reduce的数目,看下Job Counters的统计:

 

 

	Job Counters 
		Data-local map tasks=2
		Total time spent by all maps waiting after reserving slots (ms)=0
		Total time spent by all reduces waiting after reserving slots (ms)=0
		SLOTS_MILLIS_MAPS=10695
		SLOTS_MILLIS_REDUCES=29502
		Launched map tasks=2
		Launched reduce tasks=4

 

 确实启动了4个reduce:看下输出:

 

diegoball@diegoball:~/IdeaProjects/test/build/classes$ hadoop fs -ls  /user/diegoball/join_ou1123
11/03/25 15:28:45 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
11/03/25 15:28:45 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/_SUCCESS
-rw-r--r--   1 diegoball supergroup        124 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00000
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00001
-rw-r--r--   1 diegoball supergroup        214 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00003

 仅仅有2个reduce在干活。为什么呢?

shuffle的过程。须要依据key的值决定将这条<K,V> (map的输出),送到哪一个reduce中去。送到哪一个reduce中去靠调用默认的org.apache.hadoop.mapred.lib.HashPartitioner的getPartition()方法来实现。
HashPartitioner类:

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.JobConf;

/** Partition keys by their {@link Object#hashCode()}. 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

 numReduceTasks的值在JobConf中能够设置。

默认的是1:显然太小。


   这也是为什么默认的设置中总启动一个reduce的原因。

   返回与运算的结果和numReduceTasks求余。

   Mapreduce依据这个返回结果决定将这条<K,V>,送到哪一个reduce中去。



key传入的是LongWritable类型,看下这个LongWritable类的hashcode()方法:

 

 public int hashCode() {
    return (int)value;
  }

 简简单单的返回了原值的整型值。

 由于getPartition(K2 key, V2 value,int numReduceTask)返回的结果仅仅有2个不同的值,所以终于仅仅有2个reduce在干活。
 

 HashPartitioner是默认的partition类。我们也能够自己定义partition类 :

 package com.alipay.dw.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/**
 * Created by IntelliJ IDEA.
 * User: diegoball
 * Date: 11-3-10
 * Time: 下午5:26
 * To change this template use File | Settings | File Templates.
 */
public class MyPartitioner implements Partitioner<IntWritable, IntWritable> {
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences > 20051210)
            return 0;
        else
            return 1;
    }

    public void configure(JobConf arg0) {

    }
}

 只须要覆盖getPartition()方法就OK。

通过:
conf.setPartitionerClass(MyPartitioner.class);
能够设置自己定义的partition类。
相同因为之返回2个不同的值0,1,无论conf.setNumReduceTasks(4);设置多少个reduce,也相同仅仅会有2个reduce在干活。

因为每一个reduce的输出key都是经过排序的,上述自己定义的Partitioner还能够达到排序结果集的目的:

 

11/03/25 15:24:49 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/_SUCCESS
-rw-r--r--   1 diegoball supergroup      24546 2011-03-25 15:23 /user/diegoball/opt.del/part-00000
-rw-r--r--   1 diegoball supergroup      10241 2011-03-25 15:23 /user/diegoball/opt.del/part-00001
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00003

 part-00000和part-00001是这2个reduce的输出,因为使用了自己定义的MyPartitioner,全部key小于20051210的的<K,V>都会放到第一个reduce中处理。key大于20051210就会被放到第二个reduce中处理。


每一个reduce的输出key又是经过key排序的,所以终于的结果集降序排列。


可是假设使用上面自己定义的partition类,又conf.setNumReduceTasks(1)的话。会如何? 看下Job Counters:

	Job Counters 
		Data-local map tasks=2
		Total time spent by all maps waiting after reserving slots (ms)=0
		Total time spent by all reduces waiting after reserving slots (ms)=0
		SLOTS_MILLIS_MAPS=16395
		SLOTS_MILLIS_REDUCES=3512
		Launched map tasks=2
		Launched reduce tasks=1

  仅仅启动了一个reduce。
  (1)、 当setNumReduceTasks( int a) a=1(即默认值),无论Partitioner返回不同值的个数b为多少,仅仅启动1个reduce,这样的情况下自己定义的Partitioner类没有起到不论什么作用。


  (2)、 若a!=1:
   a、当setNumReduceTasks( int a)里 a设置小于Partitioner返回不同值的个数b的话:

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences < 20051210)
            return 0;
        if (nbOccurences >= 20051210 && nbOccurences < 20061210)
            return 1;
        if (nbOccurences >= 20061210 && nbOccurences < 20081210)
            return 2;
        else
            return 3;
    }
 

  同一时候设置setNumReduceTasks( 2)。

 

 于是抛出异常:

  11/03/25 17:03:41 INFO mapreduce.Job: Task Id : attempt_201103241018_0023_m_000000_1, Status : FAILED
java.io.IOException: Illegal partition for 20110116 (3)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:900)
	at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:508)
	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
	at com.alipay.dw.test.KpiMapper.map(Unknown Source)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:397)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
	at org.apache.hadoop.mapred.Child.main(Child.java:211) 

 某些key没有找到所相应的reduce去处。

原因是仅仅启动了a个reduce。
 
   b、当setNumReduceTasks( int a)里 a设置大于Partitioner返回不同值的个数b的话,相同会启动a个reduce。可是仅仅有b个redurce上会得到数据。启动的其它的a-b个reduce浪费了。

 

   c、理想状况是a=b,这样能够合理利用资源,负载更均衡。






本文转自mfrbuaa博客园博客,原文链接:http://www.cnblogs.com/mfrbuaa/p/5144262.html,如需转载请自行联系原作者

相关文章
|
Linux Android开发 iOS开发
建议收藏!8款音乐APP,找不到喜欢的,算我输!
最近一段时间,有不少同学反复询问我音乐相关的软件。 在以前的文章中,我先后介绍过很多音乐相关的APP,其中不乏一时之间让我非常惊艳的。
建议收藏!8款音乐APP,找不到喜欢的,算我输!
|
存储 资源调度 分布式计算
CDP中配置Apache Hadoop Yarn的安全性
CDP中配置Hadoop Yarn的安全性。
888 0
CDP中配置Apache Hadoop Yarn的安全性
【IntelliJ IDEA】idea修改文件的file is read-only
【IntelliJ IDEA】idea修改文件的file is read-only
2520 0
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
963 0
Kafka ISR机制详解!
|
机器学习/深度学习 算法
XGBoost中正则化的9个超参数
本文探讨了XGBoost中多种正则化方法及其重要性,旨在通过防止过拟合来提升模型性能。文章首先强调了XGBoost作为一种高效算法在机器学习任务中的应用价值,并指出正则化对于缓解过拟合问题的关键作用,具体包括降低模型复杂度、改善泛化能力和防止模型过度适应训练数据。随后,文章详细介绍了四种正则化方法:减少估计器数量(如使用`early_stopping_rounds`)、使用更简单的树(如调整`gamma`和`max_depth`)、采样(如设置`subsample`和`colsample`)以及收缩(如调节`learning_rate`, `lambda`和`alpha`)。
391 0
XGBoost中正则化的9个超参数
|
大数据
用wordcloud搞词云,大数据词云,自定义图像
用wordcloud搞词云,大数据词云,自定义图像
|
编解码 文字识别 计算机视觉
寒武纪1号诞生:谢赛宁Yann LeCun团队发布最强开源多模态LLM
【7月更文挑战第10天】【寒武纪1号】- 谢赛宁、Yann LeCun团队发布开源多模态LLM,含8B至34B规模模型,创新空间视觉聚合器(SVA)提升视觉-语言集成,建立新基准CV-Bench及大规模训练数据集Cambrian-7M。在多模态任务中表现出色,尤其在高分辨率图像处理上,但面临高分辨率信息处理和部分视觉任务评估的局限。[链接](https://arxiv.org/pdf/2406.16860)
414 1
|
SQL 分布式计算 算法
【Hive】数据倾斜怎么解决?
【4月更文挑战第16天】【Hive】数据倾斜怎么解决?
|
SQL 存储 分布式数据库
【Hive】Hive有索引吗?
【4月更文挑战第14天】【Hive】Hive有索引吗?
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
104667 1