reduce task个数到底和哪些因素有关

简介: reduce task个数到底和哪些因素有关

1、我们知道map的数量和文件数、文件大小、块大小、以及split大小有关,而reduce的数量跟哪些因素有关呢?

设置mapred.tasktracker.reduce.tasks.maximum的大小可以决定单个tasktracker一次性启动reduce的数目,但是不能决定总的reduce数目。

conf.setNumReduceTasks(4);JobConf对象的这个方法可以用来设定总的reduce的数目,看下Job Counters的统计:

Job Counters 
        Data-local map tasks=2
        Total time spent by all maps waiting after reserving slots (ms)=0
        Total time spent by all reduces waiting after reserving slots (ms)=0
        SLOTS_MILLIS_MAPS=10695
        SLOTS_MILLIS_REDUCES=29502
        Launched map tasks=2
        Launched reduce tasks=4

确实启动了4个reduce:看下输出:

diegoball@diegoball:~/IdeaProjects/test/build/classes$ hadoop fs -ls  /user/diegoball/join_ou1123
11/03/25 15:28:45 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
11/03/25 15:28:45 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/_SUCCESS
-rw-r--r--   1 diegoball supergroup        124 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00000
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00001
-rw-r--r--   1 diegoball supergroup        214 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00003


只有2个reduce在干活。为什么呢?

shuffle的过程,需要根据key的值决定将这条<K,V> (map的输出),送到哪一个reduce中去。送到哪一个reduce中去靠调用默认的org.apache.hadoop.mapred.lib.HashPartitioner的getPartition()方法来实现。

HashPartitioner类:

package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.JobConf;
/** Partition keys by their {@link Object#hashCode()}. 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  public void configure(JobConf job) {}
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}


numReduceTasks的值在JobConf中可以设置。默认的是1:显然太小。

这也是为什么默认的设置中总启动一个reduce的原因。

返回与运算的结果和numReduceTasks求余。

Mapreduce根据这个返回结果决定将这条<K,V>,送到哪一个reduce中去。

key传入的是LongWritable类型,看下这个LongWritable类的hashcode()方法:

public int hashCode() {
    return (int)value;
  }


简简单单的返回了原值的整型值。

因为getPartition(K2 key, V2 value,int numReduceTask)返回的结果只有2个不同的值,所以最终只有2个reduce在干活。

HashPartitioner是默认的partition类,我们也可以自定义partition类 :

package com.alipay.dw.test;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
/**
 * Created by IntelliJ IDEA.
 * User: diegoball
 * Date: 11-3-10
 * Time: 下午5:26
 * To change this template use File | Settings | File Templates.
 */
public class MyPartitioner implements Partitioner<IntWritable, IntWritable> {
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences > 20051210)
            return 0;
        else
            return 1;
    }
    public void configure(JobConf arg0) {
    }
}


仅仅需要覆盖getPartition()方法就OK。通过:

conf.setPartitionerClass(MyPartitioner.class);

可以设置自定义的partition类。

同样由于之返回2个不同的值0,1,不管conf.setNumReduceTasks(4);设置多少个reduce,也同样只会有2个reduce在干活。

由于每个reduce的输出key都是经过排序的,上述自定义的Partitioner还可以达到排序结果集的目的:

11/03/25 15:24:49 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/_SUCCESS
-rw-r--r--   1 diegoball supergroup      24546 2011-03-25 15:23 /user/diegoball/opt.del/part-00000
-rw-r--r--   1 diegoball supergroup      10241 2011-03-25 15:23 /user/diegoball/opt.del/part-00001
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00002
-rw-r--r--   1 diegoball supergroup          0 2011-03-25 15:23 /user/diegoball/opt.del/part-00003


part-00000和part-00001是这2个reduce的输出,由于使用了自定义的MyPartitioner,所有key小于20051210的的<K,V>都会放到第一个reduce中处理,key大于20051210就会被放到第二个reduce中处理。

每个reduce的输出key又是经过key排序的,所以最终的结果集降序排列。

但是如果使用上面自定义的partition类,又conf.setNumReduceTasks(1)的话,会怎样? 看下Job Counters:

