MapReduce执行机制之Map和Reduce源码分析

简介: MapReduce执行机制之Map和Reduce源码分析

1、Mapper 类

 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * <p>Maps are the individual tasks which transform input records into a 
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * 
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.</p>
 * 
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * 
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * 
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *  
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote>
 *
 * <p>Applications may override the
 * {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>



将输入键/值对映射到一组中间键/值对。 
   
<p> map是将输入记录转换为a  
中间记录。 转换后的中间记录不必是  
与输入记录的类型相同。 给定的输入对可以映射为零或  
</p> . txt / /输出>  
*  
* <p> Hadoop map - reduce框架为每个映射生成一个映射任务  
* {@link InputFormat}为作业生成的{@link InputSplit}。  
* <code>Mapper</code>实现可以访问{@link Configuration}  
*该任务通过{@link JobContext#getConfiguration()}。  







**   mapper调用流程

* <p>框架首先调用  
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)},然后是  
* {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}  
*为<code>InputSplit</code>. value >中的每个键/值对。 最后  
* {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)}被调用  
*  
* <p>所有与给定输出键相关联的中间值是  
*随后被框架分组,并传递给{@link Reducer}  
*确定最终输出。 用户可以通过控制排序和分组  
</p> . *指定两个键{@link RawComparator}类  


** 1、Partioner分区

* <p> <code>Mapper</code> output are partitioned per . / <p> <code>Mapper</code> output  
* <代码>减速器> < /代码。 用户可以控制去哪个键(以及记录)  
<code>Reducer</code>通过实现一个自定义的{@link Partitioner}。  





*** 2、数据预合并Combiner

