从一个简单的命令阅读hadoop源码

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 从一个简单的命令阅读hadoop源码

01 引言

最近经常使用到了hadoop里面的yarn kill命令,如下:

yarn application -kill application_1653633751750_0039

本文就从这个简单的命令开始分析该命令的执行流程,分析一下其源码。

02 源码分析

2.1 导入hadoop源码

首先下载hadoop的源码并导入我们的IDEA:

github地址https://github.com/apache/hadoop

源码结构如下红色报错忽略,我们只看源码):
在这里插入图片描述

2.2 源码入口

从命令可以分析,执行完这个命令之后,最后会有一句打印“Killed application ”:
在这里插入图片描述
项目全局搜索“Killed application ”:
在这里插入图片描述

可以看到整个项目只有在org.apache.hadoop.yarn.client.api.impl.YarnClientImpl打印了,看看这个类位于项目的哪个位置:
在这里插入图片描述
可以看到YarnClientImpl是处于hadoop-yarn-project ->hadoop-yarn ->hadoop-yarn-client下的。

从上图,不难发现,还有一个bin目录,展开:
在这里插入图片描述
很容易就能得知我们运行的启动命令就在这里面,这里也是程序的入口,我们来看看yarn命令里面的脚本内容。

2.3 yarn 脚本分析

从命令的内容看,一开始就定义了hadoop_usage方法,也就是我们使用这个命令时的help提示:
在这里插入图片描述
继续往下翻,可以看到定义了yarncmd_case方法,这里的意思判断终端输入的命令里面是否包含一些特定的子命令(如:application),然后就在org.apache.hadoop.yarn.client.cli.ApplicationCLI这个类里面执行了:
在这里插入图片描述
其余的命令内容不重要了,我们主要看我们需要的,继续进入ApplicationCLI类看看。

2.4 ApplicationCLI 代码分析

进入ApplicationCLI这个类,可以看到里面的run方法内容如下(包含解析):

@Override
  public int run(String[] args) throws Exception {
    int exitCode = -1;

    // 定义命令的title和参数
    Options opts = new Options();
    String title = null;
    if (firstArg != null) {
      title = firstArg;
    } else if (args.length > 0) {
      title = args[0];
    }

    if (APPLICATION.equalsIgnoreCase(title) || APP.equalsIgnoreCase(title)) {
      title = APPLICATION;
      addApplicationOptions(opts);
    } else if (APPLICATION_ATTEMPT.equalsIgnoreCase(title)) {
      addApplicationAttemptOptions(opts);
    } else if (CONTAINER.equalsIgnoreCase(title)) {
      addContainerOptions(opts);
    }

    // 解析命令
    CommandLine cliParser = createCLIParser(opts, args);
    if (cliParser == null) {
      printUsage(title, opts);
      return exitCode;
    }

    // 创建yarn客户端
    if (cliParser.hasOption(CLUSTER_ID_OPTION)) {
      String clusterIdStr = cliParser.getOptionValue(CLUSTER_ID_OPTION);
      getConf().set(YarnConfiguration.RM_CLUSTER_ID, clusterIdStr);
    }
    createAndStartYarnClient();

    // 执行命令
    if (cliParser.hasOption(STATUS_CMD)) {
      return executeStatusCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(LIST_CMD)) {
      return executeListCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(KILL_CMD)) {
      return executeKillCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(MOVE_TO_QUEUE_CMD)) {
      return executeMoveToQueueCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(FAIL_CMD)) {
      return executeFailCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(UPDATE_PRIORITY)) {
      return executeUpdatePriorityCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(SIGNAL_CMD)) {
      return executeSignalCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(SHELL_CMD)) {
      return executeShellCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(LAUNCH_CMD)) {
      return executeLaunchCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(STOP_CMD)) {
      return executeStopCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(START_CMD)) {
      return executeStartCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(SAVE_CMD)) {
      return executeSaveCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(DESTROY_CMD)) {
      return executeDestroyCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(FLEX_CMD)) {
      return executeFlexCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(ENABLE_FAST_LAUNCH)) {
      return executeEnableFastLaunchCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(UPDATE_LIFETIME)) {
      return executeUpdateLifeTimeCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(CHANGE_APPLICATION_QUEUE)) {
      return executeChangeApplicationQueueCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(UPGRADE_CMD)) {
      return executeUpgradeCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(DECOMMISSION)) {
      return executeDecommissionCommand(cliParser, title, opts);
    } else if (cliParser.hasOption(HELP_CMD)) {
      printUsage(title, opts);
      return 0;
    } else {
      syserr.println("Invalid Command Usage : ");
      printUsage(title, opts);
    }
    return 0;
  }

从该方法,可以看到做了几个事:

  • 解析命令createCLIParser(opts, args);
  • 创建并启动yarn客户端 createAndStartYarnClient()
  • 执行命令 executeKillCommand(cliParser, title, opts)

下面根据步骤讲解。

2.4.1 创建并启动yarn客户端

可以看到创建并启动yarn客户端代码在YarnCLI代码里面,而ApplicationCLI是继承与YarnCLI的:

createAndStartYarnClient 继承关系
在这里插入图片描述 在这里插入图片描述

