01 引言
最近经常使用到了hadoop里面的yarn kill命令,如下:
yarn application -kill application_1653633751750_0039
本文就从这个简单的命令开始分析该命令的执行流程,分析一下其源码。
02 源码分析
2.1 导入hadoop源码
首先下载hadoop的源码并导入我们的IDEA:
github地址: https://github.com/apache/hadoop
源码结构如下(红色报错忽略,我们只看源码):
2.2 源码入口
从命令可以分析,执行完这个命令之后,最后会有一句打印“Killed application
”:
项目全局搜索“Killed application
”:
可以看到整个项目只有在org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
打印了,看看这个类位于项目的哪个位置:
可以看到YarnClientImpl
是处于hadoop-yarn-project
->hadoop-yarn
->hadoop-yarn-client
下的。
从上图,不难发现,还有一个bin目录,展开:
很容易就能得知我们运行的启动命令就在这里面,这里也是程序的入口,我们来看看yarn命令里面的脚本内容。
2.3 yarn 脚本分析
从命令的内容看,一开始就定义了hadoop_usage
方法,也就是我们使用这个命令时的help
提示:
继续往下翻,可以看到定义了yarncmd_case
方法,这里的意思判断终端输入的命令里面是否包含一些特定的子命令(如:application
),然后就在org.apache.hadoop.yarn.client.cli.ApplicationCLI
这个类里面执行了:
其余的命令内容不重要了,我们主要看我们需要的,继续进入ApplicationCLI类看看。
2.4 ApplicationCLI 代码分析
进入ApplicationCLI这个类,可以看到里面的run方法内容如下(包含解析):
@Override
public int run(String[] args) throws Exception {
int exitCode = -1;
// 定义命令的title和参数
Options opts = new Options();
String title = null;
if (firstArg != null) {
title = firstArg;
} else if (args.length > 0) {
title = args[0];
}
if (APPLICATION.equalsIgnoreCase(title) || APP.equalsIgnoreCase(title)) {
title = APPLICATION;
addApplicationOptions(opts);
} else if (APPLICATION_ATTEMPT.equalsIgnoreCase(title)) {
addApplicationAttemptOptions(opts);
} else if (CONTAINER.equalsIgnoreCase(title)) {
addContainerOptions(opts);
}
// 解析命令
CommandLine cliParser = createCLIParser(opts, args);
if (cliParser == null) {
printUsage(title, opts);
return exitCode;
}
// 创建yarn客户端
if (cliParser.hasOption(CLUSTER_ID_OPTION)) {
String clusterIdStr = cliParser.getOptionValue(CLUSTER_ID_OPTION);
getConf().set(YarnConfiguration.RM_CLUSTER_ID, clusterIdStr);
}
createAndStartYarnClient();
// 执行命令
if (cliParser.hasOption(STATUS_CMD)) {
return executeStatusCommand(cliParser, title, opts);
} else if (cliParser.hasOption(LIST_CMD)) {
return executeListCommand(cliParser, title, opts);
} else if (cliParser.hasOption(KILL_CMD)) {
return executeKillCommand(cliParser, title, opts);
} else if (cliParser.hasOption(MOVE_TO_QUEUE_CMD)) {
return executeMoveToQueueCommand(cliParser, title, opts);
} else if (cliParser.hasOption(FAIL_CMD)) {
return executeFailCommand(cliParser, title, opts);
} else if (cliParser.hasOption(UPDATE_PRIORITY)) {
return executeUpdatePriorityCommand(cliParser, title, opts);
} else if (cliParser.hasOption(SIGNAL_CMD)) {
return executeSignalCommand(cliParser, title, opts);
} else if (cliParser.hasOption(SHELL_CMD)) {
return executeShellCommand(cliParser, title, opts);
} else if (cliParser.hasOption(LAUNCH_CMD)) {
return executeLaunchCommand(cliParser, title, opts);
} else if (cliParser.hasOption(STOP_CMD)) {
return executeStopCommand(cliParser, title, opts);
} else if (cliParser.hasOption(START_CMD)) {
return executeStartCommand(cliParser, title, opts);
} else if (cliParser.hasOption(SAVE_CMD)) {
return executeSaveCommand(cliParser, title, opts);
} else if (cliParser.hasOption(DESTROY_CMD)) {
return executeDestroyCommand(cliParser, title, opts);
} else if (cliParser.hasOption(FLEX_CMD)) {
return executeFlexCommand(cliParser, title, opts);
} else if (cliParser.hasOption(ENABLE_FAST_LAUNCH)) {
return executeEnableFastLaunchCommand(cliParser, title, opts);
} else if (cliParser.hasOption(UPDATE_LIFETIME)) {
return executeUpdateLifeTimeCommand(cliParser, title, opts);
} else if (cliParser.hasOption(CHANGE_APPLICATION_QUEUE)) {
return executeChangeApplicationQueueCommand(cliParser, title, opts);
} else if (cliParser.hasOption(UPGRADE_CMD)) {
return executeUpgradeCommand(cliParser, title, opts);
} else if (cliParser.hasOption(DECOMMISSION)) {
return executeDecommissionCommand(cliParser, title, opts);
} else if (cliParser.hasOption(HELP_CMD)) {
printUsage(title, opts);
return 0;
} else {
syserr.println("Invalid Command Usage : ");
printUsage(title, opts);
}
return 0;
}
从该方法,可以看到做了几个事:
- ① 解析命令(
createCLIParser(opts, args);
) - ② 创建并启动yarn客户端(
createAndStartYarnClient()
) - ③ 执行命令(
executeKillCommand(cliParser, title, opts)
)
下面根据步骤讲解。
2.4.1 创建并启动yarn客户端
可以看到创建并启动yarn客户端代码在YarnCLI代码里面,而ApplicationCLI是继承与YarnCLI的:
createAndStartYarnClient | 继承关系 |
---|---|
继续点击createYarnClient
方法:
可以看得出,这个类就是一开始本文根据提示查找出来的 源码入口类 了,那是如何执行到里面的killApplication
方法呢?先不看这个类里面的源码,继续看ApplicationCLI
是如何执行这个executeKillCommand
方法的。
2.4.2 executeKillCommand方法
代码内容及解析如下:
继续点击里面的killApplication方法:
可以看到最终使用的是YarnClientImpl里面的killApplication方法了。
2.4.3 小结
到这里,可以看到整条链路已经串好了,如下:
- Step1: yarn脚本 (定义了入口
Java
类,即ApplicationCLI
) - Step2: ApplicationCLI (run方法开始执行,里面初始化yarn客户端以及执行命令脚本)
- Step3: YarnClientImpl (真正执行killApplication方法)。
那么YarnClientImpl是如何具体的执行killApplication命令呢?有兴趣的童鞋可以接着看。
2.5 YarnClient 代码分析
killApplication的代码如下(已写好注释):
/**
* 根据给定的ApplicationID kill掉应用
*
* @author : YangLinWei
* @createTime: 2023/3/23 11:09
*/
@Override
public void killApplication(ApplicationId applicationId, String diagnostics)
throws YarnException, IOException {
// 1. 创建请求(KillApplicationRequest类型)
KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
if (diagnostics != null) {
request.setDiagnostics(diagnostics);
}
try {
int pollCount = 0;
long startTime = System.currentTimeMillis();
// 2. rmClient不断地去请求kill掉应用
while (true) {
KillApplicationResponse response =
rmClient.forceKillApplication(request);
// 3. 判断是否已经kill掉,如果没有,继续循环,否则跳出循环
if (response.getIsKillCompleted()) {
LOG.info("Killed application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout()
&& elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application "
+ applicationId + " to be killed.");
}
if (++pollCount % 10 == 0) {
LOG.info(
"Waiting for application " + applicationId + " to be killed.");
}
Thread.sleep(asyncApiPollIntervalMillis);
}
} catch (InterruptedException e) {
String msg = "Interrupted while waiting for application "
+ applicationId + " to be killed.";
LOG.error(msg);
throw new YarnException(msg, e);
}
}
可以看到,rmClient是在serviceStart方法初始化的,通过代理模式创建:
2.5.1 ApplicationClientProtocol
从ClientRMProxy.createRMProxy(getConfig(),ApplicationClientProtocol.class)
代码可以看出,他是通过配置来动态加载这个接口的实现的。
进入ApplicationClientProtocol看看:
那接口,谁去实现它呢,我们看看有哪些实现(Ctrl+T查不了,只能委婉地使用Ctrl+H了):
实现它的有两个类:
- org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl
- org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService
那就看看ApplicationClientProtocolPBClientImpl这个类吧。
2.5.2 ApplicationClientProtocolPBClientImpl
看看forceKillApplication方法:
可以看到执行了proxy.forceKillApplication(null,requestProto)方法,继续查看proxy:
继续查看ApplicationClientProtocolService:
原来rpc走的是protobuf协议:
所有交互的接口都在hadoop-yarn-api模块,这里不再详述,其实就是使用了rpc + protobuf协议。
除了这种方式,Yarn也提供了REST API,之前也写过相关的博客,有兴趣的童鞋可以去参考:
《YARN REST API 总结》
03 文末
本文主要从一个简单的“yarn application -kill”命令讲解了hadoop部分源码,希望能帮助到大家,谢谢大家的阅读,本文完!