ResourceManager是Hadoop资源管理器Yarn的Master,负责资源的统一管理和调度。它与Yarn中另外三个组件协同工作,共同完成应用程序在Yarn上的资源管理与调度。通过以下这张图我们就可以看出ResourceManager在资源管理和调度中的地位和作用(ps:图片截取自董西成的《Hadoop技术内幕:深入解析YARN架构设计与实现原理》一书)。
不难看出,ResourceManager居于整体体系的正中,这也印证了它是Hadoop Yarn中Master节点。它通过不同的RPC协议与NodeManager、ApplicationMaster、Application Client一起协同工作,首先由Application Client向ResourceManager提交应用程序,并产生ApplicationMaster,然后ApplicationMaster申请向RM注册并申请资源,而RM对资源清理能够了如指掌并能够适当分配,则是由于NodeManager向ResourceManager注册并周期性汇报资源情况,这三大组件在RM的统一调度和管理下,共同完成应用程序在Yarn上的资源管理与调度。箭头中间的三个RPC协议,就是本文我将要论述的重点。
一、ResourceTracker
这是ResourceManager与NodeManager通信使用的RPC协议。基于ResourceTracker,NodeManager可完成向ResourceManager注册、周期性心跳汇报工作,并在周期性心跳汇报中领取RM下达的命令,比如重新初始化、清理 Container等。在这个过程中,NM扮演的是RPC client的角色,而RM扮演的是RPC server的角色,而这一过程是pull模型,即总是由slave节点NM主动发起,向RM注册或周期性汇报。
ResourceTracker就提供了两个方法,如下:
@Idempotent public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException; @AtMostOnce public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException;其中,registerNodeManager()实现了NM向RM注册的过程,而nodeHeartbeat()则实现了NM向RM周期性心跳汇报的过程。
作为RPC server端,RM中实现这个RPC功能的组件是ResourceTrackerService,在RM内部有成员变量如下定义:
protected ResourceTrackerService resourceTracker;而ResourceTrackerService中也对应实现了ResourceTracker这一RPC接口的两个方法,如下:
@SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { ...... ...省略主要代码... ...... return response; } @SuppressWarnings("unchecked") @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { ...... ...省略主要代码... ...... return nodeHeartBeatResponse; }至于其具体实现,我以后再分析。
二、ApplicationMasterProtocol
这是ResourceManager与ApplicationMaster通信使用的RPC协议。ApplicationMaster通过它完成向RM注册、申请资源和释放资源的过程。在这个过程中,AM扮演的是RPC client的角色,而RM扮演的是RPC server的角色,而这一过程也是pull模型,即总是由AM主动发起,RM被动接受的。
ApplicationMasterProtocol提供了三个方法,如下:
@Public @Stable @Idempotent public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException; @Public @Stable @AtMostOnce public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException; @Public @Stable @AtMostOnce public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException;其中,registerApplicationMaster()实现的是AM向RM注册这一过程,allocate()实现的则是AM向RM发起的资源申请,最后finishApplicationMaster()则完成了应用程序的关闭与资源释放。
同样,作为RPC server端,RM中实现这个RPC功能的组件是ApplicationMasterService,在RM内部有成员变量如下定义:
protected ApplicationMasterService masterService;ApplicationMasterService中也实现了ApplicationMasterProtocol这一RPC接口的三个核心方法,如下:
@Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { ...... ...省略主要代码... ...... return response; } } @Override public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { ...... ...省略主要代码... ...... // For UnmanagedAMs, return true so they don't retry return FinishApplicationMasterResponse.newInstance( rmApp.getApplicationSubmissionContext().getUnmanagedAM()); } } @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { ...... ...省略主要代码... ...... return allocateResponse; } }具体过程以后分析。
三、ApplicationClientProtocol
这是ResourceManager与Application Client通信使用的RPC协议。Application Client通过它完成向RM提交应用程序、查看应用程序状态、控制应用程序(杀死)等的过程。在这个过程中,Application Client扮演的是RPC client的角色,而RM扮演的是RPC server的角色。
ApplicationClientProtocol提供了十四个方法,其中比较常见的有如下:
@Public @Stable @Idempotent public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException; @Public @Stable @Idempotent public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnException, IOException; @Public @Stable @Idempotent public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnException, IOException; @Public @Stable @Idempotent public GetClusterNodesResponse getClusterNodes( GetClusterNodesRequest request) throws YarnException, IOException; @Public @Stable @Idempotent public GetQueueInfoResponse getQueueInfo( GetQueueInfoRequest request) throws YarnException, IOException; @Public @Unstable @Idempotent public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;其中,submitApplication()实现的是Application Client向RM提交应用程序这一过程,forceKillApplication()实现的则是Application Client向RM发起的强制杀死应用程序,moveApplicationAcrossQueues()则完成了Application Client请求RM将应用程序从一个队列移动到另外一个队列的过程,getClusterMetrics()则实现了Application Client向RM请求获取集群整体情况的过程,最后getClusterNodes()、getQueueInfo()则相应实现了AC请求RM获取集群节点情况、队列情况的过程。 作为RPC server端,RM中实现这个RPC功能的组件是ClientRMService,在RM内部有成员变量如下定义:
private ClientRMService clientRM;ClientRMService中也实现了ApplicationClientProtocol这一RPC接口的方法,如下:
@Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ...... ...省略主要代码... ...... return response; } @SuppressWarnings("unchecked") @Override public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnException { ...... ...省略主要代码... ...... // For UnmanagedAMs, return true so they don't retry return KillApplicationResponse.newInstance( application.getApplicationSubmissionContext().getUnmanagedAM()); } @SuppressWarnings("unchecked") @Override public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( MoveApplicationAcrossQueuesRequest request) throws YarnException { ...... ...省略主要代码... ...... return response; } @Override public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnException { ...... ...省略主要代码... ...... return response; } @Override public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) throws YarnException { ...... ...省略主要代码... ...... return response; } @Override public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) throws YarnException { ...... ...省略主要代码... ...... return response; }同样,具体实现以后再做详细分析。