Hadoop旧mapreduce的map任务切分原理

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/51395043 前言 最近在工作过程中接触一些Hive数据仓库中的表,这些表实际是从关系型数据库通过Sqoop抽到Hive的。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/51395043

前言

最近在工作过程中接触一些Hive数据仓库中的表,这些表实际是从关系型数据库通过Sqoop抽到Hive的。在开发过程中对map任务的划分进行性能调优,发现mapreduce中关于FileInputFormat的参数调整都不起作用,最后发现这些老任务都是用旧版的mapreduce开发的,于是顺便研究下旧版mapreduce的任务划分策略。有关新版mapreduce的任务划分策略,大家可以参考我之前的博文《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》。

源码分析

根据《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的内容,我们知道map任务的划分关键在于FileInputFormat的getSplits方法的实现策略,现在我们来看看其源码:

  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    Stopwatch sw = new Stopwatch().start();
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }

  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

这里对以上代码的划分策略进行整理:

  1. 遍历当前作业的所有输入文件,然后将累积这些文件的字节数并保存到变量totalSize中;
  2. 如果用户指定了mapreduce.job.maps参数,那么这个参数会被保存在入参numSplits中;
  3. 用户想要通过numSplits控制map任务的数量,那么需求对totalSize进行平分,以便确定每个map任务划分的输入大小。这个计算很简单,即使用totalSize除以numSplits,最后得到的目标划分大小存储在变量goalSize中;
  4. 常量SPLIT_MINSIZE实际是由参数mapreduce.input.fileinputformat.split.minsize来控制的,如果没有配置则默认是1。minSplitSize默认是1,切旧版FileIntputFormat没有设置此变量的地方。最后取SPLIT_MINSIZE和minSplitSize的最大值,并保存在变量minSize中;
  5. 遍历当前作业的每个输入文件,计算每个输入文件,将被划分的任务数量,最后将每个文件划分的任务数量合并起来就是整个作业划分的任务数量。
以上只是总体分析了作业的任务划分,有关每个输入文件的任务数量划分步骤如下:
  1. 判断文件的大小,只有文件字节数大于0才是有意义的;
  2. 判断文件是否是可以切分的,只有能够切分的文件才会继续进行任务数量划分;
  3. 调用文件的getBlockSize方法,获取文件的块大小并存储在变量blockSize中;
  4. 调用computeSplitSize方法计算最后划分给每个任务的输入大小,并保存在splitSize中。计算公式为:splitSize = max(minSize, min(goalSize, blockSize));
  5. 将文件按照splitSize的大小进行划分,不足splitSize大小的也算作一个任务划分数。

总结

根据以上分析发现旧版mapreduce和新版mapreduce的FileIntputFormat关于map任务数量划分的实现逻辑不同,在对它们进行开发和性能优化时要特别注意。



后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。


京东:http://item.jd.com/11846120.html 

当当:http://product.dangdang.com/23838168.html 


相关文章
|
7月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
86 1
|
7月前
|
存储 分布式计算 Hadoop
Hadoop【基础知识 01】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 01】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)
254 3
|
2月前
|
分布式计算 负载均衡 算法
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
34 1
|
2月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
50 1
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
46 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
58 1
|
7月前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
344 2
|
7月前
|
存储 分布式计算 关系型数据库
bigdata-08-MapReduce原理到实战
bigdata-08-MapReduce原理到实战
88 0
|
7月前
|
分布式计算
MapReduce中的Map和Reduce函数分别是什么作用?
MapReduce中的Map和Reduce函数分别是什么作用?
486 0
|
7月前
|
分布式计算 并行计算 数据处理
什么是MapReduce?请简要解释其工作原理。
什么是MapReduce?请简要解释其工作原理。
118 0

相关实验场景

更多