记Hadoop2.5.0线上mapreduce任务执行map任务划分的一次问题解决

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/51397729 前言 近日在线上发现有些mapreduce作业的执行时间很长,我们需要解决这个问题。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/51397729

前言

近日在线上发现有些mapreduce作业的执行时间很长,我们需要解决这个问题。输入文件的大小是5G,采用了lzo压缩,整个集群的默认block大小是128M。本文将详细描述这次线上问题的排查过程。

现象

线上有一个脚本,为了便于展示,我将这个脚本重新copy了一份并重命名为zzz。这个脚本实际是使用Hadoop streaming运行一个mapreduce任务,在线上执行它的部分输出内容如下:


可以看到map任务划分为1个。这个执行过程十分漫长,我将中间的一些信息省略,map与reduce任务的执行进度如下:

16/05/16 10:22:16 INFO mapreduce.Job:  map 0% reduce 0%
16/05/16 10:22:32 INFO mapreduce.Job:  map 1% reduce 0%
。。。
16/05/16 10:44:14 INFO mapreduce.Job:  map 99% reduce 0%
16/05/16 10:44:20 INFO mapreduce.Job:  map 100% reduce 0%
16/05/16 10:44:33 INFO mapreduce.Job:  map 100% reduce 2%
16/05/16 10:44:34 INFO mapreduce.Job:  map 100% reduce 19%
。。。
16/05/16 10:44:57 INFO mapreduce.Job:  map 100% reduce 99%
16/05/16 10:44:58 INFO mapreduce.Job:  map 100% reduce 100%
从以上内容可以看到map任务执行一共耗时22分钟左右,而reduce任务只耗用了30多秒。

分析

根据以上现象分析,我们知道耗时主要发生在map任务执行的阶段。我们首先查看下这个map任务的输入内容,看到它的大小为5GB且使用lzo压缩:


如此大的输入仅仅在一个map任务中处理显然是进度缓慢的主要原因,我们需要对mapreduce的任务划分进行干预。我们查看下mapreduce任务的InputFormat,以便确定干预的手段,打开我们的脚本其中有以下代码片段:

     hadoop jar $streaming_jar \
          -D mapred.reduce.tasks=30 \
          -D mapreduce.job.maps=100 \
          -D mapreduce.input.fileinputformat.split.minsize=100000000 \
          -inputformat TextInputFormat \
          -file mr.py \
          -input $member_input \
          -output $output \
          -mapper "python mr.py map" \
          -reducer "python mr.py reduce"
我们看到甚至在脚本中还配置了mapreduce.job.maps,根据《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的分析,我们知道此参数实际不会对map任务划分产生任何影响。查看到mapreduce作业的input format是TextInputFormat,TextInputFormat的实现见代码清单1。

代码清单1 TextInputFormat的实现

/** An {@link InputFormat} for plain text files.  Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line.  Keys are
 * the position in the file, and values are the line of text.. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  //省略无关代码

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

}

我们看到TextInputFormat继承了FileInputFormat,因而根据《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的内容,真正影响使用FileInputFormat的map任务划分的参数有:

  • dfs.blockSize
  • mapreduce.input.fileinputformat.split.minsize
  • mapreduce.input.fileinputformat.split.maxsize
我们首先来看看dfs.blockSize的大小,由于参数使用了Hadoop集群的默认配置,查看信息如下:


可以看到blockSize的大小是128M,我们不太可能为了一个mapreduce任务去更改整个Hadoop集群的配置,所以我们的重点讲放在后两个参数上。按照《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的分析,我对mapreduce.input.fileinputformat.split.minsize和mapreduce.input.fileinputformat.split.maxsize的大小进行了各种尝试,但在运行的过程中发现map任务数量依然只有一个。难道我之前文章的内容分析的有问题?我一度陷入困境。
我休息了一会,重新查看代码清单1,发现TextInputFormat的isSplitable方法。isSplitable方法用于判断TextInputFormat是否可以进行map任务划分。由于输入文件是经过sqoop从关系型数据库抽取的,采用了lzo进行压缩,而Hadoop默认不支持压缩算法lzo,需要单独安装hadoop-lzo,查看Hadoop集群配置,发现我们之前已经做好了这方面的工作。

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
  <source>core-site.xml</source>
</property>
这说明TextInputFormat的isSplitable方法在从压缩算法工厂类CompressionCodecFactory中获取到的CompressionCodec不为null,那么TextInputFormat是否支持map任务划分就取决于com.hadoop.compression.lzo.LzoCodec是否实现了SplittableCompressionCodec接口。我们看看com.hadoop.compression.lzo.LzoCodec的实现:
/**
 * A {@link org.apache.hadoop.io.compress.CompressionCodec} for a streaming
 * <b>lzo</b> compression/decompression pair.
 * http://www.oberhumer.com/opensource/lzo/
 *
 */
public class LzoCodec extends org.anarres.lzo.hadoop.codec.LzoCodec {
}
看来com.hadoop.compression.lzo.LzoCodec继承了org.anarres.lzo.hadoop.codec.LzoCodec,再来看看org.anarres.lzo.hadoop.codec.LzoCodec:
/**
 * A {@link org.apache.hadoop.io.compress.CompressionCodec} for a streaming
 * <b>lzo</b> compression/decompression pair.
 * http://www.oberhumer.com/opensource/lzo/
 * 
 */
public class LzoCodec extends Configured implements CompressionCodec {
可以看到org.anarres.lzo.hadoop.codec.LzoCodec直接实现了CompressionCodec,并没有实现SplittableCompressionCodec接口,SplittableCompressionCodec接口实际也继承了CompressionCodec接口:
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface SplittableCompressionCodec extends CompressionCodec {
因此codec instanceof SplittableCompressionCodec这条Java语句将返回false,采用lzo压缩算法的输入文件将导致map任务不可划分,也就是不会生成多个map任务。


解决方法

使用hadoop-lzo-0.4.20-SNAPSHOT.jar提供的LzoTextInputFormat类,它实现了SplittableCompressionCodec接口。



后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。


京东:http://item.jd.com/11846120.html 

当当:http://product.dangdang.com/23838168.html 



相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
23天前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
119 3
|
2月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
52 1
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
59 1
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
110 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
51 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
60 0
|
4月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
679 0
|
6月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
68 1

相关实验场景

更多