MapReduce计算框架

简介: MapReduce计算框架 一、MapReduce实现原理   图展示了MapReduce实现中的全部流程,处理步骤如下:   1、用户程序中的MapReduce函数库首先把输入文件分成M块(每块大小默认64M),在集群上执行处理程序,见序号1   2、主控程序master分配Map任务和Reduce任务给工作执行机器worker。

MapReduce计算框架

一、MapReduce实现原理

  图展示了MapReduce实现中的全部流程,处理步骤如下:

  1、用户程序中的MapReduce函数库首先把输入文件分成M块(每块大小默认64M),在集群上执行处理程序,见序号1

  2、主控程序master分配Map任务和Reduce任务给工作执行机器worker。见序号2

  3、一个分配了Map任务的worker读取并处理输入数据块。从数据片段中解析出key/value键值对,然后把其传递给Map函数,由Map函数生成并输出中间key/value键值对集合,暂缓内存中。见序号3

  4、缓存中key/value键值对通过分区函数分成R个区域,之后周期性地写到本地磁盘上。同时将本地磁盘的存储位置传给master,由master负责把这些存储位置再传递给Reduce worker,见序号4

  5、当Reduce worker收到master的存储位置信息后,使用RPC从Map worker所在的磁盘上读取数据。最后通过对key进行排序使得具有key值的数据聚合到一起。见序号5

  6、Reduce worker程序遍历排序后的中间数据。Reduce worker程序将这个key值和相关的中间value值的集合传递给Rdeduce函数,最后,Reduce的输出被追加到所属分区的输入文件。见序号6

二、计算流程与机制

2.1、作业提交和初始化

  MapReduce的JobTracker接收到客户端提交的作业后首先要把作业初始化为Map任务和Reduce任务,然后等待调度执行。

  图中展示了MapReduce客户端提交作业以及初始化的流程。

  作业提交过程:

  1、命令行提交。调用JobClient.runJob()方法开始提交,最终通过Job对象内部JobClient对象的submitJobInternal方法来提交作业到JobTracker。

  2、作业上传。在提交到JobTracker之前还需要完成相关的初始化工作(获取用户作业的JobId,创建HDFS目录,上传作业、相关依赖库,需要分发的文件等到HDFS上,获取用户输入数据的所有分片信息)。

  3、产生切片文件。在作业提交后,JobClient调用InputFormat中的getSplits()方法产生用户数据的split分片信息。

  4、提交作业到JobTracker。JobClient通过RPC将作业提交到JobTracker作业调度器中,首先为作业创建JobInProgress对象,用于跟踪正在运行的作业的状态和进度。其次检查用户是否具有指定队列的作业提交权限。接着检查作业配置的内存使用量是否合理。最后通过TaskScheduler初始化作业,JobTracker收到提交的作业后,会交给TaskScheduler调度器,然后按照一定的策略对作业执行初始化操作。

  作业的初始化:主要是构造Map Task和Reduce Task并对他们进行初始化操作,主要是调度器调用JobTracker.initJob()方法来进行的。具体分为四个类型的任务:

    Setup Task--->Map Task--->Reduce Task--->Cleanup Task

2.2、Mapper

  在作业提交完成之后,就开始执行Map Task任务了,Mapper的任务就是执行用户的Map()函数将输入键值对(key/value pair)映像到一组中间格式的键值对集合。

  图中是Mapper的处理流程图。Mapper的输入文件在HDFS上;InputFormat接口描述文件的格式信息,通过这个接口可以获得InputSplit的实现,然后对输入的数据进行切分;每一个Split分块对应一个Mapper任务,通过RecordReader对象从输入分块中读取并生成<k,v>键值对;Map函数接收这些键值对根据用户的Map函数进行处理后输出<k1,v1>键值对,Map函数通过context.collect方法将结果写到context对象中;当Mapper的输出键值对被收集后,他们会被Partitioner类中的partition()函数以指定的分区写到输出缓冲区,同时调用sort函数对输出进行排序,如果用户为Mapper指定了Combiner,则在Mapper输出它的键值对<k1,v1>时,不会马上写到输出中,会被收集在list对象中,当写入一定数量的键值对,这部分缓冲会被Combiner中的combine函数合并,然后输出被写入本地文件系统之后会进入Reduce阶段。

