MapReduce入门(一篇就够了)(下)

简介: MapReduce入门(一篇就够了)(下)

3.2.3 ReduceTask 并行度决定机制

ReduceTask的并行度同样影响整个job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

//默认值是1,手动设置为4
job.setNumReduceTasks(4);

注意:

  • 如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
  • ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask
  • 尽量不要运行太多的ReduceTask,对大多数job来说,最好reduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小 ,这个对于小集群而言,尤其重要。

3.2.4 Shuffle机制

Shuffle机制:指的是map阶段处理的数据传递给reduce的一个流程。核心是数据分区、排序及缓存,就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;

Shuffle分为3个步骤操作:

  1. 分区partition
  2. Sort根据key排序
  3. Combiner进行局部value的合并

详细流程:

  1. MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中;
  2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件;
  3. 多个溢出文件会被合并成大的溢出文件;
  4. 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序;
  5. ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据;
  6. ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序);
  7. 合并成大文件后,shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)。

Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 。

缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M。

3.2.5 Combiner使用原则

Combiner的使用要非常谨慎,因为CombinerMapReduce过程中可能调用也肯能不调用,可能调一次也可能调多次。

所以Combiner使用的原则是,有或没有都不能影响业务逻辑。

  1. combiner是MR程序中Mapper和Reducer之外的一种组件。
  2. combiner组件的父类就是Reducer
  3. combiner和reducer的区别在于运行的位置:
    |__3.1、Combiner是在每一个maptask所在的节点运行
    |__3.2、Reducer是接收全局所有Mapper的输出结果;
  4. combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量,具体实现步骤:
    |___4.1、自定义一个combiner继承Reducer,重写reduce方法
    |___4.2 、在job中设置:job.setCombinerClass(CustomCombiner.class)
  5. combiner能够应用的前提是不能影响最终的业务逻辑,而且combiner的输出kv应该跟reducer的输入kv类型要对应起来。

04 MapReduce 编程

4.1 MapReduce 编程规范

作为程序员,对MapReduce这一块,用户编写的程序分成三个部分:MapperReducerDriver,提交运行MR程序客户端。

Mapper部分:

  1. Mapper的输入数据是KV对的形式(KV的类型可自定义);
  2. Mapper的输出数据是KV对的形式(KV的类型可自定义);
  3. Mapper中的业务逻辑写在map()方法中;
  4. map()方法(MapTask进程)对每一个<K,V>调用一次。

Reducer部分:

  1. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  2. Reducer的业务逻辑写在reduce()方法中;
  3. Reducetask进程对每一组相同k<k,v>组调用一次reduce()方法;
  4. 用户自定义的MapperReducer都要继承各自的父类。

Drvier部分: 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象。

4.2 WordCount示例

需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数。

Step1:定义一个mapper类

//首先要定义四个泛型的类型
//key in:  LongWritable    value in: Text
//key out: Text            value out:IntWritable
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  //map方法的生命周期:  框架每传一行数据就被调用一次
  //key :  这一行的起始点在文件中的偏移量
  //value: 这一行的内容
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //拿到一行数据转换为string
    String line = value.toString();
    //将这一行切分出各个单词
    String[] words = line.split(" ");
    //遍历数组,输出<单词,1>
    for(String word:words){
      context.write(new Text(word), new IntWritable(1));
    }
  }
}

Step2:定义一个Reducer类

//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  //定义一个计数器
  int count = 0;
  //遍历这一组kv的所有v,累加到count中
  for(IntWritable value:values){
    count += value.get();
  }
  context.write(key, new IntWritable(count));
}
}

Step3:定义一个主类,用来描述job并提交job

public class WordCountRunner {
  //把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里……)描述成一个job对象
  //把这个描述好的job提交给集群去运行
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job wcjob = Job.getInstance(conf);
    //指定我这个job所在的jar包
//    wcjob.setJar("/home/hadoop/wordcount.jar");
    wcjob.setJarByClass(WordCountRunner.class);
    wcjob.setMapperClass(WordCountMapper.class);
    wcjob.setReducerClass(WordCountReducer.class);
    //设置我们的业务逻辑Mapper类的输出key和value的数据类型
    wcjob.setMapOutputKeyClass(Text.class);
    wcjob.setMapOutputValueClass(IntWritable.class);
    //设置我们的业务逻辑Reducer类的输出key和value的数据类型
    wcjob.setOutputKeyClass(Text.class);
    wcjob.setOutputValueClass(IntWritable.class);
    //指定要处理的数据所在的位置
    FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
    //指定处理完成之后的结果所保存的位置
    FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));
    //向yarn集群提交这个job
    boolean res = wcjob.waitForCompletion(true);
    System.exit(res?0:1);
  }

