背景
《Hadoop权威指南》这本书上关于MapReduce的工作机制进行了详细的介绍,job运行过程每一步的讲解比较清晰,在此进行整理和汇总。我的个人博客 http://www.wangjialong.cc
MapReduce1 简介
MapReduce1 是Hadoop2.0之前的MapReduce程序调度机制,也就是YARN出现之前的经典调度模型,最顶层包含了4个独立的实体。
- 客户端, 提交MapReduce程序
- jobtracker,协调作业的运行。它是一个Java应用程序,主类为JobTracker,扮演的角色是一个协调者的角色,是一个调度中心。负责的工作是作业的分配,tasktracker的监控
- tasktracker,运行作业划分后的任务。也是一个Java应用程序,主类为TaskTracker,它是MapReduce程序实际的运行者,受jobtracker的调度和指挥,并向jobtracker报告自己的状态,可以分为maptask、reducetask等
- 分布式文件系统(一般为HDFS),用于存放共享作业,如作业的jar文件,配置文件,计算所得的输入分片等,不存放计算的中间数据,中间数据存放在task所在节点的磁盘上。
下图简单介绍了MapReduce程序的工作流程及上述几个实体扮演的角色
一个job的完整流程包括了作业的提交,作业的初始化,任务的分配,任务的执行,进度和状态的更新,作业的完成六大步骤,下面进行分步介绍。为了方便,下面将job执行的工作原理图放上,并添加标题,方便后面文字说明时进行查找。
作业(job)的提交
job的提交涉及到的实体是客户端(job client)、JobTracker和分布式文件系统(HDFS)。它的步骤主要是工作原理图中的第1-4步。
1.启动job。会创建一个JobSubmmiter对象,之后的作业提交交给它来完成。
2. 向JobTracker请求一个作业ID(调用JobTracker的getNewJobId方法)
3. 在作业信息发给HDFS之前,需要先检查作业的输出、输出路劲是否存在,满足后将作业JAR文件、配置文件和输入分片等复制到一个以作业ID命名的目录下(位于JobTracker的文件系统子目录中)。作业jar有多个副本,默认为10,运行作业是以供tasktracker访问
4. 告知jobtracker准备执行(调用JobTracker的submitJob方法)
提交作业后,waitForCompletion()每秒轮询作业的进度,更新进度报告。作业完成后,若成功,则输出作业计数器,否则,报告作业失败信息。
作业的初始化
5.JobTracker将提交上来的作业进行初始化,创建一个表示作业的对象,用于封装任务和记录信息,以便跟踪任务状态。之后,将他们放入一个内部队列中,交由job scheduler来调度。
6. job scheduler创建任务运行列表。首先从HDFS中获取客户端计算好的输入分片,为每个分片创建一个map任务。然后根据Job中设置的mapred.reduce.tasks属性n创建相应数量的reduce任务,并指定任务ID。同时创建作业创建和作业清理两个任务,他们在tasktracker中运行,负责map之前的创建作业和reduce之后的作业清理。
任务的分配
7.tasktracker定期发送“心跳信息”给jobtracker,来表示自己的存活和是否准备好运行任务。jobtracker为ready状态的tasktracker分配一个任务(map或reduce)。 任务的分配取决于调度算法,如tasktracker上有固定的map槽和reduce槽(取决于tasktracker的核和内存),默认调度算法优先占用map槽。但对于map任务,jobtracker会考虑距离输入分片文件最近的tasktracker进行调度,map任务调度可能有数据本地化(任务运行在输入分片所在的节点上)、机架本地化(运行在所在机架),无本地化三种状态。
任务的执行
8.taskTracker将作业的jar文件从HDFS上复制到本地,然后将应用所需的全部文件从分布式缓存复制到本地磁盘,然后创建一个本地目录,将jar文件内容解压到该目录下。之后tasktracker新建一个TaskRunner对象。
9. TaskRunner启动一个新的jvm来运行每一个任务,以便用户定义的map和reduce不会影响到tasktracker。
10. 任务在JVM中运行,并定期汇报自己的进度,直到任务完成。
进度和状态的更新
MapReduce程序是一个耗时较长的任务,对于用户来说,需要得知作业的进度。任务在运行时,对于map任务来说,进度是已经处理的输入所占的比例。对于reduce任务来说,系统会估计已处理reduce输入的比例。
MapReduce中改变进度状态的操作为:
- 读入一条输入记录(mapper或reducer)
- 写入一条输出记录
- 在一个Reporter中设置状态描述
- 增加计数器(Reporter)
- 调用Reporter的progress()任务
进度最终会由tasktracker汇总到JobTracker中,JobClient每秒查询JobTracker来获取最新状态,也可以显式调用Job的getStatus方法来获取所有状态信息。
作业的完成
作业完成后,客户端从waitForCompletion()方法返回,Job的统计信息和计数值也输出到控制台。最后JobTracker清空作业的工作状态,并指示tasktracker也清空作业的工作状态(如删除中间输出)
感想
第一代的MapReduce虽然已经不在被使用,但对于学习MapReduce程序的流程还是有很大的帮助的,通过整理,了解了一些细微的知识点,相信对于理解YARN也会有很大的帮助。
Know one thing, Know it well.