MapReduce源码分析之新API作业提交(二):连接集群

简介:          MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下: private synchronized void connect() throws IOExcep...

         MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下:

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    
	// 如果cluster为null,构造Cluster实例cluster,
	// Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法
	if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
        这个方法用synchronized关键字标识,处理逻辑为:如果cluster为null,构造Cluster实例cluster。

        Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法,我们看下它的成员变量,如下所示:

  // 客户端通信协议提供者
  private ClientProtocolProvider clientProtocolProvider;
  // 客户端通信协议实例
  private ClientProtocol client;
  
  // 用户信息
  private UserGroupInformation ugi;
  
  // 配置信息
  private Configuration conf;
  
  // 文件系统实例
  private FileSystem fs = null;
  
  // 系统路径
  private Path sysDir = null;
  
  // 阶段区域路径
  private Path stagingAreaDir = null;
  
  // 作业历史路径
  private Path jobHistoryDir = null;
  
  // 日志
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // 客户端通信协议提供者加载器
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);
        Cluster最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider实例clientProtocolProvider,客户端通信协议ClientProtocol实例client,而后者是依托前者的create()方法生成的。

        Cluster提供了两个构造函数,如下:

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
<span style="white-space:pre">	</span>// 设置配置信息
    this.conf = conf;
    
    // 获取当前用户
    this.ugi = UserGroupInformation.getCurrentUser();
    
    // 调用initialize()方法完成初始化
    initialize(jobTrackAddr, conf);
  }
        最终会调用initialize()方法完成初始化,代码如下:

  // 确定客户端ClientProtocol实例client
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
    	
      // 取出每个ClientProtocolProvider实例provider,通过其create()方法,
      // 构造ClientProtocol实例clientProtocol,
      // 并将两者赋值给对类应成员变量,退出循环
      for (ClientProtocolProvider provider : frameworkLoader) {
        
    	LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        
    	ClientProtocol clientProtocol = null; 
        
    	try {
        	
          // 通过ClientProtocolProvider的create()方法,获取客户端与集群通讯ClientProtocol实例clientProtocol
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          // 设置类成员变量clientProtocolProvider、client,并退出循环
          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            
            // 记录debug级别日志信息
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
        	  
        	// 记录debug级别日志信息
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    // 如果clientProtocolProvider、client任一为空,直接抛出IO异常
    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
        initialize()方法唯一的一个任务就是确定客户端通信协议提供者clientProtocolProvider,并通过其create()方法构造客户端通信协议ClientProtocol实例client。

        MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。

        我们先看下Yarn模式,看下YarnClientProtocolProvider的create()方法,代码如下:

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
	  
	// 如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }
        Yarn模式下,如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null,关于YARNRunner,我们待会再讲,我们接着再看下Local模式,LocalClientProtocolProvider的create()方法,代码如下:

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    
	// 初始化framework:取参数mapreduce.framework.name,参数未配置默认为local
	String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
	
	// 如果framework是local,,则返回LocalJobRunner实例,并设置map任务数量为1,否则返回null
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);

    return new LocalJobRunner(conf);
  }
        Local模式也是需要看参数mapreduce.framework.name的配置是否为local,是的话,返回LocalJobRunner实例,并设置map任务数量为1,否则返回null,值得一提的是,这里参数mapreduce.framework.name未配置的话,默认为local,也就是说,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。

        到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner,记住这点,对透彻了解MapReduce作业提交的整体流程非常重要。

        好了,我们继续以Yarn模式来分析MapReduce集群连接,看下YARNRunner的实现,先看下它的成员变量,如下:

  // 记录工厂RecordFactory实例
  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
  
  // ResourceManager代理ResourceMgrDelegate实例
  private ResourceMgrDelegate resMgrDelegate;
  
  // 客户端缓存ClientCache实例
  private ClientCache clientCache;
  
  // 配置信息Configuration实例
  private Configuration conf;
  
  // 文件上下文FileContext实例
  private final FileContext defaultFileContext;

        其中,最重要的一个变量就是ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。关于ResourceMgrDelegate的详细介绍,请阅读《MapReduce源码分析ResourceMgrDelegate》一文,这里不再做详细介绍。

        另外一个比较重要的变量就是客户端缓存ClientCache实例clientCache,

        接下来,我们看下YARNRunner的构造函数,如下:

  /**
   * Yarn runner incapsulates the client interface of
   * yarn
   * @param conf the configuration object for the client
   */
  public YARNRunner(Configuration conf) {
	  
   // 先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数
   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  /**
   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
   * {@link ResourceMgrDelegate}. Enables mocking and testing.
   * @param conf the configuration object for the client
   * @param resMgrDelegate the resourcemanager client handle.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
   // 先构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数
   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  /**
   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
   * @param conf the configuration object
   * @param resMgrDelegate the resource manager delegate
   * @param clientCache the client cache object.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    
	// 成员变量赋值  
	this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      
      // 获取文件山下文FileContext实例defaultFileContext
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }
        YARNRunner一共提供了三个构造函数,而我们之前说的WordCount作业提交时,其内部调用的是YARNRunner带有一个参数的构造函数,它会先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数,继而构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数,而最终的构造函数只是进行简单的类成员变量赋值,然后通过FileContext的静态getFileContext()方法获取文件山下文FileContext实例defaultFileContext。

        总结

        MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,而Hadoop2.6.0中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。









相关文章
|
10月前
|
人工智能 安全 API
Higress MCP Server 安全再升级:API 认证为 AI 连接保驾护航
Higress MCP Server 新增了 API 认证功能,为 AI 连接提供安全保障。主要更新包括:1) 客户端到 MCP Server 的认证,支持 Key Auth、JWT Auth 和 OAuth2;2) MCP Server 到后端 API 的认证,增强第二阶段的安全性。新增功能如可重用认证方案、工具特定后端认证、透明凭证透传及灵活凭证管理,确保安全集成更多后端服务。通过 openapi-to-mcp 工具简化配置,减少手动工作量。企业版提供更高可用性保障,详情参见文档链接。
1225 42
|
8月前
|
运维 Prometheus 监控
云原生 API 网关 x OKG:游戏连接治理的「最后一公里」
本文介绍了云原生技术在游戏连接治理中的应用,重点探讨了如何通过 OpenKruiseGame(OKG)与云原生 API 网关的结合,实现游戏服务的优雅下线与无感配置变更。文章分析了游戏服务的强状态特性所带来的挑战,并提出了基于状态感知与连接管理的解决方案,保障玩家会话的连续性与体验的稳定性。同时,还介绍了如何通过零改造接入、全栈可观测性与简化的 API 治理,缩短游戏服务云原生化的“最后一公里”。
354 4
|
8月前
|
Java 分布式数据库 Docker
使用Docker配置并连接HBase的Java API
本流程概要的解释了如何在Docker上配置并启动HBase服务,并通过Java API进行连接和操作表,不涉及具体的业务逻辑处理和数据模型设计,这些因应用而异需由开发者根据实际需求进行实现。
373 13
|
8月前
|
运维 Prometheus 监控
API 网关 x OKG:游戏连接治理的「最后一公里」
本文介绍了 API 网关与 OpenKruiseGame(OKG)结合,在云原生游戏场景中实现连接治理“最后一公里”的解决方案。针对游戏服务的有状态特性,该方案通过精细化流量管理和无感变更能力,保障玩家会话连续性,提升运维效率,助力游戏服务实现优雅下线、配置动态更新等功能,同时提供零改造接入和全栈可观测性,显著优化游戏体验与开发运维流程。
372 0
|
9月前
|
XML JSON API
API接口——连接世界,让你的数据畅通无阻!
API(应用程序编程接口)是连接不同软件系统的桥梁,如同数字世界的“万能适配器”。它通过标准化协议(如RESTful)和数据格式(如JSON/XML),实现前端请求与后端服务的无缝交互。API不仅提升了系统间的协作效率,还通过OAuth 2.0等技术保障安全性,并支持流量管控以优化性能。其应用场景广泛,包括金融科技、物联网、电子商务和社会化平台,为企业带来显著效益,如降低成本、提升响应速度和用户增长。未来,API将向智能化、微服务化、低代码集成和隐私计算方向发展,成为构建数字生态的核心技术,助力万物互联时代的创新与发展。
1277 1
|
9月前
|
搜索推荐 安全 API
聚合电商API:一键连接多平台数据
聚合电商API接口平台整合淘宝、天猫、京东等多平台API,提供一站式数据服务。核心功能包括数据整合、多平台搜索、详细解析与定制化服务,助力商家高效管理订单、优化商品推荐及支持数据驱动决策。平台注重智能化、个性化与全渠道发展,保障数据安全,推动业务增长。
1032 2
|
12月前
|
人工智能 JavaScript 测试技术
构建智能 API 开发环境:在 Cursor 中连接 Apifox MCP Server
本文介绍了如何将Apifox MCP Server与Cursor结合,通过AI直接获取和理解API文档,大幅提升开发效率。首先需配置Apifox的Access Token和项目ID,并在Cursor中设置MCP连接。实际应用场景包括快速生成模型代码、同步更新接口文档与代码、生成CRUD操作、搜索API文档及自动生成测试用例。此外,还提供了管理多项目、安全性实践和优化AI响应质量的技巧。这种组合可显著减少从API规范到代码实现的时间,降低错误率并加速迭代过程,为开发者带来更高效的体验。
|
监控 API 索引
Elasticsearch集群使用 _cluster/health API
Elasticsearch集群使用 _cluster/health API
604 2
|
Unix API 索引
Elasticsearch集群使用 _cat/health API
Elasticsearch集群使用 _cat/health API
320 1
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
355 3

热门文章

最新文章