从一个简单的命令阅读hadoop源码(下)

简介: 从一个简单的命令阅读hadoop源码(下)

2.4.2 executeKillCommand方法

代码内容及解析如下:

继续点击里面的killApplication方法:

可以看到最终使用的是YarnClientImpl里面的killApplication方法了。

2.4.3 小结

到这里,可以看到整条链路已经串好了,如下:

  • Step1: yarn脚本 (定义了入口Java类,即ApplicationCLI
  • Step2: ApplicationCLI (run方法开始执行,里面初始化yarn客户端以及执行命令脚本
  • Step3: YarnClientImpl (真正执行killApplication方法)。

那么YarnClientImpl是如何具体的执行killApplication命令呢?有兴趣的童鞋可以接着看。

2.5 YarnClient 代码分析

killApplication的代码如下(已写好注释):

/**
 * 根据给定的ApplicationID kill掉应用
 * 
 * @author : YangLinWei
 * @createTime: 2023/3/23 11:09
 */
@Override
public void killApplication(ApplicationId applicationId, String diagnostics)
        throws YarnException, IOException {
    // 1. 创建请求(KillApplicationRequest类型)
    KillApplicationRequest request =
            Records.newRecord(KillApplicationRequest.class);
    request.setApplicationId(applicationId);
    if (diagnostics != null) {
        request.setDiagnostics(diagnostics);
    }
    try {
        int pollCount = 0;
        long startTime = System.currentTimeMillis();
        // 2. rmClient不断地去请求kill掉应用
        while (true) {
            KillApplicationResponse response =
                    rmClient.forceKillApplication(request);
            // 3. 判断是否已经kill掉,如果没有,继续循环,否则跳出循环
            if (response.getIsKillCompleted()) {
                LOG.info("Killed application " + applicationId);
                break;
            }
            long elapsedMillis = System.currentTimeMillis() - startTime;
            if (enforceAsyncAPITimeout()
                    && elapsedMillis >= this.asyncApiPollTimeoutMillis) {
                throw new YarnException("Timed out while waiting for application "
                        + applicationId + " to be killed.");
            }
            if (++pollCount % 10 == 0) {
                LOG.info(
                        "Waiting for application " + applicationId + " to be killed.");
            }
            Thread.sleep(asyncApiPollIntervalMillis);
        }
    } catch (InterruptedException e) {
        String msg = "Interrupted while waiting for application "
                + applicationId + " to be killed.";
        LOG.error(msg);
        throw new YarnException(msg, e);
    }
}

可以看到,rmClient是在serviceStart方法初始化的,通过代理模式创建:

2.5.1 ApplicationClientProtocol

ClientRMProxy.createRMProxy(getConfig(),ApplicationClientProtocol.class)代码可以看出,他是通过配置来动态加载这个接口的实现的。

进入ApplicationClientProtocol看看:

那接口,谁去实现它呢,我们看看有哪些实现(Ctrl+T查不了,只能委婉地使用Ctrl+H了):

实现它的有两个类:

  • org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl
  • org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService

那就看看ApplicationClientProtocolPBClientImpl这个类吧。

2.5.2 ApplicationClientProtocolPBClientImpl

看看forceKillApplication方法:

可以看到执行了proxy.forceKillApplication(null,requestProto)方法,继续查看proxy:

继续查看ApplicationClientProtocolService:

原来rpc走的是protobuf协议:

所有交互的接口都在hadoop-yarn-api模块,这里不再详述,其实就是使用了rpc + protobuf协议。

除了这种方式,Yarn也提供了REST API,之前也写过相关的博客,有兴趣的童鞋可以去参考:

《YARN REST API 总结》

03 文末

本文主要从一个简单的“yarn application -kill”命令讲解了hadoop部分源码,希望能帮助到大家,谢谢大家的阅读,本文完!

目录
相关文章
|
2月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
48 1
|
3月前
|
资源调度 分布式计算 Hadoop
使用YARN命令管理Hadoop作业
本文介绍了如何使用YARN命令来管理Hadoop作业,包括查看作业列表、检查作业状态、杀死作业、获取作业日志以及检查节点和队列状态等操作。
76 1
使用YARN命令管理Hadoop作业
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
|
5月前
|
分布式计算 Hadoop
hadoop格式化HDFS的命令
【7月更文挑战第21天】
581 5
|
5月前
|
分布式计算 资源调度 Hadoop
Hadoop执行格式化命令
【7月更文挑战第20天】
135 1
|
6月前
|
分布式计算 Hadoop Java
|
6月前
|
分布式计算 Hadoop Shell
分布式系统详解--框架(Hadoop-基本shell命令)
分布式系统详解--框架(Hadoop-基本shell命令)
42 0
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
192 6
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
6天前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
33 4