05 MapReduce 运行模式

5.1 本地运行模式

MapReduce本地运行模式如下:

  • MapReduce程序是被提交给LocalJobRunner在本地以单进程的形式运行;
  • 处理的数据及输出结果可以在本地文件系统,也可以在HDFS上;
  • 如果要在本地运行,写一个程序,不要带集群的配置文件(本质是你的mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname参数);
  • 本地模式非常便于进行业务逻辑的debug,只要在开发环境中打断点即可。

如果在windows下想运行本地模式来测试程序逻辑,需要在windows中配置环境变量,并且要将d:/hadoop-2.6.1libbin目录替换成windows平台编译的版本:

%HADOOP_HOME%  =  d:/hadoop-2.6.1
%PATH% =  %HADOOP_HOME%\bin
• 1
• 2

5.2 集群运行模式

Step1:将MapReduce程序提交给yarn集群ResourceManager,分发到很多的节点上并发执行;

Step2:处理的数据和输出结果应该位于HDFS文件系统;

Step3:提交集群的实现步骤

  1. 将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动(命令: $ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath
    );
  2. 直接运行main方法(项目中要带参数:mapreduce.framework.name=yarn以及yarn的两个基本配置);
  3. 如果要在开发环境提交job给集群,则要修改YarnRunner类。

06 MapReduce 参数优化

6.1 资源相关参数

  • mapreduce.map.memory.mb: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。
  • mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。
  • mapreduce.map.java.opts: Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “”
  • mapreduce.reduce.java.opts: Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”, 默认值: “”
  • mapreduce.map.cpu.vcores: 每个Map task可使用的最多cpu core数目, 默认值: 1

6.2 容错相关参数

  • mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
  • mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
  • mapreduce.map.failures.maxpercent: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。
  • mapreduce.reduce.failures.maxpercent: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0.
  • mapreduce.task.timeout: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

6.3 本地运行参数

设置以下几个参数:

mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local

6.4 效率和稳定性相关参数

  • mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false
  • mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false
  • mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。
  • mapreduce.input.fileinputformat.split.minsize: 每个Map Task处理的数据量(仅针对基于文件的Inputformat有效,比如TextInputFormat,SequenceFileInputFormat),默认为一个block大小,即 134217728。

07 MapReduce应用案例

之前写过MapReduce的应用案例,有兴趣的同学可以参阅下:

08 文末

本文主要讲解了MapReduce的基本概念,谢谢大家的阅读,本文完!

目录
相关文章
|
分布式计算 资源调度 大数据
黑马程序员-大数据入门到实战-MapReduce & YARN入门
黑马程序员-大数据入门到实战-MapReduce & YARN入门
138 0
|
缓存 分布式计算 资源调度
MapReduce入门(一篇就够了)
MapReduce入门(一篇就够了)
5142 0
MapReduce入门(一篇就够了)
|
存储 分布式计算 大数据
大数据MapReduce入门
大数据MapReduce入门
70 0
|
分布式计算 算法 Java
MapReduce入门编程-成绩求和排序
MapReduce入门编程-成绩求和排序
MapReduce入门编程-成绩求和排序
|
分布式计算 Java Hadoop
java:MapReduce原理及入门实例:wordcount
java:MapReduce原理及入门实例:wordcount
177 0
java:MapReduce原理及入门实例:wordcount
|
资源调度 分布式计算 Java
MapReduce入门例子之WordCount单词计数
MapReduce入门例子之WordCount单词计数
162 0
MapReduce入门例子之WordCount单词计数
|
分布式计算 Java Hadoop
java:MapReduce原理及入门实例:wordcount
java:MapReduce原理及入门实例:wordcount
298 0
java:MapReduce原理及入门实例:wordcount
|
分布式计算 大数据 Hadoop
史上最快! 10小时大数据入门实战(五)-分布式计算框架MapReduce
目录 1 MapReduce概述 2 MapReduce编程模型之通过wordcount词频统计分析案例入门 MapReduce执行流程 InputFormat OutputFormat OutputFormt接口决定了在哪里以及怎样持久化作业结果。
1355 0