MapReduce V1:JobTracker端Job/Task数据结构

简介:

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。
在编写MapReduce程序时,我们是以Job为单位进行编程处理,一个应用程序可能由一组Job组成,而MapReduce框架给我们暴露的只是一些Map和Reduce的函数接口,在运行期它会构建对应MapTask和ReduceTask,所以我们知道一个Job是由一个或多个MapTask,以及0个或1个ReduceTask组成。而对于MapTask,它是根据输入的数据文件的的逻辑分片(InputSplit)而定的,通常有多少个分片就会有多少个MapTask;而对于ReduceTask,它会根据我们编写的MapReduce程序配置的个数来运行。
有了这些信息,我们能够预想到,在Job运行过程中,无非也需要维护与这些Job/Task相关的一些状态信息,通过一定的调度策略来管理Job/Task的运行。这里,我们主要关注JobTracker端的一些非常有用的数据结构:JobTracker、JobInProgress、TaskInProgress,来熟悉各种数据结构的定义及作用。

数据结构总体抽象

MapReduce框架就为了运行Job,所以我们基于Job的抽象来对JobTracker端的相关对象进行抽象,总体上理解它们之间的关系,如下图所示:

在JobTracker端,通过维护JobInProgress的信息来跟踪Job的运行生命周期,那么,JobTracker端肯定有一个用来维护所有Job状态的JobInProgress对象集合。而Job又是由Task组成,所以自然而然JobInProgress中应该有对Task运行状态的维护,Task的状态在JobTracker端通过TaskInProgress来抽象。一个Task可能运行失败,所以可能经过多次运行才能成功,而每一次运行会对应一个TaskAttempID,那么一个TaskInProgress又可能对应着多个TaskAttempID。

TaskInProgress数据结构

  • TaskAttemptID结构

一个TaskInProgress结构中包含了3个TaskAttemptID类型的数据,如下图所示:

  • TaskSplitMetaInfo结构

JobTracker会创建每一个Task需要运行Split的信息,包含了该Split所在的位置信息、起始偏移量、总输入字节数,结构图如下所示:


其他结构

结构图 说明

1 TreeSet<TaskAttemptID> tasks

一个TaskInProgress包含的TaskAttemptID的集合。
一个Task(MapTask/ReduceTask)可能包含多个TaskAttemptID在TaskTracker上运行,比如一个 ReduceTask在TaskTracker上运行,同时可能存在一个推测执行的ReduceTask,他们对应了2个不同的 TaskAttemptID。


1 TreeMap<TaskAttemptID, String> activeTasks

维护了当前运行的Task,该Task运行在哪个TaskTracker上。


1 TreeMap<TaskAttemptID, String> cleanupTasks

记录了某个cleanup task在哪个TaskTracker上。


1 TreeSet<TaskAttemptID> tasksReportedClosed

满足如下3种条件的Task会被加入到该数据结构中:

  • this.failed) || ((job.getStatus().getRunState() != JobStatus.RUNNING && (job.getStatus().getRunState() != JobStatus.PREP))
  • isComplete() && !(isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid))
  • isCommitPending(taskid) && !shouldCommit(taskid)

该数据结构用来辅助判断,是否一个Task已经完成(成功/失败),需要被TaskTracker终止掉,这个需要JobTracker发送KillTaskAction指令,通知TaskTracker终止该Task运行。


1 TreeMap<TaskAttemptID, Boolean> tasksToKill

记录了某个Task是否需要被Kill掉。


1 TreeSet<String> machinesWhereFailed

记录了执行Task失败的TaskTracker的host信息。


JobInProgress数据结构

  • JobID结构

JobID的结构,如下图所示:



上图中jtIdentifier的值为job,它是组成一个Job的ID字符串的前缀,唯一标识一个Job的完整ID的组成,如下所示:

1 job_<JobTracker启动时间字符串>_<序号>

例如,一个Job的ID字符串为job_200912121733_0002 。

  • JobProfile结构

JobProfile描述了一个Job的基本信息,它的结构,如下图所示:

通过上图可以看出,JobProfile包含了一个Job的如下信息:

标识名称 类型 说明
jobid JobID 唯一标识一个Job的ID,例如:job_200912121733_0002
user String 提交的该Job的所属用户名称,例如:shirdrn
url String 在Web UI页面上查看该Job信息的链接,例如:http://jobtracker.hadoopcluster.com:8080/jobdetails.jsp?jobid=job_200912121733_0002
name String 提交Job的用户为该Job设置的名称字符串,例如:ChainUserEventsJob
jobFile String 该Job所对应的配置文件,例如:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml
queueName String 提交的该Job所在的队列的名称,例如:default
  • 组成Job的Task的信息

一个Job可能包含多个Task(MapTask/ReduceTask),每个Task在JobTracker端使用TaskInProgress结构来跟踪Task的信息,一个Job由下面4组结构来表示这些信息,如下图所示:


上图中出现了4种类型的Task,我们需要明白每种Task的作用是什么。一个Job在调度时,需要分解为上述4种类型的Task,基于类型来说明,一个Job对应的这4种类型的Task的运行顺序为:setup task、MapTask、ReduceTask、cleanup task,其中setup task和cleanup task运行也需要申请slot来运行,map setup运行需要占用Map Slot,而reduce setup运行需要占用Reduce Slot,对于cleanup task也是类似的。
这里说明一下cleanup task和setup task的作用。其实在JobTracker端来看,setup task、cleanup task都与MapTask、ReduceTask使用相同的TaskInProgress数据结构来维护状态。setup task主要是在一个Job开始运行之前,初始化一些状态信息,由于存在MapTask和ReduceTask两种计算型Task,那么对应就存在map setup task和reduce setup task两种setup task。cleanup task主要是在一个Job运行结束后,负责清理在TaskTracker上运行的Task生成的临时数据,更新TaskTracker端维护的相关对象的状态信息,等等,类似地也存在map cleanup task和reduce cleanup task两种cleanup task。

  • JobStatus结构

JobStatus结构定义一个Job的当前状态信息,如下图所示:

除了定义Job运行状态信息,还包含了其他信息,如下表所示:

标识名称 类型 说明
jobid JobID 唯一标识一个Job的ID,例如:job_200912121733_0002
mapProgress float MapTask运行进度百分比
reduceProgress float ReduceTask运行进度百分比
cleanupProgress float cleanup task运行进度百分比
setupProgress float setup task运行进度百分比
runState int Job运行状态:RUNNING = 1;SUCCEEDED = 2;FAILED = 3;PREP = 4;KILLED = 5;
user String 提交的该Job的所属用户名称,例如:shirdrn
priority JobPriority Job优先级信息,是一个枚举类型,包含如下优先级:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
schedulingInfo String Job调度信息
jobACLs Map 该Job设置的ACL(访问控制列表)列表信息
  • Counters结构

Counters包含了一组计数器,用来跟踪一个Job运行的信息,结构如下图所示:

每个Job都包含一组Counter计数器,如下表所示:

标识名称 类型
NUM_FAILED_MAPS 失败的MapTask数量
NUM_FAILED_REDUCES 失败的ReduceTask数量
TOTAL_LAUNCHED_MAPS 所有启动的 MapTask数量
TOTAL_LAUNCHED_REDUCES 所有启动的 ReduceTask数量
OTHER_LOCAL_MAPS 其他Local MapTask数量
DATA_LOCAL_MAPS DATA_LOCAL的MapTask数量
NODEGROUP_LOCAL_MAPS NODEGROUP_LOCAL的MapTask数量
RACK_LOCAL_MAPS RACK_LOCAL的MapTask数量
SLOTS_MILLIS_MAPS 被占用的Map slot的“Slot个数 * (结束时间 – 开始时间)”
SLOTS_MILLIS_REDUCES 被占用的Reduce slot:“Slot个数 * (结束时间 – 开始时间)”
FALLOW_SLOTS_MILLIS_MAPS 空闲Map Slot:“(当前时间 – 开始时间) * Slot个数”
FALLOW_SLOTS_MILLIS_REDUCES 空闲Reduce Slot:“(当前时间 – 开始时间) * Slot个数”
  • 其他结构

