我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
本文不打算深入地详细分析TaskTracker某个具体的处理流程,而是概要地分析TaskTracker在MapReduce框架中的主要负责处理那些事情,是我们能够在宏观上了解TaskTracker端都做了哪些工作。我尽量将TaskTracker端的全部要点内容提出来,但是涉及到详细的分析,只是点到为止,后续会对相应模块的处理流程结合代码进行分析。
TaskTracker主要负责MapReduce计算集群中Task运行的管理,所以TaskTracker要管理的事情比较多。一个MapReduce Job由很多的Task组成,而一个Job的所有Task被分成几个相斥的子集,每个子集被分配到某一个TaskTracker上去运行,所以一个TaskTracker管理运行了一个Job的所有Task的一个子集,也就是说TaskTracker不仅要维护每个Job对应的一个Task的子集,还要维护这些Task所属的Job的运行状态,对于Job/Task的状态的管理都是与JobTracker通过RPC通信保持状态的同步。
下面是TaskTracker端的主要组件,如下图所示:
为了了解TaskTracker中各个组件都负责处理哪些工作,我们通过下表来简要地说明各个组件的功能,如下表所示:
组件名称 |
组件功能 |
localFs: FileSystem |
TaskTracker本地文件系统,用来管理本地文件和目录 |
systemFS: FileSystem |
HDFS分布式文件系统,可以访问HDFS,用来检索Job/Task对应的资源文件等。 |
TrackerDistributedCacheManager |
TrackerDistributedCacheManager负责跨Job的缓存的管理,每个Job会对应一个TaskDistributedCacheManager实例。比如,每次TaskTracker被分配执行一个Job的一组Task,此时需要将该Job对应的资源文件和split相关数据从HDFS下载到TaskTracker本地,这些文件都需要进行管理,包括位置查询、文件访问、文件清理等。 |
TaskTrackerInstrumentation |
用来管理TaskTracker上运行的一些Task的监控数据,主要是采集某些点的数据,如Task完成时、Task失败时、Task超时时等,目前该组件中都是空实现。 |
IndexCache |
Map阶段需要输出临时文件,要对MapTask的输出写入TaskTracker本地文件系统,需要对这些输出数据进行分区(partition),IndexCache负责管理分区文件的相关信息。 |
UserLogManager |
负责管理TaskTracker节点上执行Task输出的日志信息,目前通过UserLogEvent定义了JVM_FINISHED、JOB_STARTED,、JOB_COMPLETED,、DELETE_JOB这4种事件,通过UserLogManager可以实现日志记录的输出。 |
ACLsManager |
用来控制MapReduce管理员管理Job和Queue级别操作的访问权限。 |
NodeHealthCheckerService |
用来检测节点之间的心跳服务。 |
ResourceCalculatorPlugin |
用来计算系统的资源的插件,默认使用的是LinuxResourceCalculatorPlugin实现,可以方便地访问系统中的资源信息状态,如内存、CPU。 |
JvmManager |
为了保证TaskTracker与实际Task(MapTask/ReduceTask)运行的隔离性,会将Task在单独的JVM实例中运行,JvmManager用来管理Task运行所在的JVM实例的信息,包括创建/销毁JVM实例等操作。 |
LocalStorage |
管理TaskTracker本地文件系统的存储目录信息,如访问本地目录失败、检测目录可用性等。 |
LocalDirAllocator |
管理TaskTracker本地目录分配,初始化LocalDirAllocator基于配置mapred.local.dir指定的目录,它采用的Round-Robin方式,在Task运行之前需要写一个启动Task的脚本文件,使用LocalDirAllocator来控制对应文件的读写。 |
JettyBugMonitor |
在Map阶段输出中间结果,Reduce阶段会基于HTTP协议(基于Jetty)来拷贝属于自己的分区,为了解决Jetty已知的一些类存在的Bug,它们可能会影响TaskTracker,通过检测Jetty所在JVM实例使用CPU量,当超过配置的值时终止TaskTracker进程。 |
MapOutputServlet |
TaskTracker上启动一个Jetty容器,该Servlet用来负责暴露HTTP接口,供其它运行ReduceTask的TaskTracker拉取Map输出文件。 |
jobClient: InterTrackerProtocol |
与JobTracker进行RPC通信的代理(Proxy)对象。 |
taskReportServer: Server |
TaskTracker节点上启动的RPC Server,在其上运行的Task,在运行过程中会向TaskTracker汇报状态,使TaskTracker知道Task的运行状态报告。 |
CleanupQueue |
负责清理Job或Task运行完成后遗留下的一些不再使用的文件或目录。 |
TaskTrackerStatus |
维护TaskTracker当前的状态信息,主要包括:TaskTracker的配置信息、TaskTracker上资源状态信息、TaskTracker上运行的Task的状态报告信息。 |
JobTokenSecretManager |
用来管理Job运行的令牌相关信息。 |
ShuffleServerInstrumentation |
管理Job运行过程中,shuffle阶段的监控数据,包括一组计数器:serverHandlerBusy、outputBytes、failedOutputs、successOutputs、exceptionsCaught。 |
TaskController |
用来管理Task的初始化、完成、清理工作,还负责启动和终止Task运行所在的JVM实例。 |
HttpServer |
用来处理Map输出的Jetty容器,其中MapOutputServlet会注册到该HTTP server中。 |
ShuffleExceptionTracker |
跟踪Shuffle阶段出现异常情况的信息。 |
MapEventsFetcherThread |
跟踪每个运行的Job对应的ReduceTask的Shuffle阶段,如果有Map完成,会对应着TaskCompletionEvent触发该线程,从已经完成的Map所在节点拷贝Map输出的中间结果数据,为ReduceTask运行做准备。 |
ReduceTaskLauncher |
启动ReduceTask。 |
MapTaskLauncher |
启动MapTask。 |
TaskCleanupThread |
负责清理Job/Task执行完成后遗留的文件或目录。 |
TaskMemoryManagerThread |
管理在该TaskTracker上运行的Task使用内存的信息。 |
通过上表,我们可以了解到TaskTracker端各个组件的基本功能,也稍微了解到组件之间的一些关系。下面,我们从TaskTracker抽象层次的视角,来分析组件之间的关系和交互,概要地描述一些主要的处理流程:
- TaskTracker处理心跳响应
- MapReduce Job恢复运行
- Task隔离运行
- 启动MapTask过程
- 启动ReduceTask过程
下面,我们分别分析上述列举的5个处理流程:
TaskTracker处理心跳响应
TaskTracker周期性地向JobTracker发送心跳报告,将TaskTracker上运行的Task的状态信息、节点资源信息、节点健康状况信息封装到TaskTrackerStatus对象中,通过RPC调用heartbeat将心跳发送到JobTracker端,并返回HeartbeatResponse,其中心跳响应对象中包包含了JobTracker分配的任务,通过TaskAction这种指令(包括:LaunchTaskAction/KillTaskAction/CommitTaskAction)列表的方式进行指派。TaskTracker解析RPC调用返回的心跳响应,根据TaskAction指令列表,执行具体的操作。
TaskTracker处理心跳响应的流程,如下序列图所示:
TaskTracker收到心跳响应,首先会检查是否存在需要恢复的Job,如果存在,则会检查要进行恢复的Job的状态,从而将需要进行恢复的Job对应的Task加入到恢复队列中,等待调度运行。
接着,TAskTracker会检查TaskAction指令的类型,根据其实际类型,执行对应的处理流程:
- 如果是LaunchTaskAction,则启动Task
- 如果是KillTaskAction,则杀掉Task,修改TaskTracker维护的Job及Task状态,并清理临时数据
- 如果是KillJobAction,则杀掉该Job,说明该Job已经完成(成功/失败),修改TaskTracker维护的Job及Task状态,并清理临时数据
- 如果是CommitTaskAction,则说明对应的该Task已经执行完成,修改TaskTracker维护的Task即Job状态,最后还要清理临时数据
由于某些重要的处理流程,如启动一个Task的详细流程,我们会在后续单独写几篇文章,用更加合适的方式来详细分析。
MapReduce Job恢复运行
这里,我们介绍一下MapReduce计算中是如何实现Job的恢复的,包括JobTracker端和TaskTracker端之间的简单交互流程。
JobTracker存在一个系统目录(System Directory),默认值为/tmp/hadoop/mapred/system,也可以根据配置项mapred.system.dir指定该值。当JobClient提交一个Job到JobTracker时,JobTracker会首先将该Job的信息写入到JobTracker的系统目录下,每个Job对应一个以Job ID为名称的子目录,以便JobTracker因为重启,能够恢复这些Job的运行。我们可以看一下JobTracker中submitJob方法中保存Job信息的实现,代码如下所示:
06 |
Path jobDir = getSystemDirectoryForJob(jobId); |
07 |
FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION)); |
08 |
FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); |
上面代码中,JobInfo主要包含了JobID信息、用户名称、Job提交目录(例如,/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/ ,该目录是在JobClient提交Job时在HDFS上创建的,用于将该Job所需要的资源都拷贝到该Job对应的提交目录下面,便于后续JobTracker能够读取这些数据)。
如果JobTracker因为某些原因重新启动了,那么在JobTracker重启之后,需要从JobTracker的系统目中读取这些Job的信息,以便能够恢复这些尚未完成的Job的运行,并以HeartbeatResponse的结构,在TaskTracker发送Heartbeat的时候响应给TaskTracker,TaskTracker解析响应数据,然后去恢复这些Job的运行。
上面的序列图中,我们可以看到,当TaskTracker发送Heartbeat并收到响应后,从HeartbeatResponse中解析取出需要Recovered的Job,并进行处理,代码如下所示:
02 |
Set<JobID> jobs = heartbeatResponse.getRecoveredJobs(); |
03 |
if (jobs.size() > 0 ) { |
06 |
for (JobID job : jobs) { |
08 |
synchronized (runningJobs) { |
09 |
rjob = runningJobs.get(job); |
12 |
FetchStatus f = rjob.getFetchStatus(); |
22 |
synchronized (shouldReset) { |
23 |
for (Map.Entry<TaskAttemptID, TaskInProgress> entry |
24 |
: runningTasks.entrySet()) { |
25 |
if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) { |
26 |
this .shouldReset.add(entry.getKey()); |
通过上面的代码我们可以看到,当JobTracker重启的时候,已经在TaskTracker上运行的属于某些Job的Task可能无法立即感知到,对应的Job仍然存在于TaskTracker的runningJobs集合中。在JobTracker重启之后,TaskTracker所发送的第一个Heartbeat返回的响应数据中,应该会存在需要Recovered的Job列表,所以这时在TaskTracker端只需要从runningJobs中取出需要Recovered的Job,并查看其是否存在Fetch状态,如果存在,应该重新设置状态(主要对应于MapEventsFetcherThread 维护的TaskCompletionEvent列表,触发ReduceTask拉取MapTask的输出中间结果),以便该Job的各个Task恢复运行。如果该ReduceTask正在运行于SHUFFLE阶段,需要将对应的Job的MapTask的输出拷贝到该ReduceTask所在的节点上,通过调用FetchStatus的reset方法重置状态,这样就重新恢复了ReduceTask的运行。
Task隔离运行
由于MapReduce用户程序包含用户代码,可能会存在Bug,为了不因为用户代码存在的Bug影响TaskTracker服务,所以MapReduce采用了隔离Task运行的方式来运行MapTask/ReduceTask。在运行Task时,会单独创建一个独立的JVM实例,让Task的代码再该JVM实例中加载运行,TaskTracker需要跟踪该JVM实例中运行的Task的状态。在TaskTracker端,加载一个运行Task的JVM实例,是通过org.apache.hadoop.mapred.Child类来实现。下面,我们看一下Child类是如何实现Task加载运行的,如下面序列图所示:
Child类包含一个入口主方法main,在运行的时候需要传递对应的参数,来运行MapTask和ReduceTask,通过上面序列图我们可以看出,命令行输入如下5个参数:
- host:表示TaskTracker节点的主机名称
- port:表示TaskTracker节点RPc端口号
- taskID:表示启动的Task对应的TaskAttemptID,标识一个Task的一个运行实例
- log location:表示该Task运行实例对应的日志文件的路径
- JVM ID:表示该Task实例对应的JVMId信息,包括JobID、Task类型(MapTask/ReduceTask)、JVM编号(标识该JVM实例对应的id)
有了上述参数,就可以获取到一个Task运行所需要的全部资源,如一个Task处理哪一个Split,一个Task对应的Job配置信息,还可以方便TaskTracker监控该Task实例所在的JVM的状态。该Child创建时,会创建一个到TaskTracker的RPC代理对象,通过该RPC连接向TaskTracker汇报Task执行进度及其状态信息。然后,一切运行Task的基本条件都已经具备,接下来从该Task对应的Job的代码(job.jar)开始加载任务处理类,如果是MapTask则执行MapTask运行的处理流程,如果是ReduceTask则执行ReduceTask的处理流程,最后,断开Task汇报状态的RPc连接,Task运行结束。
启动MapTask过程
在Child类中加载启动Task,如果是MapTask,则执行MapTask对应的处理流程,如下序列图所示:
启动一个MapTask运行,包含4个阶段,我们通过运行各个阶段的方法来表示:
- runJobCleanupTask():清理Job对应的相关目录和文件
- runJobSetupTask():创建Job运行所需要的相关目录和文件
- runTaskCleanupTask():清理一个Task对应的工作目录下与Task相关的目录或文件
- runNewMapper()/runOldMapper():调用用户编写的MapReduce程序中的Mapper中的处理逻辑
在MapTask运行过程中, 如果阶段或者状态发生变化,要与TaskTracker进行通信,汇报状态,并更新TaskTracker维护的关于Task和Job对应的状态数据。最后,Task运行完成,也要通知TaskTracker。
启动ReduceTask过程
在Child类中加载启动Task,如果是ReduceTask,则执行ReduceTask对应的处理流程,如下序列图所示:
启动一个ReduceTask运行,与MapTask的处理流程有很大的不同,它包含3个阶段,如下所示:
- copy阶段:从运行MapTask的TaskTracker节点,拷贝属于该ReduceTask对应的Job所包含的MapTask的输出中间结果数据,这些数据存储在该Reduce所在TaskTracker的本地文件系统(可能会放在内存中),为后续阶段准备数据
- sort阶段:对从MapTask拉取过来的数据进行合并、排序
- reduce阶段:调用用户编写的MapReduce程序中的Reducer中的处理逻辑
执行reduce阶段,比MapTask复杂的多。在ReduceTask运行过程中,也会周期性地与TaskTracker通信,汇报Task运行进度和状态,以保证与TaskTracker所维护的Task的状态数据同步。当ReduceTask完成后,如果有输出的话,最终的结果数据会输出到HDFS中保存。