我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行。如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点。
通过MRAppMaster类的定义我们就能看出,MRAppMaster继承自CompositeService,而CompositeService又继承自AbstractService,也就是说MRAppMaster也是Hadoop中的一种服务,我们看下服务启动的serviceStart()方法中关于MapReduce作业的处理,关键代码如下:
@SuppressWarnings("unchecked") @Override protected void serviceStart() throws Exception { // ......省略部分代码 // 调用createJob()方法创建作业Job实例job // /////////////////// Create the job itself. job = createJob(getConfig(), forcedState, shutDownMessage); // End of creating the job. // ......省略部分代码 // 作业初始化失败标志位initFailed默认为false,即初始化成功,没有错误 boolean initFailed = false; if (!errorHappenedShutDown) { // create a job event for job intialization // 创建一个Job初始化事件initJobEvent JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. // 调用jobEventDispatcher的handle()方法,处理Job初始化事件initJobEvent,即将Job初始化事件交由事件分发器jobEventDispatcher处理, jobEventDispatcher.handle(initJobEvent); // If job is still not initialized, an error happened during // initialization. Must complete starting all of the services so failure // events can be processed. // 获取Job初始化结果initFailed initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to // ubermode if appropriate (by registering different container-allocator // and container-launcher services/event-handlers). // ......省略部分代码 // Start ClientService here, since it's not initialized if // errorHappenedShutDown is true // 启动客户端服务clientService clientService.start(); } //start all the components // 调用父类的serviceStart(),启动所有组件 super.serviceStart(); // finally set the job classloader // 最终设置作业类加载器 MRApps.setClassLoader(jobClassLoader, getConfig()); if (initFailed) { // 如果作业初始化失败,构造作业初始化失败JOB_INIT_FAILED事件,并交由事件分发器jobEventDispatcher处理 JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); jobEventDispatcher.handle(initFailedEvent); } else { // All components have started, start the job. // 调用startJobs()方法启动作业 startJobs(); } }
通过MRAppMaster服务启动的serviceStart()方法我们大致知道,MapReduce作业在MRAppMaster中经历了创建--初始化--启动三个主要过程,剪去枝叶,保留主干,具体如下:
1、创建:调用createJob()方法创建作业Job实例job;
2、初始化:
2.1、创建一个Job初始化事件initJobEvent;
2.2、调用jobEventDispatcher的handle()方法,处理Job初始化事件initJobEvent,即将Job初始化事件交由事件分发器jobEventDispatcher处理;
2.3、获取Job初始化结果initFailed;
2.4、如果作业初始化失败,构造作业初始化失败JOB_INIT_FAILED事件,并交由事件分发器jobEventDispatcher处理。
3、启动:调用startJobs()方法启动作业。
实际上,作业启动后不可能永远都不停止,MRAppMaster最终会将作业停止,这也是作业处理流程的第四步,即最后一步,作业停止!在哪里处理的呢?我们先卖个关子,请您暂时忽略这个问题,我们稍后会给出答案!
下面,我们针对MapReduce作业的上述三个主要过程,分别展开描述。
一、创建
首先看作业创建,createJob()方法如下:
/** Create and initialize (but don't start) a single job. * @param forcedState a state to force the job into or null for normal operation. * @param diagnostic a diagnostic message to include with the job. */ protected Job createJob(Configuration conf, JobStateInternal forcedState, String diagnostic) { // create single job // 创建一个作业Job实例newJob,其实现为JobImpl Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, jobCredentials, clock, completedTasksFromPreviousRun, metrics, committer, newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos, context, forcedState, diagnostic); // 将新创建的作业newJob的jobId与其自身的映射关系存储到应用运行上下文信息context中的jobs集合中 ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); // 异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得 dispatcher.register(JobFinishEvent.Type.class, createJobFinishEventHandler()); // 返回新创建的作业newJob return newJob; } // end createJob()其主要逻辑如下:
1、创建一个作业Job实例newJob,其实现为JobImpl,传入作业艾迪jobId、应用尝试艾迪appAttemptID、任务尝试监听器taskAttemptListener、输出提交器committer、用户名currentUser.getUserName()、应用运行上下文信息context等关键成员变量;
2、将新创建的作业newJob的jobId与其自身的映射关系存储到应用运行上下文信息context中的jobs集合中;
3、异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得;
4、返回新创建的作业newJob。
关于作业创建中的一些细节,我们暂时先不做过多关注,留待以后的文章专门进行分析。这里,我们先重点看看第3步,异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得,而createJobFinishEventHandler()方法代码如下:
/** * create an event handler that handles the job finish event. * @return the job finish event handler. */ protected EventHandler<JobFinishEvent> createJobFinishEventHandler() { return new JobFinishEventHandler(); }也就是说,当作业被创建后,它就被定义了作业完成事件JobFinishEvent的处理器为JobFinishEventHandler,而JobFinishEventHandler的定义如下:
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { @Override public void handle(JobFinishEvent event) { // Create a new thread to shutdown the AM. We should not do it in-line // to avoid blocking the dispatcher itself. new Thread() { @Override public void run() { shutDownJob(); } }.start(); } }这就是我们上面没有详细介绍的第四步--作业停止,它最终是调用的shutDownJob()方法,并开启一个新的线程来完成作业停止的,我们稍后再做介绍。
二、初始化
我们再来看作业的初始化,它是通过创建一个Job初始化事件JobEvent实例initJobEvent,事件类型为JobEventType.JOB_INIT,然后交由事件分发器jobEventDispatcher处理的。我们先来看下这个jobEventDispatcher的定义及实例化,如下:
// 作业事件分发器 private JobEventDispatcher jobEventDispatcher;jobEventDispatcher是一个JobEventDispatcher类型的作业事件分发器,其实例化为:
this.jobEventDispatcher = new JobEventDispatcher();而JobEventDispatcher的定义如下:
private class JobEventDispatcher implements EventHandler<JobEvent> { @SuppressWarnings("unchecked") @Override public void handle(JobEvent event) { // 从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件 ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event); } }很简单,从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件,而这个Job实例,还记得上面描述的吗,就是在Job最初被创建时,被添加到应用运行上下文信息context中jobs集合中的,key为jobId,value就是JobImpl对象。context的实现RunningAppContext中,根据jobId获取job实例的代码如下:
@Override public Job getJob(JobId jobID) { return jobs.get(jobID); }好了,我们就看下JobImpl中handle()方法是如何对类型为JobEventType.JOB_INIT的JobEvent进行处理的吧!
@Override /** * The only entry point to change the Job. */ public void handle(JobEvent event) { if (LOG.isDebugEnabled()) { LOG.debug("Processing " + event.getJobId() + " of type " + event.getType()); } try { writeLock.lock(); JobStateInternal oldState = getInternalState(); try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state", e); addDiagnostic("Invalid event " + event.getType() + " on Job " + this.jobId); eventHandler.handle(new JobEvent(this.jobId, JobEventType.INTERNAL_ERROR)); } //notify the eventhandler of state change if (oldState != getInternalState()) { LOG.info(jobId + "Job Transitioned from " + oldState + " to " + getInternalState()); rememberLastNonFinalState(oldState); } } finally { writeLock.unlock(); } }最核心的就是通过语句getStateMachine().doTransition(event.getType(), event)进行处理,实际上这牵着到了Yarn中MapReduce作业的状态机,为了本文叙述的流畅性、简洁性、重点明确性,我们对于作业状态机先不做解释,这部分内容留待以后的文章专门进行介绍,这里你只要知道作业初始化最终是通过JobImpl静态内部类InitTransition的transition()方法来实现的就行。我们看下InitTransition的transition()方法,如下:
/** * Note that this transition method is called directly (and synchronously) * by MRAppMaster's init() method (i.e., no RPC, no thread-switching; * just plain sequential call within AM context), so we can trigger * modifications in AM state from here (at least, if AM is written that * way; MR version is). */ @Override public JobStateInternal transition(JobImpl job, JobEvent event) { // 调用作业度量指标体系metrics的submittedJob()方法,提交作业 job.metrics.submittedJob(job); // 调用作业度量指标体系metrics的preparingJob()方法,开始作业准备 job.metrics.preparingJob(job); // 新旧API创建不同的作业上下文JobContextImpl实例 if (job.newApiCommitter) { job.jobContext = new JobContextImpl(job.conf, job.oldJobId); } else { job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( job.conf, job.oldJobId); } try { // 调用setup()方法,完成作业启动前的部分初始化工作 setup(job); // 设置作业job对应的文件系统fs job.fs = job.getFileSystem(job.conf); //log to job history // 创建作业已提交事件JobSubmittedEvent实例jse JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, job.conf.get(MRJobConfig.JOB_NAME, "test"), job.conf.get(MRJobConfig.USER_NAME, "mapred"), job.appSubmitTime, job.remoteJobConfFile.toString(), job.jobACLs, job.queueName, job.conf.get(MRJobConfig.WORKFLOW_ID, ""), job.conf.get(MRJobConfig.WORKFLOW_NAME, ""), job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""), getWorkflowAdjacencies(job.conf), job.conf.get(MRJobConfig.WORKFLOW_TAGS, "")); // 将作业已提交事件JobSubmittedEvent实例jse封装成作业历史事件JobHistoryEvent交由作业的时事件处理器eventHandler处理 job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); //TODO JH Verify jobACLs, UserName via UGI? // 调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId); // 确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks job.numMapTasks = taskSplitMetaInfo.length; // 确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0 job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0); // 确定作业的map和reduce权重mapWeight、reduceWeight if (job.numMapTasks == 0 && job.numReduceTasks == 0) { job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); } else if (job.numMapTasks == 0) { job.reduceWeight = 0.9f; } else if (job.numReduceTasks == 0) { job.mapWeight = 0.9f; } else { job.mapWeight = job.reduceWeight = 0.45f; } checkTaskLimits(); // 根据分片元数据信息计算输入长度inputLength,也就是作业大小 long inputLength = 0; for (int i = 0; i < job.numMapTasks; ++i) { inputLength += taskSplitMetaInfo[i].getInputDataLength(); } // 根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式 job.makeUberDecision(inputLength); // 根据作业的Map、Reduce任务数目之和,外加10, // 初始化任务尝试完成事件TaskAttemptCompletionEvent列表taskAttemptCompletionEvents job.taskAttemptCompletionEvents = new ArrayList<TaskAttemptCompletionEvent>( job.numMapTasks + job.numReduceTasks + 10); // 根据作业的Map任务数目,外加10, // 初始化Map任务尝试完成事件TaskCompletionEvent列表mapAttemptCompletionEvents job.mapAttemptCompletionEvents = new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10); // 根据作业的Map、Reduce任务数目之和,外加10, // 初始化列表taskCompletionIdxToMapCompletionIdx job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>( job.numMapTasks + job.numReduceTasks + 10); // 确定允许Map、Reduce任务失败百分比, // 取参数mapreduce.map.failures.maxpercent、mapreduce.reduce.failures.maxpercent, // 参数未配置均默认为0,即不允许Map和Reduce任务失败 job.allowedMapFailuresPercent = job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0); job.allowedReduceFailuresPercent = job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0); // create the Tasks but don't start them yet // 创建Map Task createMapTasks(job, inputLength, taskSplitMetaInfo); // 创建Reduce Task createReduceTasks(job); // 调用作业度量指标体系metrics的endPreparingJob()方法,结束作业准备 job.metrics.endPreparingJob(job); // 返回作业内部状态,JobStateInternal.INITED,即已经初始化 return JobStateInternal.INITED; } catch (Exception e) { // 记录warn级别日志信息:Job init failed,并打印出具体异常 LOG.warn("Job init failed", e); // 调用作业度量指标体系metrics的endPreparingJob()方法,结束作业准备 job.metrics.endPreparingJob(job); job.addDiagnostic("Job init failed : " + StringUtils.stringifyException(e)); // Leave job in the NEW state. The MR AM will detect that the state is // not INITED and send a JOB_INIT_FAILED event. // 返回作业内部状态,JobStateInternal.NEW,即初始化失败后的新建 return JobStateInternal.NEW; } }为了主体逻辑清晰,我们去掉部分细节,保留主干,将作业初始化总结如下:
1、调用setup()方法,完成作业启动前的部分初始化工作,实际上最重要的两件事就是:
1.1、获取并设置作业远程提交路径remoteJobSubmitDir;
1.2、获取并设置作业远程配置文件remoteJobConfFile;
2、调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo:
通过SplitMetaInfoReader的静态方法readSplitMetaInfo(),从作业远程提交路径remoteJobSubmitDir中读取作业分片元数据信息,也就是每个任务的分片元数据信息,以此确定Map任务数、作业运行方式等一些列后续内容;
3、确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks;
4、确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0;
5、根据分片元数据信息计算输入长度inputLength,也就是作业大小;
6、根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式:
小作业会通过Uber模式运行,相反,大作业会通过Non-Uber模式运行,可参见《Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber》一文!
7、确定允许Map、Reduce任务失败百分比,取参数mapreduce.map.failures.maxpercent、mapreduce.reduce.failures.maxpercent,参数未配置均默认为0,即不允许Map和Reduce任务失败;
8、创建Map Task;
9、创建Reduce Task;
10、返回作业内部状态,JobStateInternal.INITED,即已经初始化;
11、如果出现异常:
11.1、记录warn级别日志信息:Job init failed,并打印出具体异常;
11.2、返回作业内部状态,JobStateInternal.NEW,即初始化失败后的新建;
未完待续,后续作业初始化部分详细描述、作业启动、作业停止等内容,请关注《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)》。