JobInProgress中使用了大量的集合来维护Job/Task相关的状态信息,具体内容如下表所示:

结构图 说明

1 Map<Node, List<TaskInProgress>> nonRunningMapCache

JobTracker端维护了某个Node上,没有运行的MapTask列表的信息。
在调度MapTask之前,需要计算某个MapTask将要运行在哪些Node上,这里维护了某个Node所对应的没有运行的MapTask的列表信息。


1 Map<Node, Set<TaskInProgress>> runningMapCache

JobTracker端维护了某个Node上,当前正在运行的MapTask列表的信息。


1 List<TaskInProgress> nonLocalMaps

JobTracker端维护的、非Local,并且还没有运行的MapTask的列表。


1 SortedSet<TaskInProgress> failedMaps

JobTracker端维护了失败的MapTask的信息,在该集合中的TaskInProgress基于失败次数降序排序。
当某个MapTask失败以后,就会被放到该集合中,后续重新调度MapTask运行时,会检索该集合。

1 Set<TaskInProgress> nonLocalRunningMaps

JobTracker端维护的、 非Local、正在运行的MapTask保存在该数据结构中。

1 Set<TaskInProgress> nonRunningReduces

JobTracker端维护的、没有运行的ReduceTask的列表。

1 List<TaskAttemptID> mapCleanupTasks

为MapTask运行的cleanup task列表。

1 List<TaskAttemptID> reduceCleanupTasks

为ReduceTask运行的cleanup task列表。

1 List<TaskCompletionEvent> taskCompletionEvents

TaskCompletionEvent是用来跟踪Task完成事件的数据结构,该列表结构保存了TaskCompletionEvent。
当一个Task的状态为TaskStatus.State.SUCCEEDED/TaskStatus.State.FAILED/TaskStatus.State.KILLED的时候,会创建一个对应的TaskCompletionEvent对象,根据该对象来更新JobTracker端维护的Task的状态信息。

1 Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap

TaskTracker发送心跳的时候,会将TaskTracker的状态信息发送给JobTracker,状态信息通过TaskTrackerStatus表示,该对象中包含了Task的报告信息TaskStatus。如果是在运行ReduceTask时,抓取MapTask输出的结果失败时,会在根据Task报告信息,更新JobTracker端维护的mapTaskIdToFetchFailuresMap,记录了Task抓取MapTask输出失败的次数计数信息。

1 Map<TaskType, Long> firstTaskLaunchTimes

TaskType枚举类型定义如下:

1 public enum TaskType {
2 MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP
3 }

该firstTaskLaunchTimes数据结构保存了某个TaskType类型第一次运行的时间戳信息。

1 Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps

FallowSlotInfo包含了空闲的slot信息,主要九个包含了空闲的slot的个数信息。
该数据结构维护了在某个TaskTracker上为MapTask运行所预留的空闲slot的信息。


1 Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces

该数据结构维护了在某个TaskTracker上为ReduceTask运行所预留的空闲slot的信息。

JobTracker数据结构

JobTracker通过在内存中维护有关Job、Task相关的所有信息,来跟踪他们运行、交互过程中所发生的数据交换,等等,如下表所示:

结构图 说明

1 List<ServicePlugin> plugins

通过ServicePlugin接口,可以基于任意的RPC协议暴露DataNode或NameNode的功能。
通过配置项mapreduce.jobtracker.plugins可以设置ServicePlugin,JobTracker启动的时候会加载初始化配置的ServicePlugin。


1 List<JobInProgressListener> jobInProgressListeners