Job Counters 
        Data-local map tasks=2
        Total time spent by all maps waiting after reserving slots (ms)=0
        Total time spent by all reduces waiting after reserving slots (ms)=0
        SLOTS_MILLIS_MAPS=16395
        SLOTS_MILLIS_REDUCES=3512
        Launched map tasks=2
        Launched reduce tasks=1


只启动了一个reduce。

(1)、 当setNumReduceTasks( int a) a=1(即默认值),不管Partitioner返回不同值的个数b为多少,只启动1个reduce,这种情况下自定义的Partitioner类没有起到任何作用。

(2)、 若a!=1:

a、当setNumReduceTasks( int a)里 a设置小于Partitioner返回不同值的个数b的话:

public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences < 20051210)
            return 0;
        if (nbOccurences >= 20051210 && nbOccurences < 20061210)
            return 1;
        if (nbOccurences >= 20061210 && nbOccurences < 20081210)
            return 2;
        else
            return 3;
    }


同时设置setNumReduceTasks( 2)。

于是抛出异常:

11/03/25 17:03:41 INFO mapreduce.Job: Task Id : attempt_201103241018_0023_m_000000_1, Status : FAILED
java.io.IOException: Illegal partition for 20110116 (3)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:900)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:508)
    at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:397)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
    at org.apache.hadoop.mapred.Child.main(Child.java:211)


某些key没有找到所对应的reduce去处。原因是只启动了a个reduce。

b、当setNumReduceTasks( int a)里 a设置大于Partitioner返回不同值的个数b的话,同样会启动a个reduce,但是只有b个redurce上会得到数据。启动的其他的a-b个reduce浪费了。

c、理想状况是a=b,这样可以合理利用资源,负载更均衡。

目录
相关文章
|
算法 数据处理 计算机视觉
【MATLAB 】 MODWT 信号分解+希尔伯特黄变换+边际谱算法
【MATLAB 】 MODWT 信号分解+希尔伯特黄变换+边际谱算法
537 0
|
存储 分布式计算 DataWorks
DataWorks报错问题之报错0420095如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
存储 NoSQL 关系型数据库
Redis(六)set集合类型
set集合和list列表十分的相似,都可以存储多个字符串。但是list列表可以存储重复值,而set集合中不可重复。
7580 0
Redis(六)set集合类型
|
存储 资源调度 分布式计算
LC3|视角 开源大数据生态下的高性能分布式文件系统
## 背景介绍 盘古是阿里云自主研发的高可靠、高可用、高性能分布式文件系统,距今已经有将近10年的历史。盘古1.0作为阿里云的统一存储核心,稳定高效的支撑着阿里云ECS、NAS、OSS、OTS、ODPS、ADS等多条业务线的迅猛发展。但最近几年,基于如下两方面的原因,盘古还是重新设计实现了第二代存储引擎盘古2.0,其不只为阿里云,也为集团、蚂蚁金服的多种业务提供了更佳优异的存储服务 #
3425 0
|
存储 分布式计算 负载均衡
HadoopHDFS的特点可扩展性
【5月更文挑战第11天】HadoopHDFS的特点可扩展性
379 1
|
缓存 JavaScript
Vue 中的 computed 和 watch 的区别
Vue 中的 computed 和 watch 的区别
|
安全 API UED
【支付宝推荐】企业转账如何又快又省?试试“商家转账”吧!
企业面对日益增长的转账需求,财务操作繁琐、效率低下。但支付宝的“商家转账”服务为企业提供了数字化资金通道,实现0费率、批量处理、实时到账。适用于零工薪酬、佣金、营销激励等多种场景,已覆盖灵活用工、物流、出行、家政服务等多行业。该服务提供无需开发的批量转账产品和API接口产品,支持定制化行业解决方案。如需接入,可点击链接留下信息以获取联系。
【支付宝推荐】企业转账如何又快又省?试试“商家转账”吧!
|
安全
工信部ICP备案查询指南
【10月更文挑战第12天】工信部ICP备案查询指南
4279 0
|
机器学习/深度学习 自然语言处理 算法
深度学习算法简介(一)
深度学习算法简介(一)
792 0
MapTask 、ReduceTask 数量的决定因素
MapTask 、ReduceTask 数量的决定因素
653 0

热门文章

最新文章