* <p>用户可以选择指定一个<code>合成器</code>,通过  
* {@link Job#setCombinerClass(Class)},执行局部聚合  
*中间输出,有助于减少传输的数据量  
从<code>Mapper</code>到<code>Reducer</code>. *  


** 3、压缩Compression
 
* <p>应用程序可以指定是否以及如何使用中间体  
*输出将被压缩,哪个{@link CompressionCodec}将被压缩  
*通过<code>Configuration</code>.</p> . Configuration  

Mapper核心调用在这里插入图片描述

Reducer 类

* Reduces a set of intermediate values which share a key to a smaller set of
 * values.  
 * 
 * <p><code>Reducer</code> implementations 
 * can access the {@link Configuration} for the job via the 
 * {@link JobContext#getConfiguration()} method.</p>

 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *   
 *   <b id="Shuffle">Shuffle</b>
 *   
 *   <p>The <code>Reducer</code> copies the sorted output from each 
 *   {@link Mapper} using HTTP across the network.</p>
 *   </li>
 *   
 *   <li>
 *   <b id="Sort">Sort</b>
 *   
 *   <p>The framework merge sorts <code>Reducer</code> inputs by 
 *   <code>key</code>s 
 *   (since different <code>Mapper</code>s may have output the same key).</p>
 *   
 *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
 *   being fetched they are merged.</p>
 *      
 *   <b id="SecondarySort">SecondarySort</b>
 *   
 *   <p>To achieve a secondary sort on the values returned by the value 
 *   iterator, the application should extend the key with the secondary
 *   key and define a grouping comparator. The keys will be sorted using the
 *   entire key, but will be grouped using the grouping comparator to decide
 *   which keys and values are sent in the same call to reduce.The grouping 
 *   comparator is specified via 
 *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
 *   controlled by 
 *   {@link Job#setSortComparatorClass(Class)}.</p>


*将一组共享密钥的中间值减少到更小的一组  
*值。  
*  
* < p > <代码>减速器> < /代码的实现  
. *可以访问任务的{@link配置}  
* {@link JobContext#getConfiguration()}方法  
 
<p><code>Reducer</code>有3个主要阶段:</p>  
* < ol >  
李* < >  
*  
改组* < b id = "洗牌" > < / b >  
*  
* <p> <code>Reducer</code>从每个  
* {@link Mapper}使用HTTP跨网络  
李* < / >  
*  
李* < >  
* < b id = "排序" > < / b >排序  
*  
* <p>框架合并排序<代码>Reducer</代码>输入  
* <代码>关键代码> < / s  
*(因为不同的<code>Mapper</code>s可能输出相同的键)  
*  
* <p> shuffle和sort阶段同时发生,即当输出是  
</p> . txt > </p> . txt  
*  
* < b id = " SecondarySort " > SecondarySort < / b >  
*  
* <p>对返回的值进行二级排序  
*迭代器时,应用程序应该使用secondary扩展键  
键并定义一个分组比较器。 索引键将被排序  
*整个键,但将使用分组比较器分组决定  
在同一个reduce调用中发送哪些键和值。 分组  
*比较器通过  
* {@link工作# setGroupingComparatorClass(类)}。 排序顺序是  
*控制  
* {@link工作# setSortComparatorClass(类)}。< / p >  

Reducer核心调用
在这里插入图片描述

大致处理流程
在这里插入图片描述

提交任务执行流程
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

注意:上节 我们讲述了 任务对提交流程

[Mapreduce执行机制之提交任务和切片原理
](https://blog.csdn.net/qq_44787816/article/details/121250334)

这次看看Map和Reduce执行流程

我们的Driver 类

/**
 *  wordCount driver
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job实例
        Job job = Job.getInstance();

        // 设置 驱动类
        job.setJarByClass(WordCountDriver.class);

        // 关联map和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 设置预聚合combiner
        job.setCombinerClass(WordCountCombiner.class);
        System.out.println();

        // 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 设置分区类,自定义按条件将数据输出分区
        job.setPartitionerClass(CustomPartitioner.class);
        // 设置ReduceTask数量
        job.setNumReduceTasks(4);

        // 设置总输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 设置文件的输出路径和输出路径
        FileInputFormat.setInputPaths(job,"G:\\input\\wordCountInput");
        FileOutputFormat.setOutputPath(job,new Path("G:\\output\\wordOutput11"));

        // 提交job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

partitioner 自定义分区类

在这里插入图片描述
Combiner 预结合类

在这里插入图片描述

Mapper 类

在这里插入图片描述

reducer 类

在这里插入图片描述

如何执行?

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

走到自定义的Mapper
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

重点: 从这里我们可以看到,partitioner的执行时机是在MapTask中context.write()之后 ,将数据输出到collector环形缓存区之前,确定数据的分区,reduceTask之前

现在进入环形缓冲区collector

 public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      reporter.progress();

      **  校验keyClass是否与定义的一致
      if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
      }

      **  校验ValueClass是否与定义的一致
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
      }

      **  判断partitioner分区的合法性
      if (partition < 0 || partition >= partitions) {
        throw new IOException("Illegal partition for " + key + " (" +
            partition + ")");
      }

      ** 校验是否出现spill溢写异常
      checkSpillException();
      bufferRemaining -= METASIZE;
      
      ** 判断环形缓冲区, 缓存是否80M已经用完,如果用完,开始溢写
      if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        **  使用可重入锁ReentrantLock 锁住这段代码
        // reached   
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {

            ** 释放锁
          spillLock.unlock();
        }
      }

      try {
        
        ** 序列化key
        // serialize key bytes into buffer
        int keystart = bufindex;
        keySerializer.serialize(key);
        if (bufindex < keystart) {
          // wrapped the key; must make contiguous
          bb.shiftBufferedKey();
          keystart = 0;
        }
        ** 序列化value
        // serialize value bytes into buffer
        final int valstart = bufindex;
        valSerializer.serialize(value);
        // It's possible for records to have zero length, i.e. the serializer
        // will perform no writes. To ensure that the boundary conditions are
        // checked and that the kvindex invariant is maintained, perform a
        // zero-length write into the buffer. The logic monitoring this could be
        // moved into collect, but this is cleaner and inexpensive. For now, it
        // is acceptable.
        
        ** 将数据写入缓冲区
        bb.write(b0, 0, 0);

        // the record must be marked after the preceding write, as the metadata
        // for this record are not yet written
        int valend = bb.markRecord();

        mapOutputRecordCounter.increment(1);
        mapOutputByteCounter.increment(
            distanceTo(keystart, valend, bufvoid));

        // write accounting info
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
        // advance kvindex
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
      } catch (MapBufferTooSmallException e) {
        LOG.info("Record too large for in-memory buffer: " + e.getMessage());
        spillSingleRecord(key, value, partition);
        mapOutputRecordCounter.increment(1);
        return;
      }
    }

1、bufferRemaining 默认80M,环形缓冲区,溢写阈值
在这里插入图片描述

mapTask循环将数据写入到环形缓冲区之后,自定义map走完,关闭环形缓冲区

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      final long size = distanceTo(bufstart, bufend, bufvoid) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      FSDataOutputStream partitionOut = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        final int mstart = kvend / NMETA;
        final int mend = 1 + // kvend is a valid record
          (kvstart >= kvend
          ? kvstart
          : kvmeta.capacity() + kvstart) / NMETA;
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        int spindex = mstart;
        final IndexRecord rec = new IndexRecord();
        final InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
            writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                      spilledRecordsCounter);

            ** 判断是否定义了Combiner,我们自己定义了
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                final int kvoff = offsetFor(spindex % maxRec);
                int keystart = kvmeta.get(kvoff + KEYSTART);
                int valstart = kvmeta.get(kvoff + VALSTART);
                key.reset(kvbuffer, keystart, valstart - keystart);
                getVBytesForOffset(kvoff, value);
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec)
                            + PARTITION) == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                
                **  走自定义的combiner
                combinerRunner.combine(kvIter, combineCollector);
              }
            }

            // close the writer
            writer.close();
            if (partitionOut != out) {
              partitionOut.close();
              partitionOut = null;
            }

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }

        if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
        if (partitionOut != null) {
          partitionOut.close();
        }
      }
    }

走到我们自定义的Combiner

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

重点:我们从这里可以看到combiner的执行时机: 环形缓冲区的数据 排序溢写 到 本地临时文件之前,注意:此时数据已经按Key排序好了,数据溢写到本地文件之前,对数据进行一个提前预聚合combiner。

在这里插入图片描述

注意:如果使用了combiner,就不要使用自定义的Reducer不然会导致最后没有数据

在这里插入图片描述

接下来走到我们自定义Reducer

在这里插入图片描述
initialize
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

第一阶段 COPY
在这里插入图片描述
在这里插入图片描述

第二阶段sort
在这里插入图片描述

第三阶段调用reduce
在这里插入图片描述
JobRunner就是我们之前确定的clientProtocol,它分别调用mapTask的run(),和ReduceTask的Run()
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

最终输出的结果
在这里插入图片描述
我们定义了4个分区,一个分区一个ReduceTask处理数据,而一个reduceTask会输出到一个文件,因此4个文件

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在看看我们的输入文件,统计单词个数
在这里插入图片描述

整个流程如下:

请添加图片描述
请添加图片描述

相关文章
|
5月前
|
分布式计算 算法 数据库
32 MAPREDUCE的map端join算法实现
32 MAPREDUCE的map端join算法实现
19 0
|
3月前
|
分布式计算
MapReduce中的Map和Reduce函数分别是什么作用?
MapReduce中的Map和Reduce函数分别是什么作用?
42 0
|
5月前
|
存储 SQL 分布式计算
31 MAPREDUCE的reduce端join算法实现
31 MAPREDUCE的reduce端join算法实现
19 0
|
5月前
|
缓存 分布式计算
25 MAPREDUCE的shuffle机制
25 MAPREDUCE的shuffle机制
19 0
|
11月前
|
分布式计算 Hadoop
Hadoop框架下MapReduce中的map个数如何控制
Hadoop框架下MapReduce中的map个数如何控制
85 0
|
分布式计算 Hadoop
Hadoop学习:MapReduce不使用Reduce将表合并提高效率
Hadoop学习:MapReduce不使用Reduce将表合并提高效率
113 0
|
分布式计算 Java
Mapreduce执行机制之提交任务和切片原理
Mapreduce执行机制之提交任务和切片原理
|
存储 分布式计算 Hadoop
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
|
存储 XML 缓存
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
|
分布式计算 资源调度 Hadoop
二十四、MapReduce工作机制
二十四、MapReduce工作机制
二十四、MapReduce工作机制