JobTracker维护了一组JobInProgressListener监听器,在JobTracker运行过程中,发生某些事件会触发注册的JobInProgressListener的执行。比如,JobClient提交一个Job,JobTracker端会触发对应的JobInProgressListener调用jobAdded()初始化该Job;比如,Job执行过程中状态发生变更,会触发JobInProgressListener调用jobUpdated()执行;比如,Job运行完成,会触发obInProgressListener调用jobRemoved()执行。
JobTracker初始化时会创建TaskScheduler,而启动TaskScheduler的时候,会把TaskScheduler所维护的JobInProgressListener添加到jobInProgressListeners列表中。


1 Map<JobID, JobInProgress> jobs

JobTracker维护一个JobID->JobInProgress映射的列表,JobID标识一个提交的Job,JobInProgress是JobTracker端维护的Job的所有信息的数据结构。在如下情况下,会检索/操作该jobs数据结构:

  • JobClient提交Job的时候,会创建JobInProgress,并加入到jobs集合中
  • JobClient远程调用Kill掉指定Job的时候,会根据JobID从jobs中获取JobInProgress信息,并Kill掉该Job,更新状态信息
  • JobClient查询当前运行的所有Job信息时,会检索jobs列表
  • 在JobTracker端检索一个Job所维护的Task信息时,会根据JobInProgress所维护的数据结构获取到对应的Task的信息TaskInProgress
  • Job运行状态不为RUNNING,并且也不为PREP,并且完成时间早于当前时间,会将Job从jobs列表删除
  • JobTracker解析接收到的TaskTracker发送的心跳的过程中,会检索并更新jobs列表中的Job信息,找到可以分配给该TaskTracker的属于满足条件的Job所包含的Task

1 TreeMap<String, ArrayList<JobInProgress>> userToJobsMap

用来跟踪某个用户提交的需要运行的Job集合的数据结构。
当Job完成(success/failure/killed)后,会在JobTracker内存中保存一些Job,这些Job属于哪些用户的。默认情况下会保存MAX_COMPLETE_USER_JOBS_IN_MEMORY=100个用户的已完成的Job,当超过该值时,会清理掉最早的用户以及对应的完成的Job信息。
可以通过配置项mapred.jobtracker.completeuserjobs.maximum来设置该值。


1 Map<String, Set<JobID>> trackerToJobsToCleanup

用来跟踪某个TaskTracker上运行的Job集合的数据结构。
当一个Job已经运行完成,TaskTracker需要知道哪些运行在该节点上的Job已经完成,并等待通知进行清理,这时会在JobTracker端检索该Map,取出该TaskTracker对应的需要进行清理的Job的集合。
另外,还有一种情况,当JobTracker一段时间内没有收到TaskTracker发送的心跳报告,这时会将该TaskTracker对应的Job集合从trackerToJobsToCleanup中删除,后续会重新调度这些运行在该有问题的TaskTracker上的Task(这些Task属于某些Job,JobTracker分配任务的单位是Task)。


1 Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup

用来跟踪某个TaskTracker上运行的Task集合的数据结构。
当Job运行完成(成功或者失败)后,一个TaskTracker需要知道属于该Job的哪些Task运行在该TaskTracker上,需要对这些Task进行清理。JobTracker端会查询出这类Task,并通过心跳的响应,向对应的TaskTracker发送KillTaskAction指令,通知TaskTracker清理这些Task运行时生成的临时文件等。


1 Map<TaskAttemptID, TaskInProgress> taskidToTIPMap

TaskAttemptID用来标识一个MapTask或一个ReduceTask,通过该数据结构可以根据TaskAttemptID获取到MapTask/ReduceTask的运行信息,也就是TaskInProgress对象。
当需要检索MapTask/ReduceTask,或者对JobTracker端所维护的该Task的状态信息进行更新的时候,需要通过该数据结构获取到。


1 TreeMap<TaskAttemptID, String> taskidToTrackerMap

维护TaskAttemptID到TaskTracker的映射关系,可以通过一个Task的ID获取到该Task运行在哪个TaskTracker上。


1 Map<String, Set<TaskTracker>> hostnameToTaskTracker

一台主机上,可能运行着多个TaskTracker进程,该数据结构用来维护host到TaskTracker集合的映射关系。如果一个host被加入了黑名单,则该host上面的所有TaskTracker都无法接收任务。


