Hadoop2源码分析-YARN 的服务库和事件库

简介:

1.概述

  在《Hadoop2源码分析-YARN RPC 示例介绍》一文当中,给大家介绍了YARN 的 RPC 机制,以及相关代码的演示,今天我们继续去学习 YARN 的服务库和事件库,分享目录如下所示:

  • 服务库和事件库介绍
  • 使用示例
  • 截图预览

  下面开始今天的内容分享。

2.服务库和事件库介绍

2.1服务库

  YARN对于生命周期较长的对象使用服务的对象模型进行管理,主要特点如下:

  • 用于被服务化的对象包含4个状态,他们分别是:被创建、已初始化、已启动和已停止。源代码地址在 org.apache.hadoop.service 的 Service 接口中,内容如下所示:
复制代码
public enum STATE {
    /** Constructed but not initialized */
    NOTINITED(0, "NOTINITED"),

    /** Initialized but not started or stopped */
    INITED(1, "INITED"),

    /** started and not stopped */
    STARTED(2, "STARTED"),

    /** stopped. No further state transitions are permitted */
    STOPPED(3, "STOPPED");

    /**
     * An integer value for use in array lookup and JMX interfaces.
     * Although {@link Enum#ordinal()} could do this, explicitly
     * identify the numbers gives more stability guarantees over time.
     */
    private final int value;

    /**
     * A name of the state that can be used in messages
     */
    private final String statename;

    private STATE(int value, String name) {
      this.value = value;
      this.statename = name;
    }

    /**
     * Get the integer value of a state
     * @return the numeric value of the state
     */
    public int getValue() {
      return value;
    }

    /**
     * Get the name of a state
     * @return the state's name
     */
    @Override
    public String toString() {
      return statename;
    }
  }
复制代码
public abstract class AbstractService implements Service {
  
  // ......

}

  通过阅读代码,我们可以看出,服务的对象它实现了接口Service,并定义了最基本的服务状态:创建、初始化、启动以及停止。对于 AbstractService 类来说,它实现了 Service 接口。

  • 任何服务状态的变化都可以触发其他的动作,例如:
复制代码
public void start() {
    if (isInState(STATE.STARTED)) {
      return;
    }
    //enter the started state
    synchronized (stateChangeLock) {
      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
        try {
          startTime = System.currentTimeMillis();
          serviceStart();
          if (isInState(STATE.STARTED)) {
            //if the service started (and isn't now in a later state), notify
            if (LOG.isDebugEnabled()) {
              LOG.debug("Service " + getName() + " is started");
            }
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }
复制代码

  这里,我们会去触发一个监听动作,全局监听状态的改变,异常的捕捉监听等。

  • 可以通过组合的方式进行服务组合,这样做的好处是便于统一去管理:在 YARN 中,如果是非组合服务,可以直接继承 AbstractService 类,否则需继承 CompositeService。

2.2事件库

  在 YARN 中,核心服务其本质就是一个中央异步调度器,包含有ResourceManager、 NodeManager、MRAppMaster等内容,YARN 事件与事件处理器的关系在 

org.apache.hadoop.yarn.event  中。在使用 YARN 事件库的时候,需要先定义一个中央异步调度器 AsyncDispatcher,它负责事件的处理与转发,然后我们根据实际业务需求定义一系列事件 Event 与事件处理器 EventHandler,并将事件注册到中央异步调度器中用于完成事件统一管理和应用调度。流程如下图所示:

3.使用示例

  接下来,我们编写示例代码,去代码中理解这部分流程。

  • 首先是 JMRAppMaster 类:
复制代码
package cn.hadoop.task.exec;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

import cn.hadoop.task.CompositeService;
import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.TaskEvent;
import cn.hadoop.task.TaskEventType;

/**
 * @Date Jul 22, 2015
 *
 * @Author dengjie
 *
 * @Note TODO
 */
public class JMRAppMaster extends CompositeService {
    private Dispatcher dispatcher; // AsyncDispatcher
    private String jobID;
    private int taskNumber; // include numbers
    private String[] taskIDs; // include all task

    public JMRAppMaster(String name, String jobID, int taskNumber) {
        super(name);
        this.jobID = jobID;
        this.taskNumber = taskNumber;
        taskIDs = new String[taskNumber];
        for (int i = 0; i < taskNumber; i++) {
            taskIDs[i] = new String(this.jobID + "_task_" + i);
        }
    }

    public void serviceInit(Configuration conf) throws Exception {
        dispatcher = new AsyncDispatcher();// default a AsyncDispatcher
        dispatcher.register(JobEventType.class, new JobEventDispatcher());// register a job
        dispatcher.register(TaskEventType.class, new TaskEventDispatcher());// register a task
        addService((Service) dispatcher);
        super.serviceInit(conf);
    }

    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    private class JobEventDispatcher implements EventHandler<JobEvent> {

        @SuppressWarnings("unchecked")
        public void handle(JobEvent event) {
            if (event.getType() == JobEventType.JOB_KILL) {
                System.out.println("Receive JOB_KILL event, killing all the tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));
                }
            } else if (event.getType() == JobEventType.JOB_INIT) {
                System.out.println("Receive JOB_INIT event, scheduling tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));
                }
            }
        }
    }

    private class TaskEventDispatcher implements EventHandler<TaskEvent> {

        public void handle(TaskEvent event) {
            if (event.getType() == TaskEventType.T_KILL) {
                System.out.println("Receive T_KILL event of task id " + event.getTaskID());
            } else if (event.getType() == TaskEventType.T_SCHEDULE) {
                System.out.println("Receive T_SCHEDULE event of task id " + event.getTaskID());
            }
        }
    }
}
复制代码

  另外,还需要添加一些其他类,这些类以来可以在 Hadoop 源码工程中找到,这里就不贴代码了,大家可以到 Hadoop 工程的源码中找到对应的类,相关类名如下图所示:

  接下来是一个测试类,去测试一下我们所编写的示例流程。

  • JMRAppMasterTest类:

复制代码
package cn.hadoop.rpc.test.yarn.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.exec.JMRAppMaster;

/**
 * @Date Jul 22, 2015
 *
 * @Author dengjie
 *
 * @Note TODO
 */
public class JMRAppMasterTest {
    @SuppressWarnings({ "unchecked", "resource" })
    public static void main(String[] args) {
        String jobID = "job_20150723_11";
        JMRAppMaster appMaster = new JMRAppMaster("Simple MRAppMaster Test", jobID, 10);
        YarnConfiguration conf = new YarnConfiguration(new Configuration());
        try {
            appMaster.serviceInit(conf);
            appMaster.serviceStart();
        } catch (Exception e) {
            e.printStackTrace();
        }
        appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));
        appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_INIT));
    }
}
复制代码