继续点击createYarnClient方法:
在这里插入图片描述
可以看得出,这个类就是一开始本文根据提示查找出来的 源码入口类 了,那是如何执行到里面的killApplication方法呢?先不看这个类里面的源码,继续看ApplicationCLI是如何执行这个executeKillCommand方法的。

2.4.2 executeKillCommand方法

代码内容及解析如下:
在这里插入图片描述
继续点击里面的killApplication方法:
在这里插入图片描述
可以看到最终使用的是YarnClientImpl里面的killApplication方法了。

2.4.3 小结

到这里,可以看到整条链路已经串好了,如下:

  • Step1: yarn脚本 定义了入口Java类,即ApplicationCLI
  • Step2: ApplicationCLI run方法开始执行,里面初始化yarn客户端以及执行命令脚本
  • Step3: YarnClientImpl 真正执行killApplication方法)。

那么YarnClientImpl是如何具体的执行killApplication命令呢?有兴趣的童鞋可以接着看。

2.5 YarnClient 代码分析

killApplication的代码如下(已写好注释):

/**
 * 根据给定的ApplicationID kill掉应用
 * 
 * @author : YangLinWei
 * @createTime: 2023/3/23 11:09
 */
@Override
public void killApplication(ApplicationId applicationId, String diagnostics)
        throws YarnException, IOException {
    
    // 1. 创建请求(KillApplicationRequest类型)
    KillApplicationRequest request =
            Records.newRecord(KillApplicationRequest.class);
    request.setApplicationId(applicationId);

    if (diagnostics != null) {
        request.setDiagnostics(diagnostics);
    }

    try {
        int pollCount = 0;
        long startTime = System.currentTimeMillis();
        
        // 2. rmClient不断地去请求kill掉应用
        while (true) {
            KillApplicationResponse response =
                    rmClient.forceKillApplication(request);
            
            // 3. 判断是否已经kill掉,如果没有,继续循环,否则跳出循环
            if (response.getIsKillCompleted()) {
                LOG.info("Killed application " + applicationId);
                break;
            }

            long elapsedMillis = System.currentTimeMillis() - startTime;
            if (enforceAsyncAPITimeout()
                    && elapsedMillis >= this.asyncApiPollTimeoutMillis) {
                throw new YarnException("Timed out while waiting for application "
                        + applicationId + " to be killed.");
            }

            if (++pollCount % 10 == 0) {
                LOG.info(
                        "Waiting for application " + applicationId + " to be killed.");
            }
            Thread.sleep(asyncApiPollIntervalMillis);
        }
    } catch (InterruptedException e) {
        String msg = "Interrupted while waiting for application "
                + applicationId + " to be killed.";
        LOG.error(msg);
        throw new YarnException(msg, e);
    }
}

可以看到,rmClient是在serviceStart方法初始化的,通过代理模式创建:
在这里插入图片描述

2.5.1 ApplicationClientProtocol

ClientRMProxy.createRMProxy(getConfig(),ApplicationClientProtocol.class)代码可以看出,他是通过配置来动态加载这个接口的实现的。

进入ApplicationClientProtocol看看:
在这里插入图片描述
那接口,谁去实现它呢,我们看看有哪些实现(Ctrl+T查不了,只能委婉地使用Ctrl+H了):
在这里插入图片描述

实现它的有两个类:

  • org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl
  • org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService

那就看看ApplicationClientProtocolPBClientImpl这个类吧。

2.5.2 ApplicationClientProtocolPBClientImpl

看看forceKillApplication方法:
在这里插入图片描述
可以看到执行了proxy.forceKillApplication(null,requestProto)方法,继续查看proxy:
在这里插入图片描述
继续查看ApplicationClientProtocolService:
在这里插入图片描述
原来rpc走的是protobuf协议:
在这里插入图片描述
所有交互的接口都在hadoop-yarn-api模块,这里不再详述,其实就是使用了rpc + protobuf协议。

除了这种方式,Yarn也提供了REST API,之前也写过相关的博客,有兴趣的童鞋可以去参考:
《YARN REST API 总结》

03 文末

本文主要从一个简单的“yarn application -kill”命令讲解了hadoop部分源码,希望能帮助到大家,谢谢大家的阅读,本文完!

目录
相关文章
|
2月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
43 1
|
3月前
|
资源调度 分布式计算 Hadoop
使用YARN命令管理Hadoop作业
本文介绍了如何使用YARN命令来管理Hadoop作业,包括查看作业列表、检查作业状态、杀死作业、获取作业日志以及检查节点和队列状态等操作。
67 1
使用YARN命令管理Hadoop作业
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
|
5月前
|
分布式计算 Hadoop
hadoop格式化HDFS的命令
【7月更文挑战第21天】
551 5
|
5月前
|
分布式计算 资源调度 Hadoop
Hadoop执行格式化命令
【7月更文挑战第20天】
130 1
|
6月前
|
分布式计算 Hadoop Java
|
6月前
|
分布式计算 Hadoop Shell
分布式系统详解--框架(Hadoop-基本shell命令)
分布式系统详解--框架(Hadoop-基本shell命令)
42 0
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
181 6
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
74 2