MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

简介:         我们知道,MapReduce有三层调度模型,即Job——>Task——>TaskAttempt,并且:        1、通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业、JobSetup Task等复杂的情况这里不做考虑);        2、每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次。

        我们知道,MapReduce有三层调度模型,即Job——>Task——>TaskAttempt,并且:

        1、通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业、JobSetup Task等复杂的情况这里不做考虑);

        2、每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次。

        而TaskImpl中存在一个成员变量attempts,用来存储Task所包含TaskAttempt中TaskAttemptId与TaskAttempt的映射关系,定义及初始化如下:

    private Map<TaskAttemptId, TaskAttempt> attempts;
    this.attempts = Collections.emptyMap();

        也就是说,attempts一开始被初始化为Collections.emptyMap(),我们看下其实现:

    @SuppressWarnings("unchecked")
    public static final <K,V> Map<K,V> emptyMap() {
        return (Map<K,V>) EMPTY_MAP;
    }
    @SuppressWarnings("unchecked")
    public static final Map EMPTY_MAP = new EmptyMap<>();
    /**
     * @serial include
     */
    private static class EmptyMap<K,V>
        extends AbstractMap<K,V>
        implements Serializable
    {
        private static final long serialVersionUID = 6428348081105594320L;

        public int size()                          {return 0;}
        public boolean isEmpty()                   {return true;}
        public boolean containsKey(Object key)     {return false;}
        public boolean containsValue(Object value) {return false;}
        public V get(Object key)                   {return null;}
        public Set<K> keySet()                     {return emptySet();}
        public Collection<V> values()              {return emptySet();}
        public Set<Map.Entry<K,V>> entrySet()      {return emptySet();}

        public boolean equals(Object o) {
            return (o instanceof Map) && ((Map<?,?>)o).isEmpty();
        }

        public int hashCode()                      {return 0;}

        // Preserves singleton property
        private Object readResolve() {
            return EMPTY_MAP;
        }
    }
        可以看出,EmptyMap就是一个空的Map,大小为0,isEmpty为true,containsKey和containsValue等针对任何key或value均为false。

        而在生成TaskAttempt后将其添加至attempts的逻辑如下:

    // 将创建的任务运行尝试TaskAttemptImpl实例attempt与其ID的对应关系添加到TaskImpl的任务运行尝试集合attempts中,
    // attempts先被初始化为Collections.emptyMap()
    // this.attempts = Collections.emptyMap();
    switch (attempts.size()) {
      case 0:
    	  
    	// 如果attempts大小为0,即为Collections.emptyMap(),则将其更换为Collections.singletonMap(),并加入该TaskAttemptImpl实例attempt
        attempts = Collections.singletonMap(attempt.getID(),
            (TaskAttempt) attempt);
        break;
        
      case 1:
    	  
    	// 如果attempts大小为1,即为Collections.singletonMap(),则将其替换为LinkedHashMap,并加入之前和现在的TaskAttemptImpl实例attempt
        Map<TaskAttemptId, TaskAttempt> newAttempts
            = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
        newAttempts.putAll(attempts);
        attempts = newAttempts;
        attempts.put(attempt.getID(), attempt);
        break;

      default:
    	// 如果attempts大小大于1,说明其实一个LinkedHashMap,直接put吧
        attempts.put(attempt.getID(), attempt);
        break;
    }
        当Task第一次生成TaskAttempt,并将其加入attempts时,attempts为Collections.emptyMap(),其大小肯定为0,此时将TaskAttempt加入attempts时,会将attempts转换成Collections.singletonMap,即只含有一个Key-Value对的Map。而Collections.singletonMap定义如下:

    public static <K,V> Map<K,V> singletonMap(K key, V value) {
        return new SingletonMap<>(key, value);
    }
    private static class SingletonMap<K,V>
          extends AbstractMap<K,V>
          implements Serializable {
        private static final long serialVersionUID = -6979724477215052911L;

        private final K k;
        private final V v;

        SingletonMap(K key, V value) {
            k = key;
            v = value;
        }

        public int size()                          {return 1;}

        public boolean isEmpty()                   {return false;}

        public boolean containsKey(Object key)     {return eq(key, k);}

        public boolean containsValue(Object value) {return eq(value, v);}

        public V get(Object key)                   {return (eq(key, k) ? v : null);}

        private transient Set<K> keySet = null;
        private transient Set<Map.Entry<K,V>> entrySet = null;
        private transient Collection<V> values = null;

        public Set<K> keySet() {
            if (keySet==null)
                keySet = singleton(k);
            return keySet;
        }

        public Set<Map.Entry<K,V>> entrySet() {
            if (entrySet==null)
                entrySet = Collections.<Map.Entry<K,V>>singleton(
                    new SimpleImmutableEntry<>(k, v));
            return entrySet;
        }

        public Collection<V> values() {
            if (values==null)
                values = singleton(v);
            return values;
        }

    }
         