1 TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap

某个TaskTracker上都运行着哪些Task,通过该数据结构来维护这种映射关系。


1 TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap

在某个TaskTracker上都运行完成了哪些Task,通过该数据结构来维护这种映射关系。


1 Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap

TaskTracker会周期性地向JobTracker发送心跳报告,最近一次发送的心跳报告,JobTracker会给其一个响应,最后的这个响应的数据保存在该数据结构中。


1 Map<String, Node> hostnameToNodeMap

JobTracker维护了一个网络拓扑结构(NetworkTopology),组成该拓扑结构的是一个一个的Node,每个Node都包含了网络位置信息、继承关系信息、名称等。
每个TaskTracker都是整个Hadoop集群的一个节点,通过该数据结构维护了TaskTracker在集群拓扑结构中相关信息。
比如,根据给定TaskTracker ID,从hostnameToNodeMap中检索出其对应的Node信息,在调度一个Job的MapTask运行时(MapTask运行具有Locality特性),可以基于local、rack-local、off-switch的顺序优先选择前面的Node运行该MapTask。

附录
这里给出文中(文字/图片上)一些缩写词对应的完整名称,如下表所示:

简写词 完整名称
JIP JobInProgress
TIP TaskInProgress
TAID TaskAttemptID
TTID TaskTracker ID
TT HOST TaskTracker Host Name
TCE TaskCompletionEvent
JIPL JobInProgressListener

目录
相关文章
|
SQL 分布式计算 Hadoop
通过Job Committer保证Mapreduce/Spark任务数据一致性
通过对象存储系统普遍提供的Multipart Upload功能,实现的No-Rename Committer在数据一致性和性能方面相对于FileOutputCommitter V1/V2版本均有较大提升,在使用MapRedcue和Spark写入数据到S3/Oss的场景中更加推荐使用。
通过Job Committer保证Mapreduce/Spark任务数据一致性
|
分布式计算 Java API
阿里云E-MapReduce集群不同计算引擎sleep task使用笔记
需求:日常在E-MapReduce集群中进行相关测试,验证一些切换或变更是否会影响业务的运行导致任务failed。所以需要在测试集群中运行指定资源数(vcore及memory)或者指定运行时间的任务。 目前用到MapReduce和spark任务两种,其余的持续更新补充中……
|
存储 分布式计算 Hadoop
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
|
存储 XML 缓存
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
|
分布式计算 Hadoop 分布式数据库
通过Job Committer保证Mapreduce/Spark任务数据一致性
并发地向目标存储系统写数据是分布式任务的一个天然特性,通过在节点/进程/线程等级别的并发写数据,充分利用集群的磁盘和网络带宽,实现高容量吞吐。并发写数据的一个主要需要解决的问题就是如何保证数据一致性的问题,本文主要介绍MapReduce/Spark如何通过Job Committer机制解决写数据一致性的问题,以及在OSS等对象存储上的解决方案。
487 0
|
资源调度 分布式计算 调度
Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)
        v2版本的MapReduce作业中,作业JOB_SETUP_COMPLETED事件的发生,即作业SETUP阶段完成事件,会触发作业由SETUP状态转换到RUNNING状态,而作业状态转换中涉及作业信息的处理,是由SetupCompletedTransition来完成的,它主要做了...
1166 0
|
分布式计算 调度
MapReduce源码分析之作业Job状态机解析(一)简介与正常流程浅析
        作业Job状态机维护了MapReduce作业的整个生命周期,即从提交到运行结束的整个过程。Job状态机被封装在JobImpl中,其主要包括14种状态和19种导致状态发生的事件。         作业Job的全部状态维护在类JobStateInternal中,如下所示: publ...
1025 0
Activity启动模式&Task栈
在AMS中,ActivityRecord对应一个Activity,TaskRecord对应一个Task,每个TaskRecord中保存了若干ActivityRecord,TaskRecord由taskId标识,通过getTaskId()可以获取Activity所属的Task。
1043 0