Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)

简介: 3.3. MapReduce与YARN3.3.1 YARN概述Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序3.3.2 YARN的重要概念1、  yarn并不清楚用户提交的程序的运行机制2、  yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责

3.3. MapReduce与YARN

3.3.1 YARN概述

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序

3.3.2 YARN的重要概念

1、  yarn并不清楚用户提交的程序的运行机制

2、  yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)

3、  yarn中的主管角色叫ResourceManager

4、  yarn中具体提供运算资源的角色叫NodeManager

5、  这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序,tez ……

6、  所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可

7、  Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享

 

3.3.3 Yarn中运行运算程序的示例

mapreduce程序的调度过程,如下图

 

 

Mapreduce的其他补充

5.1 计数器应用

在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce框架中提供的全局计数器来实现

示例代码如下:

public class MultiOutputs {

         //通过枚举形式定义自定义计数器

         enum MyCounter{MALFORORMED,NORMAL}

 

         static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

 

                  @Override

                  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

                           String[] words = value.toString().split(",");

 

                           for (String word : words) {

                                    context.write(new Text(word), new LongWritable(1));

                           }

                           //对枚举定义的自定义计数器加1

                           context.getCounter(MyCounter.MALFORORMED).increment(1);

                           //通过动态设置自定义计数器加1

                           context.getCounter("counterGroupa", "countera").increment(1);

                  }

 

         }

 

5.2 多job串联

一个稍复杂点的处理逻辑往往需要多个mapreduce程序串联处理,多job的串联可以借助mapreduce框架的JobControl实现

 

示例代码:

        ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());

        ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());

        ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

      

        cJob1.setJob(job1);

        cJob2.setJob(job2);

        cJob3.setJob(job3);

 

        // 设置作业依赖关系

        cJob2.addDependingJob(cJob1);

        cJob3.addDependingJob(cJob2);

 

        JobControl jobControl = new JobControl("RecommendationJob");

        jobControl.addJob(cJob1);

        jobControl.addJob(cJob2);

        jobControl.addJob(cJob3);

 

 

        // 新建一个线程来运行已加入JobControl中的作业,开始进程并等待结束

        Thread jobControlThread = new Thread(jobControl);

        jobControlThread.start();

        while (!jobControl.allFinished()) {

            Thread.sleep(500);

        }

        jobControl.stop();

 

        return 0;

 

 Mapreduce参数优化:

mapreduce参数优化

MapReduce重要配置参数

11.1 资源相关参数

//以下参数是在用户自己的mr应用程序中配置就可以生效

(1) mapreduce.map.memory.mb:一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。

(2) mapreduce.reduce.memory.mb:一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。

(3) mapreduce.map.cpu.vcores:每个Map task可使用的最多cpu core数目, 默认值: 1

(4) mapreduce.reduce.cpu.vcores:每个Reduce task可使用的最多cpu core数目, 默认值: 1

 

//shuffle性能优化的关键参数,应在yarn启动之前就配置好

(12) mapreduce.task.io.sort.mb   100        //shuffle的环形缓冲区大小,默认100m

(13) mapreduce.map.sort.spill.percent   0.8   //环形缓冲区溢出的阈值,默认80%

 

 

 

(5) mapreduce.map.java.opts: Map Task的JVM参数,你可以在此配置默认的java heapsize等参数, e.g.

“-Xmx1024m -verbose:gc-Xloggc:/tmp/@taskid@.gc” (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “”

(6) mapreduce.reduce.java.opts: Reduce Task的JVM参数,你可以在此配置默认的java heapsize等参数, e.g.

“-Xmx1024m -verbose:gc-Xloggc:/tmp/@taskid@.gc”, 默认值: “”

 

 

 

 

 

//应该在yarn启动之前就配置在服务器的配置文件中才能生效

(7) yarn.scheduler.minimum-allocation-mb     1024   给应用程序container分配的最小内存

(8) yarn.scheduler.maximum-allocation-mb     8192     给应用程序container分配的最大内存

(9)yarn.scheduler.minimum-allocation-vcores       1      

(10)yarn.scheduler.maximum-allocation-vcores     32

(11)yarn.nodemanager.resource.memory-mb   8192  每台NodeManager最大可用内存

yarn.nodemanager.resource.cpu-vcores    8    每台NodeManager最大可用cpu核数

 

 

11.2 容错相关参数

(1) mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

(2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

(3) mapreduce.map.failures.maxpercent: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。

(4) mapreduce.reduce.failures.maxpercent: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0.

(5) mapreduce.task.timeout: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after300 secsContainer killed by the ApplicationMaster.”。

11.3 本地运行mapreduce 作业

设置以下几个参数:

mapreduce.framework.name=local

mapreduce.jobtracker.address=local

fs.defaultFS=local

11.4 效率和稳定性相关参数

(1) mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false

(2) mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false

(3) mapreduce.job.user.classpath.first& mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。

(4)mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时的最小切片大小,(5)mapreduce.input.fileinputformat.split.maxsize:  FileInputFormat做切片时的最大切片大小

(切片的默认大小就等于blocksize,即 134217728)

 

 



目录
相关文章
|
资源调度 分布式计算 调度
27 MAPREDUCE与YARN
27 MAPREDUCE与YARN
72 0
|
分布式计算 资源调度 大数据
黑马程序员-大数据入门到实战-MapReduce & YARN入门
黑马程序员-大数据入门到实战-MapReduce & YARN入门
159 0
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
181 0
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力(三)
Hadoop学习:深入解析MapReduce的大数据魔力(三)
110 0
|
3月前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
74 0
|
4月前
|
分布式计算 资源调度 监控
MapReduce程序中的主要配置参数详解
【8月更文挑战第31天】
139 0
|
4月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
618 0
|
4月前
|
SQL 资源调度 数据处理
实时计算 Flink版产品使用问题之-s参数在yarn-session.sh命令中是否有效
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
分布式计算 资源调度 数据处理
YARN支持哪些非基于MapReduce的计算模型?
【6月更文挑战第19天】YARN支持哪些非基于MapReduce的计算模型?
73 11
|
5月前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用问题之在使用Flink on yarn模式进行内存资源调优时,如何进行优化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。