4.截图预览

  在编写完成相关流程代码后,我们运行代码来观察整个流程,截图如下所示:

5.总结

  在编写这部分流程代码时,可以参考 Hadoop YARN 部分的工程源码,通过运行调试代码,掌握对事件库和服务库的流程,以及它们的工作机制。另外,在编写的过程当中,最好将源码的文件引入到自己的工程,不要单独使用 JAR 包的方式导入,由于我们是独立运行某个模块,需要改动源代码的函数访问权限,若是直接引入 JAR 包地址,会导致函数修饰权限问题而不能运行,这里大家在运行调试的时候注意即可。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关文章
|
2月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
128 9
|
3月前
|
存储 分布式计算 资源调度
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
97 5
|
3月前
|
资源调度 数据可视化 大数据
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
42 4
|
3月前
|
XML 分布式计算 资源调度
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
208 5
|
3月前
|
XML 资源调度 网络协议
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(二)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(二)
179 4
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
103 4
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop YARN资源管理-容量调度器(Yahoo!的Capacity Scheduler)
详细讲解了Hadoop YARN资源管理中的容量调度器(Yahoo!的Capacity Scheduler),包括队列和子队列的概念、Apache Hadoop的容量调度器默认队列、队列的命名规则、分层队列、容量保证、队列弹性、容量调度器的元素、集群如何分配资源、限制用户容量、限制应用程序数量、抢占申请、启用容量调度器以及队列状态管理等方面的内容。
101 3
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop YARN资源管理-公平调度器(Fackbook的Fair Scheduler)
详细介绍了Hadoop YARN资源管理中的公平调度器(Fair Scheduler),包括其概述、配置、队列结构、以及如何将作业提交到指定队列,展示了公平调度器如何通过分配文件(fair-scheduler.xml)来控制资源分配,并提供了配置示例和如何通过命令行提交作业到特定队列的方法。
227 0
Hadoop YARN资源管理-公平调度器(Fackbook的Fair Scheduler)
|
5月前
|
图形学 数据可视化 开发者
超实用Unity Shader Graph教程:从零开始打造令人惊叹的游戏视觉特效,让你的作品瞬间高大上,附带示例代码与详细步骤解析!
【8月更文挑战第31天】Unity Shader Graph 是 Unity 引擎中的强大工具,通过可视化编程帮助开发者轻松创建复杂且炫酷的视觉效果。本文将指导你使用 Shader Graph 实现三种效果:彩虹色渐变着色器、动态光效和水波纹效果。首先确保安装最新版 Unity 并启用 Shader Graph。创建新材质和着色器图谱后,利用节点库中的预定义节点,在编辑区连接节点定义着色器行为。
367 0
|
5月前
|
图形学 C# 开发者
Unity粒子系统全解析:从基础设置到高级编程技巧,教你轻松玩转绚丽多彩的视觉特效,打造震撼游戏画面的终极指南
【8月更文挑战第31天】粒子系统是Unity引擎的强大功能,可创建动态视觉效果,如火焰、爆炸等。本文介绍如何在Unity中使用粒子系统,并提供示例代码。首先创建粒子系统,然后调整Emission、Shape、Color over Lifetime等模块参数,实现所需效果。此外,还可通过C#脚本实现更复杂的粒子效果,增强游戏视觉冲击力和沉浸感。
336 0