Mapreduce执行机制之提交任务和切片原理

简介: Mapreduce执行机制之提交任务和切片原理

1、Mapper 类

 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * <p>Maps are the individual tasks which transform input records into a 
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * 
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.</p>
 * 
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * 
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * 
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *  
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote>
 *
 * <p>Applications may override the
 * {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>



将输入键/值对映射到一组中间键/值对。 
   
<p> map是将输入记录转换为a  
中间记录。 转换后的中间记录不必是  
与输入记录的类型相同。 给定的输入对可以映射为零或  
</p> . txt / /输出>  
*  
* <p> Hadoop map - reduce框架为每个映射生成一个映射任务  
* {@link InputFormat}为作业生成的{@link InputSplit}。  
* <code>Mapper</code>实现可以访问{@link Configuration}  
*该任务通过{@link JobContext#getConfiguration()}。  







**   mapper调用流程

* <p>框架首先调用  
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)},然后是  
* {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}  
*为<code>InputSplit</code>. value >中的每个键/值对。 最后  
* {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)}被调用  
*  
* <p>所有与给定输出键相关联的中间值是  
*随后被框架分组,并传递给{@link Reducer}  
*确定最终输出。 用户可以通过控制排序和分组  
</p> . *指定两个键{@link RawComparator}类  


** 1、Partioner分区

* <p> <code>Mapper</code> output are partitioned per . / <p> <code>Mapper</code> output  
* <代码>减速器> < /代码。 用户可以控制去哪个键(以及记录)  
<code>Reducer</code>通过实现一个自定义的{@link Partitioner}。  





*** 2、数据预合并Combiner

* <p>用户可以选择指定一个<code>合成器</code>,通过  
* {@link Job#setCombinerClass(Class)},执行局部聚合  
*中间输出,有助于减少传输的数据量  
从<code>Mapper</code>到<code>Reducer</code>. *  


** 3、压缩Compression
 
* <p>应用程序可以指定是否以及如何使用中间体  
*输出将被压缩,哪个{@link CompressionCodec}将被压缩  
*通过<code>Configuration</code>.</p> . Configuration  

Mapper核心调用在这里插入图片描述

Reducer 类

* Reduces a set of intermediate values which share a key to a smaller set of
 * values.  
 * 
 * <p><code>Reducer</code> implementations 
 * can access the {@link Configuration} for the job via the 
 * {@link JobContext#getConfiguration()} method.</p>

 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *   
 *   <b id="Shuffle">Shuffle</b>
 *   
 *   <p>The <code>Reducer</code> copies the sorted output from each 
 *   {@link Mapper} using HTTP across the network.</p>
 *   </li>
 *   
 *   <li>
 *   <b id="Sort">Sort</b>
 *   
 *   <p>The framework merge sorts <code>Reducer</code> inputs by 
 *   <code>key</code>s 
 *   (since different <code>Mapper</code>s may have output the same key).</p>
 *   
 *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
 *   being fetched they are merged.</p>
 *      
 *   <b id="SecondarySort">SecondarySort</b>
 *   
 *   <p>To achieve a secondary sort on the values returned by the value 
 *   iterator, the application should extend the key with the secondary
 *   key and define a grouping comparator. The keys will be sorted using the
 *   entire key, but will be grouped using the grouping comparator to decide
 *   which keys and values are sent in the same call to reduce.The grouping 
 *   comparator is specified via 
 *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
 *   controlled by 
 *   {@link Job#setSortComparatorClass(Class)}.</p>


*将一组共享密钥的中间值减少到更小的一组  
*值。  
*  
* < p > <代码>减速器> < /代码的实现  
. *可以访问任务的{@link配置}  
* {@link JobContext#getConfiguration()}方法  
 
<p><code>Reducer</code>有3个主要阶段:</p>  
* < ol >  
李* < >  
*  
改组* < b id = "洗牌" > < / b >  
*  
* <p> <code>Reducer</code>从每个  
* {@link Mapper}使用HTTP跨网络  
李* < / >  
*  
李* < >  
* < b id = "排序" > < / b >排序  
*  
* <p>框架合并排序<代码>Reducer</代码>输入  
* <代码>关键代码> < / s  
*(因为不同的<code>Mapper</code>s可能输出相同的键)  
*  
* <p> shuffle和sort阶段同时发生,即当输出是  
</p> . txt > </p> . txt  
*  
* < b id = " SecondarySort " > SecondarySort < / b >  
*  
* <p>对返回的值进行二级排序  
*迭代器时,应用程序应该使用secondary扩展键  
键并定义一个分组比较器。 索引键将被排序  
*整个键,但将使用分组比较器分组决定  
在同一个reduce调用中发送哪些键和值。 分组  
*比较器通过  
* {@link工作# setGroupingComparatorClass(类)}。 排序顺序是  
*控制  
* {@link工作# setSortComparatorClass(类)}。< / p >  

Reducer核心调用
在这里插入图片描述

大致处理流程
在这里插入图片描述

提交任务执行流程
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
通过比较来确定ClientProtocol
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

以下截图为 submitJobInternal内容

1、上传jar到集群的临时目录
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
将这些jar,文件,achieve上传到上面的临时路径
在这里插入图片描述
在这里插入图片描述

我这里呢,没有jar,achieve所以这里是空的

2、根据job对文件进行逻辑分片

在这里插入图片描述
在这里插入图片描述

重点1:使用 哪种 InputFormat 进行数据读取

在这里插入图片描述
在这里插入图片描述

public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    
    ** 确定minSize = 1 , maxSize=long最大值
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);

     。。。

    ** 遍历输入路径的所有文件
    for (FileStatus file: files) {
    
      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        
        
        ** 判断文件是否可分割
        if (isSplitable(job, path)) {
          ** 获取默认blockSize = 32M,集群默认128M,集群可配
          long blockSize = file.getBlockSize();
          ** 通过blockSize,minSize,maxSize计算分片大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          ** 文件字节数
          long bytesRemaining = length;
          **  文件字节数/32M > 1.1 , 继续分片
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
             ** 确定分片,确定分片末尾索引
               **  例如 40M-32M=8M, 获取32M的索引,拿出这一部分作为一个分片文件
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }

    ** 返回分片数量
    return splits;
  }

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
任务提交之后,会用一个YarnRunner或者LocalRunner 运行任务,调用map.run,让mapTask执行

在这里插入图片描述
在这里插入图片描述
而jobRunner不就是我们之前确定的ClientProtocal嘛嘛嘛嘛嘛嘛

在这里插入图片描述

相关文章
|
5月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
71 1
|
5月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
205 0
|
14天前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
21 1
|
14天前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
34 1
|
4月前
|
数据采集 SQL 分布式计算
|
5月前
|
SQL 分布式计算 数据处理
【Hive】所有的Hive任务都会有MapReduce的执行吗?
【4月更文挑战第17天】【Hive】所有的Hive任务都会有MapReduce的执行吗?
|
5月前
|
存储 分布式计算 关系型数据库
bigdata-08-MapReduce原理到实战
bigdata-08-MapReduce原理到实战
70 0
|
5月前
|
分布式计算 并行计算 数据处理
什么是MapReduce?请简要解释其工作原理。
什么是MapReduce?请简要解释其工作原理。
86 0
|
11月前
|
机器学习/深度学习 分布式计算 大数据
大数据 - MapReduce:从原理到实战的全面指南
大数据 - MapReduce:从原理到实战的全面指南
1209 0
|
11月前
|
分布式计算 Java Hadoop
70 Azkaban MAPREDUCE任务
70 Azkaban MAPREDUCE任务
36 0