我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。
在编写MapReduce程序时,我们是以Job为单位进行编程处理,一个应用程序可能由一组Job组成,而MapReduce框架给我们暴露的只是一些Map和Reduce的函数接口,在运行期它会构建对应MapTask和ReduceTask,所以我们知道一个Job是由一个或多个MapTask,以及0个或1个ReduceTask组成。而对于MapTask,它是根据输入的数据文件的的逻辑分片(InputSplit)而定的,通常有多少个分片就会有多少个MapTask;而对于ReduceTask,它会根据我们编写的MapReduce程序配置的个数来运行。
有了这些信息,我们能够预想到,在Job运行过程中,无非也需要维护与这些Job/Task相关的一些状态信息,通过一定的调度策略来管理Job/Task的运行。这里,我们主要关注JobTracker端的一些非常有用的数据结构:JobTracker、JobInProgress、TaskInProgress,来熟悉各种数据结构的定义及作用。
数据结构总体抽象
MapReduce框架就为了运行Job,所以我们基于Job的抽象来对JobTracker端的相关对象进行抽象,总体上理解它们之间的关系,如下图所示:
在JobTracker端,通过维护JobInProgress的信息来跟踪Job的运行生命周期,那么,JobTracker端肯定有一个用来维护所有Job状态的JobInProgress对象集合。而Job又是由Task组成,所以自然而然JobInProgress中应该有对Task运行状态的维护,Task的状态在JobTracker端通过TaskInProgress来抽象。一个Task可能运行失败,所以可能经过多次运行才能成功,而每一次运行会对应一个TaskAttempID,那么一个TaskInProgress又可能对应着多个TaskAttempID。
TaskInProgress数据结构
- TaskAttemptID结构
一个TaskInProgress结构中包含了3个TaskAttemptID类型的数据,如下图所示:
- TaskSplitMetaInfo结构
JobTracker会创建每一个Task需要运行Split的信息,包含了该Split所在的位置信息、起始偏移量、总输入字节数,结构图如下所示:
其他结构
结构图 | 说明 | ||
|
一个TaskInProgress包含的TaskAttemptID的集合。 |
||
|
维护了当前运行的Task,该Task运行在哪个TaskTracker上。 |
||
|
记录了某个cleanup task在哪个TaskTracker上。 |
||
|
满足如下3种条件的Task会被加入到该数据结构中:
该数据结构用来辅助判断,是否一个Task已经完成(成功/失败),需要被TaskTracker终止掉,这个需要JobTracker发送KillTaskAction指令,通知TaskTracker终止该Task运行。 |
||
|
记录了某个Task是否需要被Kill掉。 |
||
记录了执行Task失败的TaskTracker的host信息。 |
JobInProgress数据结构
- JobID结构
JobID的结构,如下图所示:
上图中jtIdentifier的值为job,它是组成一个Job的ID字符串的前缀,唯一标识一个Job的完整ID的组成,如下所示:
1 |
job_<JobTracker启动时间字符串>_<序号> |
例如,一个Job的ID字符串为job_200912121733_0002 。
- JobProfile结构
JobProfile描述了一个Job的基本信息,它的结构,如下图所示:
通过上图可以看出,JobProfile包含了一个Job的如下信息:
标识名称 | 类型 | 说明 |
jobid | JobID | 唯一标识一个Job的ID,例如:job_200912121733_0002 |
user | String | 提交的该Job的所属用户名称,例如:shirdrn |
url | String | 在Web UI页面上查看该Job信息的链接,例如:http://jobtracker.hadoopcluster.com:8080/jobdetails.jsp?jobid=job_200912121733_0002 |
name | String | 提交Job的用户为该Job设置的名称字符串,例如:ChainUserEventsJob |
jobFile | String | 该Job所对应的配置文件,例如:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml |
queueName | String | 提交的该Job所在的队列的名称,例如:default |
- 组成Job的Task的信息
一个Job可能包含多个Task(MapTask/ReduceTask),每个Task在JobTracker端使用TaskInProgress结构来跟踪Task的信息,一个Job由下面4组结构来表示这些信息,如下图所示:
上图中出现了4种类型的Task,我们需要明白每种Task的作用是什么。一个Job在调度时,需要分解为上述4种类型的Task,基于类型来说明,一个Job对应的这4种类型的Task的运行顺序为:setup task、MapTask、ReduceTask、cleanup task,其中setup task和cleanup task运行也需要申请slot来运行,map setup运行需要占用Map Slot,而reduce setup运行需要占用Reduce Slot,对于cleanup task也是类似的。
这里说明一下cleanup task和setup task的作用。其实在JobTracker端来看,setup task、cleanup task都与MapTask、ReduceTask使用相同的TaskInProgress数据结构来维护状态。setup task主要是在一个Job开始运行之前,初始化一些状态信息,由于存在MapTask和ReduceTask两种计算型Task,那么对应就存在map setup task和reduce setup task两种setup task。cleanup task主要是在一个Job运行结束后,负责清理在TaskTracker上运行的Task生成的临时数据,更新TaskTracker端维护的相关对象的状态信息,等等,类似地也存在map cleanup task和reduce cleanup task两种cleanup task。
- JobStatus结构
JobStatus结构定义一个Job的当前状态信息,如下图所示:
除了定义Job运行状态信息,还包含了其他信息,如下表所示:
标识名称 | 类型 | 说明 |
jobid | JobID | 唯一标识一个Job的ID,例如:job_200912121733_0002 |
mapProgress | float | MapTask运行进度百分比 |
reduceProgress | float | ReduceTask运行进度百分比 |
cleanupProgress | float | cleanup task运行进度百分比 |
setupProgress | float | setup task运行进度百分比 |
runState | int | Job运行状态:RUNNING = 1;SUCCEEDED = 2;FAILED = 3;PREP = 4;KILLED = 5; |
user | String | 提交的该Job的所属用户名称,例如:shirdrn |
priority | JobPriority | Job优先级信息,是一个枚举类型,包含如下优先级:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
schedulingInfo | String | Job调度信息 |
jobACLs | Map | 该Job设置的ACL(访问控制列表)列表信息 |
- Counters结构
Counters包含了一组计数器,用来跟踪一个Job运行的信息,结构如下图所示:
每个Job都包含一组Counter计数器,如下表所示:
标识名称 | 类型 |
NUM_FAILED_MAPS | 失败的MapTask数量 |
NUM_FAILED_REDUCES | 失败的ReduceTask数量 |
TOTAL_LAUNCHED_MAPS | 所有启动的 MapTask数量 |
TOTAL_LAUNCHED_REDUCES | 所有启动的 ReduceTask数量 |
OTHER_LOCAL_MAPS | 其他Local MapTask数量 |
DATA_LOCAL_MAPS | DATA_LOCAL的MapTask数量 |
NODEGROUP_LOCAL_MAPS | NODEGROUP_LOCAL的MapTask数量 |
RACK_LOCAL_MAPS | RACK_LOCAL的MapTask数量 |
SLOTS_MILLIS_MAPS | 被占用的Map slot的“Slot个数 * (结束时间 – 开始时间)” |
SLOTS_MILLIS_REDUCES | 被占用的Reduce slot:“Slot个数 * (结束时间 – 开始时间)” |
FALLOW_SLOTS_MILLIS_MAPS | 空闲Map Slot:“(当前时间 – 开始时间) * Slot个数” |
FALLOW_SLOTS_MILLIS_REDUCES | 空闲Reduce Slot:“(当前时间 – 开始时间) * Slot个数” |
- 其他结构
JobInProgress中使用了大量的集合来维护Job/Task相关的状态信息,具体内容如下表所示:
结构图 | 说明 | ||||||||
|
JobTracker端维护了某个Node上,没有运行的MapTask列表的信息。 |
||||||||
|
JobTracker端维护了某个Node上,当前正在运行的MapTask列表的信息。 |
||||||||
|
JobTracker端维护的、非Local,并且还没有运行的MapTask的列表。 |
||||||||
|
JobTracker端维护了失败的MapTask的信息,在该集合中的TaskInProgress基于失败次数降序排序。 |
||||||||
|
JobTracker端维护的、 非Local、正在运行的MapTask保存在该数据结构中。 |
||||||||
|
JobTracker端维护的、没有运行的ReduceTask的列表。 |
||||||||
|
为MapTask运行的cleanup task列表。 |
||||||||
|
为ReduceTask运行的cleanup task列表。 |
||||||||
|
TaskCompletionEvent是用来跟踪Task完成事件的数据结构,该列表结构保存了TaskCompletionEvent。 |
||||||||
|
TaskTracker发送心跳的时候,会将TaskTracker的状态信息发送给JobTracker,状态信息通过TaskTrackerStatus表示,该对象中包含了Task的报告信息TaskStatus。如果是在运行ReduceTask时,抓取MapTask输出的结果失败时,会在根据Task报告信息,更新JobTracker端维护的mapTaskIdToFetchFailuresMap,记录了Task抓取MapTask输出失败的次数计数信息。 |
||||||||
|
TaskType枚举类型定义如下:
该firstTaskLaunchTimes数据结构保存了某个TaskType类型第一次运行的时间戳信息。 |
||||||||
|
FallowSlotInfo包含了空闲的slot信息,主要九个包含了空闲的slot的个数信息。 |
||||||||
|
该数据结构维护了在某个TaskTracker上为ReduceTask运行所预留的空闲slot的信息。 |
JobTracker数据结构
JobTracker通过在内存中维护有关Job、Task相关的所有信息,来跟踪他们运行、交互过程中所发生的数据交换,等等,如下表所示:
结构图 | 说明 | ||
|
通过ServicePlugin接口,可以基于任意的RPC协议暴露DataNode或NameNode的功能。 |
||
|
JobTracker维护了一组JobInProgressListener监听器,在JobTracker运行过程中,发生某些事件会触发注册的JobInProgressListener的执行。比如,JobClient提交一个Job,JobTracker端会触发对应的JobInProgressListener调用jobAdded()初始化该Job;比如,Job执行过程中状态发生变更,会触发JobInProgressListener调用jobUpdated()执行;比如,Job运行完成,会触发obInProgressListener调用jobRemoved()执行。 |
||
|
JobTracker维护一个JobID->JobInProgress映射的列表,JobID标识一个提交的Job,JobInProgress是JobTracker端维护的Job的所有信息的数据结构。在如下情况下,会检索/操作该jobs数据结构:
|
||
|
用来跟踪某个用户提交的需要运行的Job集合的数据结构。 |
||
|
用来跟踪某个TaskTracker上运行的Job集合的数据结构。 |
||
|
用来跟踪某个TaskTracker上运行的Task集合的数据结构。 |
||
|
TaskAttemptID用来标识一个MapTask或一个ReduceTask,通过该数据结构可以根据TaskAttemptID获取到MapTask/ReduceTask的运行信息,也就是TaskInProgress对象。 |
||
|
维护TaskAttemptID到TaskTracker的映射关系,可以通过一个Task的ID获取到该Task运行在哪个TaskTracker上。 |
||
|
一台主机上,可能运行着多个TaskTracker进程,该数据结构用来维护host到TaskTracker集合的映射关系。如果一个host被加入了黑名单,则该host上面的所有TaskTracker都无法接收任务。 |
||
|
某个TaskTracker上都运行着哪些Task,通过该数据结构来维护这种映射关系。 |
||
|
在某个TaskTracker上都运行完成了哪些Task,通过该数据结构来维护这种映射关系。 |
||
|
TaskTracker会周期性地向JobTracker发送心跳报告,最近一次发送的心跳报告,JobTracker会给其一个响应,最后的这个响应的数据保存在该数据结构中。 |
||
|
JobTracker维护了一个网络拓扑结构(NetworkTopology),组成该拓扑结构的是一个一个的Node,每个Node都包含了网络位置信息、继承关系信息、名称等。 |
附录
这里给出文中(文字/图片上)一些缩写词对应的完整名称,如下表所示:
简写词 | 完整名称 |
JIP | JobInProgress |
TIP | TaskInProgress |
TAID | TaskAttemptID |
TTID | TaskTracker ID |
TT HOST | TaskTracker Host Name |
TCE | TaskCompletionEvent |
JIPL | JobInProgressListener |