由此可以看出,SingletonMap是只包含一对Key-Value的Map,其size大小固定为1,containsKey和containsValue返回入参key、value是否与SingletonMap内部的k、v相等,get会根据入参是否为k,来确定返回v还是null,等等。

        而当attempts大小为1,即为Collections.singletonMap时,再添加TaskAttempt的话,就需要将attempts更换为LinkedHashMap,将之前的和新添加的TaskAttempt加入,此后,如果再有TaskAttempt要加入的话,直接put即可。LinkedHashMap初始化时,其容量已被确定,为maxAttempts,这个maxAttempts取自方法getMaxAttempts(),它在TaskImpl中是一个抽象方法,由其两个子类MapTaskImpl、ReduceTaskImpl分别实现,如下:

        TaskImpl.java

  // No override of this method may require that the subclass be initialized.
  protected abstract int getMaxAttempts();
        MapTaskImpl.java

  @Override
  protected int getMaxAttempts() {
    return conf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4);
  }
        ReduceTaskImpl.java

  @Override
  protected int getMaxAttempts() {
    return conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
  }
        可见,Map和Reduce任务的TaskAttempt都有一个限制,分别取自参数mapreduce.map.maxattempts、mapreduce.reduce.maxattempts,参数未配置的话,均默认为4。既然有了TaskAttempt个数的上限,那么我们初始化LinkedHashMap指定容量即可,其构造如下:

    /**
     * Constructs an empty insertion-ordered <tt>LinkedHashMap</tt> instance
     * with the specified initial capacity and a default load factor (0.75).
     *
     * @param  initialCapacity the initial capacity
     * @throws IllegalArgumentException if the initial capacity is negative
     */
    public LinkedHashMap(int initialCapacity) {
        super(initialCapacity);
        accessOrder = false;
    }
        调用父类HashMap的构造函数,如下:

    /**
     * Constructs an empty <tt>HashMap</tt> with the specified initial
     * capacity and the default load factor (0.75).
     *
     * @param  initialCapacity the initial capacity.
     * @throws IllegalArgumentException if the initial capacity is negative.
     */
    public HashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR);
    }
         确定其初始容量为指定的initialCapacity。


        思考:

        MapReduce为什么要这么设计呢?我想了想,大体有关于业务逻辑和性能等方面的两个原因:

        1、Task的调度执行是有顺序的,而Task的抽象类TaskImpl的实现类,无论是MapTaskImpl,还是ReduceTaskImpl的构造,都是必须先进行的,这样就有一个问题,如果attempts上来就被构造为指定大小的LinkedHashMap,势必会造成空间的浪费,还有性能的消耗,况且,作业执行成功与否,还是后话,而如果我们初始化为Collections.emptyMap(),则很容易解决上面两个问题;

        2、按照常理来说,理想情况下,每个Task应该有且只有一个TaskAttempt,只有当任务运行失败后重试,或开启推测执行机制后为有效加快拖后腿任务的执行而开启的备份任务等情况时,才会存在多个TaskAttempt,而在第一个TaskAttempt被构造时,将attempts由Collections.emptyMap()升级为Collections.singletonMap(),无论是在空间利用、性能上,还是业务逻辑上,都比较贴合实际情况;

        3、再需要重试任务或开启备份任务时,才将attempts由Collections.singletonMap()升级为指定容量的LinkedHashMap,里面有延迟加载的理念;

        4、占用资源越少,性能越高,对于其他作业或任务来说,是一种福音,能够整体提高集群的资源利用效率。

        上述性能和业务逻辑方面的考虑,您或许不以为然,可能觉得性能提升不大,但是如果在大规模集群中,当作业数量庞大、任务数目数量庞大时,这种优势就愈发明显,而它带来的好处,于已,于别的作业来说,都会是一种福音!这种设计上的细节,值得我们学习、借鉴与反思!

        









相关文章
|
4月前
|
存储 Java
map中存储的是引用,而不是对象本身
该内容是关于Java编程中验证Map存储引用而非复制对象的示例。创建大型List导致内存增加,说明List确实占用空间。通过Person类示例,将不同对象放入Map,改变一个对象的属性后,比较原对象与Map中的键值对,发现两者相等,证明Map保存的是对象引用。
59 5
|
10月前
源码分析系列教程(12) - 手写Map框架(基于JDK1.7)
源码分析系列教程(12) - 手写Map框架(基于JDK1.7)
31 0
|
10月前
|
索引
源码分析系列教程(11) - 手写Map框架(基于LinkedList)
源码分析系列教程(11) - 手写Map框架(基于LinkedList)
27 0
|
10月前
|
分布式计算 算法 数据库
32 MAPREDUCE的map端join算法实现
32 MAPREDUCE的map端join算法实现
31 0
|
1月前
|
存储 分布式计算 算法
"揭秘!MapReduce如何玩转压缩文件,让大数据处理秒变‘瘦身达人’,效率飙升,存储不再是烦恼!"
【8月更文挑战第17天】MapReduce作为Hadoop的核心组件,在处理大规模数据集时展现出卓越效能。通过压缩技术减少I/O操作和网络传输的数据量,不仅提升数据处理速度,还节省存储空间。支持Gzip等多种压缩算法,可根据需求选择。示例代码展示了如何配置Map输出压缩,并使用GzipCodec进行压缩。尽管压缩带来CPU负担,但在多数情况下收益大于成本,特别是Hadoop能够自动处理压缩文件,简化开发流程。
29 0
|
3月前
|
存储 分布式计算 DataWorks
MaxCompute产品使用合集之要存储用户的下单所有产品,然后查询时要进行产品分组的,一般这种字段要使用ARRAY还是MAP
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3月前
|
存储 Java API
深入剖析Java Map:不只是存储数据,更是设计艺术的体现!
【6月更文挑战第18天】Java Map是键值对数据结构的艺术,展示了设计效率与易用性的平衡。HashMap利用哈希表实现快速访问,TreeMap通过红黑树保证排序。选择合适的实现类如HashMap、TreeMap或LinkedHashMap至关重要。注意空指针异常,谨慎在遍历时修改Map。Map的高效使用能提升编程效果。
23 0
|
4月前
|
存储 索引
Map存储两个key:Duplicate key 6
Map存储两个key:Duplicate key 6
163 0
|
4月前
|
分布式计算
MapReduce中的Map和Reduce函数分别是什么作用?
MapReduce中的Map和Reduce函数分别是什么作用?
144 0
|
存储 关系型数据库 Java
Mybatis plus 存储 List、Map
Mybatis plus 存储 List、Map、自定义类型
864 0