2.3、Reducer

  Reducer有三个主要阶段:Shuffer、Sort和Reduce,处理流程图为:

  Reducer的整个处理流程为:

    1、Shuffle阶段,此时Reducer的输入就是Mapper已经排序好的输出。可以理解为混洗阶段,相当于数据复制阶段。

    2、Sort阶段,按照key值对Reducer的输入进行分组,Shuffle和Sort是同时进行的。

    3、Reduce阶段,通过前两个阶段得到的<key,(list of values)>会送到Reducer中的reduce()函数中处理。输出的结果通过OutputFormat输出到DFS中。

2.4、Reporter和OutputCollector

  Reporter是用于MapReduce应用程序报告进度,设定应用级别的消息,更新Counters计数器的机制。

  OutputCollector是一个由Map/Reduce框架提供的,用于收集Mapper和Reducer输出数据的通用机制,老版本用collect函数,新版本用write函数。

三、MapReduce的I/O格式

3.1、输入格式

  Hadoop中的MapReduce框架依赖InputFormat提供数据输入,也就是InputFormat为MapReduce作业描述输入的细节规范。InputFormat有三个作用:

    1、检查作业输入的有效性。

    2、把输入文件切分成多个逻辑InputSplit实例,并把每一个实例分别分发给一个Mapper,也就是一个Mapper的输入只对应一个逻辑InputSplit,只处理一个Split文件的数据块。

    3、提供ReduceReader的实现。从逻辑InputSplit中获取输入记录,这些记录将由Mapper处理,Mapper利用该实现从InputSplit中读取输入的<K,V>键值对。

  1、TextInputFormat

    用于读取纯文本文件,是Hadoop默认的InputFormat的派生类。LineRecordReader将Inputsplit解析成<key,value>对,key是每一行的位置(偏移量,为LongWritable类型),value是每一行的内容(为Text类型)。类图如下。

    通过类图看出,TextInputFormat类继承了FileInputFormat基类,实现了JobConfigure接口,实现了InputFormat中的getRecordReader这一方法,返回一个RecordReader用于划分中读取<key,value>键值对。

  2、KeyValueTextInputFormat

    类似TextInputFormat一样。

  3、NLineInputFormat

    这个类型可以将文件以行为单位进行split切分,比如每一行对应一个Map,得到key是每一行的位置,value是每一行的内容。这里的N指的是每一个Mapper接收的对应一个mapper来处理。

  4、SequenceFileInputFormat

    用于读取sequencefile。sequence是hadoop用于存储数自定义格式的二进制binary文件。其主要有两个子类。SequenceFileAsBinaryInputFormat用于处理二进制数据。SequenceFileAsTextInputFormat,主要是为了适应Hadoop streaming接口而设计的,在读取键值对后会调用toString()方法将key和value类型转化为Text类型对象。

3.2、输出格式

  hadoop中的OutputFormat用来描述MapReduce作业的输出格式。OutputFormat主要有三个作用:

    1、检验作业的输出

    2、验证输出结果类型是否如在Config中所配置的。

    3、提供一个RecordWriter的实现,用来输出作业结果。RecordWriter生成<key,value>键值对到输出文件。

  1、TextOutputFormat

    是默认的输出格式,就是输出纯文本文件,格式为key+"\t"+value,key与value之间默认以\t分割。

  2、SequeceFileOutputFormat

    和输入类似

  

  3、MapFileOutputFormat

    这个输出类型可以将数据输出为hadoop中的MpaFile文件格式。MapFile与SequenceFile类似。

  4、MultipleOutputFormat

    是hadoop类中的多路输出

     
     


 

当神已无能为力,那便是魔渡众生
目录
相关文章
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
98 3
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
50 1
|
6月前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
311 2
|
2月前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
65 0
|
3月前
|
缓存 分布式计算 Java
详细解读MapReduce框架中的分布式缓存
【8月更文挑战第31天】
42 0
|
5月前
|
分布式计算 资源调度 数据处理
YARN支持哪些非基于MapReduce的计算模型?
【6月更文挑战第19天】YARN支持哪些非基于MapReduce的计算模型?
70 11
|
5月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
77 0
|
6月前
|
分布式计算 并行计算 搜索推荐
Hadoop MapReduce计算框架
【5月更文挑战第10天】HadoopMapReduce计算框架
53 3
|
6月前
|
分布式计算 并行计算 Java
【分布式计算框架】 MapReduce编程初级实践
【分布式计算框架】 MapReduce编程初级实践
193 2
|
5月前
|
分布式计算 Java Hadoop
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
60 0
下一篇
无影云桌面