2.4.2 executeKillCommand方法
代码内容及解析如下:
继续点击里面的killApplication方法:
可以看到最终使用的是YarnClientImpl里面的killApplication方法了。
2.4.3 小结
到这里,可以看到整条链路已经串好了,如下:
- Step1: yarn脚本 (定义了入口
Java
类,即ApplicationCLI
) - Step2: ApplicationCLI (run方法开始执行,里面初始化yarn客户端以及执行命令脚本)
- Step3: YarnClientImpl (真正执行killApplication方法)。
那么YarnClientImpl是如何具体的执行killApplication命令呢?有兴趣的童鞋可以接着看。
2.5 YarnClient 代码分析
killApplication的代码如下(已写好注释):
/** * 根据给定的ApplicationID kill掉应用 * * @author : YangLinWei * @createTime: 2023/3/23 11:09 */ @Override public void killApplication(ApplicationId applicationId, String diagnostics) throws YarnException, IOException { // 1. 创建请求(KillApplicationRequest类型) KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class); request.setApplicationId(applicationId); if (diagnostics != null) { request.setDiagnostics(diagnostics); } try { int pollCount = 0; long startTime = System.currentTimeMillis(); // 2. rmClient不断地去请求kill掉应用 while (true) { KillApplicationResponse response = rmClient.forceKillApplication(request); // 3. 判断是否已经kill掉,如果没有,继续循环,否则跳出循环 if (response.getIsKillCompleted()) { LOG.info("Killed application " + applicationId); break; } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= this.asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be killed."); } if (++pollCount % 10 == 0) { LOG.info( "Waiting for application " + applicationId + " to be killed."); } Thread.sleep(asyncApiPollIntervalMillis); } } catch (InterruptedException e) { String msg = "Interrupted while waiting for application " + applicationId + " to be killed."; LOG.error(msg); throw new YarnException(msg, e); } }
可以看到,rmClient是在serviceStart方法初始化的,通过代理模式创建:
2.5.1 ApplicationClientProtocol
从ClientRMProxy.createRMProxy(getConfig(),ApplicationClientProtocol.class)
代码可以看出,他是通过配置来动态加载这个接口的实现的。
进入ApplicationClientProtocol看看:
那接口,谁去实现它呢,我们看看有哪些实现(Ctrl+T查不了,只能委婉地使用Ctrl+H了):
实现它的有两个类:
- org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl
- org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService
那就看看ApplicationClientProtocolPBClientImpl这个类吧。
2.5.2 ApplicationClientProtocolPBClientImpl
看看forceKillApplication方法:
可以看到执行了proxy.forceKillApplication(null,requestProto)方法,继续查看proxy:
继续查看ApplicationClientProtocolService:
原来rpc走的是protobuf协议:
所有交互的接口都在hadoop-yarn-api模块,这里不再详述,其实就是使用了rpc + protobuf协议。
除了这种方式,Yarn也提供了REST API,之前也写过相关的博客,有兴趣的童鞋可以去参考:
03 文末
本文主要从一个简单的“yarn application -kill”命令讲解了hadoop部分源码,希望能帮助到大家,谢谢大家的阅读,本文完!