我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。这篇文章的内容,更多地主要是描述处理/交互流程性的东西,大部分流程图都是经过我梳理后画出来的(开始我打算使用序列图来描述流程,但是发现很多流程在单个对象内部都已经非常复杂,想要通过序列图表达有点担心描述不清,所以选择最基本的程序流程图),可能看起来比较枯燥,重点还是关注主要的处理流程要点,特别的地方我会刻意标示出来,便于理解。
JobTracker与TaskTracker之间通过org.apache.hadoop.mapred.InterTrackerProtocol协议来进行通信,TaskTracker通过该接口进行远程调用实现Heartbeat消息的发送,协议方法定义如下所示:
1 |
HeartbeatResponse heartbeat(TaskTrackerStatus status, |
2 |
boolean restarted, |
3 |
boolean initialContact, |
4 |
boolean acceptNewTasks, |
5 |
short responseId) throws IOException; |
通过该方法可以看出,最核心的Heartbeat报告数据都封装在TaskTrackerStatus对象中,JobTracker端会接收TaskTracker周期性地发送的心跳报告,根据这些心跳信息来更新整个Hadoop集群中计算资源的状态/数量,以及Task的运行状态。
另外,在JobTracker端维护的对象的数据结构,主要包括如下3个:
- TaskTracker:这个类是在JobTracker端定义的,描述了TaskTracker的基本信息和状态(需要注意的是:它与TaskTracker进程的实现类同名,但是含义完全不同)
- JobInProgress:简写JIP,在JobTracker端用来描述,JobClient提交的Job运行状态的数据结构,一个JIP对象还包含了组成一个Job的Task对应的一组TIP的信息
- TaskInProgress:简写TIP,在JobTracker端用来描述,在TaskTracker上运行的Task状态的数据结构(需要注意的是:在TaskTracker端也对应一个TaskInProgress实现类,它与JobTracker端的同名,但是所包含的内容也并不完全相同)
- TaskAttemptID:简写TAID,它是唯一标识了组成一个Job的Task的一个运行实例,一个Task(MapTask/ReduceTask)可能运行多次,比如第一次运行失败,对应一个失败的TAID,第二次调度又运行,又对应一个新的TAID;再比如,推测执行,可能会对应着同一个Task的、具有2个不同TAID的Task运行实例
- ResourceStatus
- TaskStatus
- TaskTrackerHealthStatus
- TaskTracker创建一个TaskTrackerStatus对象,TaskTrackerStatus内部封装的信息包括:TaskTracker所在节点的基本信息、运行在TaskTracker上的Task的状态信息、TaskTracker服务的健康状态信息、TaskTracker的资源信息,另外发送心跳的RPC方法还包括restarted(TaskTracker是否重启)、initialContact(TaskTracker是否初次连接JobTracker)、acceptNewTasks(TaskTracker是否能够运行新的Task)、responseId(心跳响应ID),通过InterTrackerProtocol协议的heartbeat方法发送给JobTracker。
- JobTracker接收到TaskTracker发送的心跳数据。
- JobTracker检查TaskTracker的host是否在黑名单中,如果TaskTracker在黑名单中,则直接抛出异常终止RPC调用,否则继续下一步流程。
- 检查TaskTracker RPC调用参数restarted的值,如果TaskTracker重启了,则标记TaskTracker状态为健康状态;如果TaskTracker没有重启,则检查是否可以指派任务在该TaskTracker上运行。
- 如果TaskTracker不是初次连接JobTracker,检查JobTracker是否存在上一次向该TaskTracker发送的Heartbeat响应数据,存在的话则说明TaskTracker因为失去了与JobTracker之间的RPC连接而没有接收到,JobTracker直接再给TaskTracker重新发送该响应数据;不存在的话,若JobTracker重启了,使TaskTracker重新加入集群,需要通知Recovery Manager从恢复列表中移除该TaskTracker,若JobTracker未重启,这种情况几乎是不可能存在的(既然TaskTracker不是初次连接,JobTracker也没有重启,JobTracker端不可能没有保存Heartbeat响应数据)。
- 处理JobTracker接收到的TaskTracker的Heartbeat信息,主要是TaskTrackerStatus封装的数据。
- 根据处理Heartbeat数据结果,如果TaskTracker需要重新初始化,则发送一个带有ReinitTrackerAction指令的Heartbeat响应数据,否则TaskTracker不需要重新初始化则继续下一步流程。
- 检查是否可以向该TaskTracker指派任务,如果可以可以向该TaskTracker指派任务,则直接使用TaskScheduler指定的调度策略,选择当前可以指派给TaskTracker的一组需要启动的Task(对应指令LaunchTaskAction)。
- 根据TaskScheduler调度策略选择的需要启动的Task,并根据TaskTracker发送的Task状态报告,继续选择一些已经完成/需要被清理的Task分配给TaskTracker:先检查在该TaskTracker上是否有完成的Job,计算属于这些Job的需要被Kill掉(对应指令KillTaskAction)的Task;再检查是否有完成的Job,并且对应在该TaskTracker上的Task需要被清理(对应指令KillTaskAction);最后检查是否有已经完成需要被提交的Task(以此来通知TaskTracker提交Task完成并更新状态,对应指令CommitTaskAction)。
- 构造一个包含可调度Task(LaunchTaskAction/KillTaskAction/CommitTaskAction)的HeartbeatResponse对象,更新JobTracker内部维护的trackerToHeartbeatResponseMap映射。根据TaskTracker的Heartbeat报告的Task状态信息,对标记为完成的Task,更新JobTracker内部维护的多个队列和Map:trackerToMarkedTasksMap、taskidToTrackerMap、trackerToTaskMap、taskidToTIPMap。最后,返回TaskTracker调用的结果:HeartbeatResponse对象。
- 从队列Map<String, Set<JobID>> trackerToJobsToCleanup中移除在该TaskTracker上已经完成且需要清理的所有Job。
- 从队列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中移除在TaskTracker上已经运行完成且需要清理的所有Task。
- 通知Recovery Manager从其维护的Set<String>类型的恢复列表JobTracker.RecoveryManager.recoveredTrackers中移除该TaskTracker。
- 从TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap中删除在该TaskTracker上运行的所有Task。
- 对在该TaskTracker上的运行的每一个Task(在队列trackerToTaskMap中),进行如下2步处理:
- (1)从队列Map<TaskAttemptID, TaskInProgress> taskidToTIPMap中取出TaskAttemptID对应的TaskInProgress tip结构,再根据tip获取到JobInProgress:JobInProgress job = tip.getJob();;
- (2)如果ReduceTask已经完成,以及具有0个ReduceTask的所有MapTask已经完成,则将这些Task放入到队列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中;如果tip标记Task没有完成,或者满足条件tip.isMapTask() && !tip.isJobSetupTask() && job.desiredReduces() != 0,检查Job运行状态,当job.getStatus().getRunState() == JobStatus.RUNNING || job.getStatus().getRunState() == JobStatus.PREP成立时,则该Task运行失败,并更新Task状态,同时收集这类Job,放入集合Set<JobInProgress> jobsWithFailures中,后续对这些Job进行处理;
- 由于该TaskTracker被JobTracker标记为lost状态,则对上面收集到的jobsWithFailures集合中的Job,只要存在属于该Job的Task被分配到该TaskTracker上运行,会通过累加计算在该TaskTracker上失败的Task计数,给该TaskTracker以惩罚,并释放所有在该TaskTracker上预留的Slot。
- 从队列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中移除所有被标记完成的Task,同时更新JobTracker内部维护的如下3个队列:TreeMap<TaskAttemptID, String> taskidToTrackerMap、TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap、Map<TaskAttemptID, TaskInProgress> taskidToTIPMap。
- 如果在该TaskTracker上的运行的Task还有没处理的,则转第6步进行处理;否则,流程结束。
- 检查是否JobTracker端存在该TaskTracker上一次汇报的状态报告,如果不存在,则直接处理当前发送的状态报告;否则,会更新JobTracker端维护的如下4个全局计数器:totalMaps(MapTask总数)、totalReduces(ReduceTask总数)、occupiedMapSlots(占用的Map Slot总数)、occupiedReduceSlots(占用的Reduce Slot总数),在当前计数值的基础上,减去上次汇报的报告中的数量(实际上是假定上次汇报的全部指标都已完成,如果没完成,再通过本次汇报的状态报告再加回去);如果TaskTracker没有被加入到黑名单中,还需要更新下面2个JobTracker端全局计数器:totalMapTaskCapacity(该TaskTracker上最大Map Slot总数)、totalReduceTaskCapacity(该TaskTracker上最大Reduce Slot总数)。
- 处理TaskTracker当前汇报的状态报告,更新JobTracker内部维护的6个全局计数器:totalMaps、totalReduces、occupiedMapSlots、occupiedReduceSlots、totalMapTaskCapacity、totalReduceTaskCapacity,各个计数器具体含义见上一步说明。
- 如果TaskTracker是第一次汇报状态报告,则需要在JobTracker内部注册,构造一个org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker对象(该TaskTracker对象是在JobTracker的视角看到的结构),加入到队列HashMap<String, TaskTracker> taskTrackers中,同时还要计算该TaskTracker所在的host节点上TaskTracker进程的个数,更新队列Map<String, Integer> uniqueHostsMap。
- 从TaskTracker发送的TaskTrackerStatus对象可以提取Task状态报告集合,然后对每一个状态报告进行处理,直到所有的Task的状态都已经被更新到JobTracker内部维护的状态对象上,下面描述每一个TaskStatus的处理过程:
- (1)如果一个Task的运行状态不为TaskStatus.State.UNASSIGNED,说明该Task还没有在TaskTracker上获得运行机会,则并不让该Task失败(当一个Task指派给一个TaskTracker运行时,会首先在JobTracker端加入到一个超时列表中,由一个独立的线程JobTracker.ExpireLaunchingTasks去检测,该Task是否在给定的时间内(默认是10分钟 )是否在TaskTracker上启动而且一直没有报告状态,如果没有报告,则会将该Task标记为失败),等待下一次被调度分配给TaskTracker去运行。
- (2)根据Task的ID,获取到它对应的JobInProgress信息,如果没有获取到则将该Task对应的JobInProgress对象加入到cleanup列表Map<String, Set<JobID>> trackerToJobsToCleanup中,直接返回继续处理下一个TaskStatus报告;如果能够获取到对应的JobInProgress信息,则检查该JobInProgress中包含的Job是否设置初始化完成状态,如果没有设置,则直接将该Task加入到队列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中,等待JobTracker调度Kill掉该Task,直接返回继续处理下一个TaskStatus报告。
- (3)检查该TaskStatus报告中对应的TaskAttemptID,是否在JobTracker端存在对应的TaskInProgress对象,很有可能JobTracker重启,内存中维护的Map<TaskAttemptID, TaskInProgress> taskidToTIPMap队列中没有TaskInProgress对象,这时JobInProgress对象一定存在,可以通过JobInProgress对象获取到该Task对应的TaskInProgress对象(因为在JobTracker端创建Job的时候,会分别创建4类TIP:map、reduce、cleanup、setup),再将其加入到Map<TaskAttemptID, TaskInProgress> taskidToTIPMap队列中,同时触发已知的一组JobInProgressListener的jobUpdated方法,去更新Job状态。
- (4)根据TaskStatus能够获取到所有Fetch失败的Task,查询该Task对应的TaskInProgress对象,从而进一步通知JobInProgress对象,根据设定的允许Task Fetch失败的最大次数限制,确定是否要让该Task失败,并更新TaskInProgress状态。
- 如果心跳汇报的status中,Task运行状态为SUCCEEDED,当tip标识已经完成或标识被Kill掉,则统一修改status的运行状态为KILLED;如果心跳汇报的status对应的TaskAttemptID不是cleanup task,当该TaskAttemptID 对应的JobInProgress表示Job已经完成,或失败,或被Kill掉,那么status运行状态为FAILED_UNCLEAN则修改为FAILED,运行状态为KILLED_UNCLEAN则修改为KILLED。
- 调用TaskInProgress的updateStatus方法,传入当前TaskTracker汇报的status状态对象,更新tip的状态。TaskInProgress会维护每个Task对应的TaskStatus对象oldStatus,并根据汇报的status对更新替换oldStatus。有3种情况不需要更新:第一种是当status的运行状态不等于RUNNING/COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEAN/UNASSIGNED中的任何一种状态;第二种是status的运行状态为RUNNING、UNASSIGNED中的任意一种状态,并且oldStatus的运行状态为FAILED/KILLED/FAILED_UNCLEAN/KILLED_UNCLEAN/SUCCEEDED/COMMIT_PENDING中任意一种状态;第三种是oldStatus的运行状态为FAILED/KILLED中的任意一种状态,这种情况会把该TaskAttemptID加入到队列TreeMap<TaskAttemptID, Boolean> tasksToKill中标识需要Kill掉该Task。
- 如果status的运行状态为FAILED状态,并且JobTracker在Safe模式下,则设置status的运行状态为KILLED。
- 此时,如果oldStatus与status不相等,即TaskAttemptID的状态已经发生变化,则会根据status的运行状态创建不同的TaskCompletionEvent事件(SUCCEEDED/FAILED/KILLED),这些 TaskCompletionEvent事件会被加入到JobInProgress的taskCompletionEvents列表中,供JobClient查询或供JobTracker检索;或者执行相应的操作:如果运行状态为FAILED_UNCLEAN/KILLED_UNCLEAN,则tip中该TaskAttemptID标记为失败并更新相关结构,然后加入到mapCleanupTasks/reduceCleanupTasks列表中等待被清理,同时将该TaskAttemptID对应的数据从JobTracker的taskidToTIPMap、taskidToTrackerMap、trackerToTaskMap这3个队列中删除。
- 根据构造的TaskCompletionEvent对象,并且如果status的运行状态为SUCCEEDED,则更新其对应的JobInProgress的状态为成功。
TaskTrackerStatus结构
TaskTrackerStatus对象要在网络间进行序列化传输,所以实现了接口org.apache.hadoop.io.Writable,该对象的数据结构,如下图所示:
字段名称 | 字段类型 | 说明 |
trackerName | String | TaskTracker的名称,例如:tracker_ + localHostname + : + taskReportAddress |
host | String | TaskTracker所在主机名称 |
httpPort | int | HTTP端口号,默认50030 |
taskFailures | int | 在该TaskTracker上运行失败的Task的个数 |
dirFailures | int | 在TaskTracker节点上,配置的mapred.local.dir目录失败的个数 |
maxMapTasks | int | 在该TaskTracker上同时运行Map的最大个数,通过mapred.tasktracker.map.tasks.maximum配置的,默认值是2 |
maxReduceTasks | int | 在该TaskTracker上同时运行Map的最大个数,通过 mapred.tasktracker.reduce.tasks.maximum配置的,默认值是2 |
resStatus | ResourceStatus | 在该TaskTracker上的资源情况,主要包括如下内容:虚拟内存大小、物理内存大小、Map slot数量、Reduce slot数量、可用磁盘空间、可用虚拟内存大小、可用物理内存大小、处理器数量、CPU频率、CPU使用百分比、累积CPU时间 |
taskStatus | TaskStatus | 在该TaskTracker上,当前task的状态,它有分为MapTaskStatus和ReduceTaskStatus,主要包含如下内容:taskid(TaskAttemptID)、运行进度百分比、运行状态、诊断信息、所在TaskTracker名称、slot数、开始时间、结束时间、执行阶段(Phase)、一组计数器信息 |
taskTrackerHealthStatus | TaskTrackerHealthStatus | TaskTracker的健康状态信息 |
下面,主要对ResourceStatus、TaskStatus、TaskTrackerHealthStatus进行说明:
ResourceStatus封装了一个TaskTracker节点的资源信息,结构如下图所示:
TaskStatus封装了一个TaskTracker节点上运行的Task的状态信息,结构如下图所示:
TaskTrackerHealthStatus封装了TaskTracker的健康状态信息,如下图所示:
JobTracker处理Heartbeat流程
JobTracker处理Heartbeat的流程,如果把每个处理细节都详细地展开,非常地复杂,可能从头到尾描述下来会感觉枯燥无味,所以这里我先概要地描述JobTracker处理Heartbeat的整体流程,然后再按照功能划分出一个个看似还算独立的子处理流程,单独地进行详细说明,这样能够更容易理解。整体处理流程,如下图所示:
上面流程图中,黑色虚线所表示的处理流程,我们说明一下:这种情况是不可能出现的,因为TaskTracker不是第一次连接JobTracker,而JobTracker端还没有上一次TaskTracker发送的Heartbeat对应的HeartbeatResponse,同时JobTracker又没有重启动过,所以这种条件是不存在的,那么该流程分支也不可能执行,故而用虚线描述,指向发送一个带有ReinitTrackerAction的HeartbeatResponse。
下面,我们细化整个流程,将一些比较重要的流程详细分析说明:
TaskTracker与JobTracker失去连接,更新状态
JobTracker如果在给定超时时间范围之内没有收到TaskTracker的Heartbeat报告,会认为该TaskTracker已经无法执行/指派任务,那么在JobTracker端与该TaskTracker相关的数据结构都需要更新,受到影响的Job和Task的数据结构也需要更新,具体处理流程如下图所示:
检查是否可以向TaskTracker指派运行Task
当TaskTracker发送Heartbeat标志其没有重启,那么会执行该子流程,如下图所示:
标记TaskTracker为Health状态
当TaskTracker重启了,然后再次连接JobTracker时,发送Heartbeat的过程中,会执行该流程。重启的TaskTracker,JobTracker会将一个TaskTracker标记为Health状态,说明该TaskTracker对应的资源信息(内存/CPU)应该在JobTracker端做记录,表示这些资源是可用的,更新JobTracker端的几个可用资源的变量计数。但是,很有可能TaskTracker重启之前,其上运行Task失败了很多次,在JobTracker端记录该失败计数,当满足一定条件后,会将TaskTracker加入灰名单,如果TaskTracker重启了,应该将其从灰名单中移除,以便不影响任务分派,具体处理流程如下图所示:
更新TaskTracker状态
如果TaskTracker不是第一次连接JobTracker,那么在JobTracker端的队列HashMap<String, TaskTracker> taskTrackers中会保存上一次TaskTracker向JobTracker汇报的状态TaskTrackerStatus,如果该TaskTrackerStatus不存在,则直接处理当前汇报的TaskTracker的状态报告,使得JobTracker端维护的该TaskTracker的状态是最新的,具体的处理流程,如下图所示:
更新TaskTracker上所有Task状态
在JobTracker处理TaskTracker发送的Heartbeat的过程中,首先会更新JobTracker维护的TaskTracker的状态信息,因为一个TaskTracker上可能运行着很多Task,那么需要更新这些Task的状态,可以通过上面介绍的TaskTrackerStatus的结构看出,对应着一个TaskStatus的状态报告集合,所以这里有一个批量更新TaskStatus状态的操作,实际上会对每一个Task的状态分别进行更新,整体处理流程如下图所示:
更新Task状态
当Task的状态发生变化的情况下,可能需要更新Task的状态,我们根据JobTracker定义的updateTaskStatus方法,方法声明如下所示:
1 |
public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus status) |
其中,tip是当前在JobTracker端维护的Task的状态,status是TaskTracker汇报的Task状态,更新JobTracker端Task状态主要是根据心跳汇报的status来更新tip数据结构。更新Task状态的具体流程,如下图所示: