Drill-on-YARN之源码解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: ## 1. 概要 前面介绍了如何把Drill部署在YARN上,然后通过Drill-on-YARN客户端,你可以启动、停止、调整、清零命令操作Drill。但是在这么命令背后,到底是如何执行的呢,下面会对Drill-on-YARN的源码进行详细的解析,重点解析启动过程,其他命令简单介绍。 **说明:下面涉及到的代码,以drill 1.14.0为准,并且为了减少篇幅,进行了删减。** ##

1. 概要

前面介绍了如何把Drill部署在YARN上,然后通过Drill-on-YARN客户端,你可以启动、停止、调整、清零命令操作Drill。但是在这么命令背后,到底是如何执行的呢,下面会对Drill-on-YARN的源码进行详细的解析,重点解析启动过程,其他命令简单介绍。

说明:下面涉及到的代码,以drill 1.14.0为准,并且为了减少篇幅,进行了删减。

2. Drill-on-YARN start

2.1 drill-on-yarn.sh

通过查看drill-on-yarn.sh脚本,很容易发现最终执行的java类是CLIENT_CMD="$JAVA $VM_OPTS -cp $CP org.apache.drill.yarn.client.DrillOnYarn ${args[@]}" org.apache.drill.yarn.client.DrillOnYarn便是启动Drill-on-YARN的入口。我们可以总览一下这个类:

public class DrillOnYarn {
  public static void main(String argv[]) {
    BasicConfigurator.configure();
    ClientContext.init();
    run(argv);
  }
  public static void run(String argv[]) {
    ClientContext context = ClientContext.instance();
    CommandLineOptions opts = new CommandLineOptions();
    if (!opts.parse(argv)) {
      opts.usage();
      context.exit(-1);
    }
    if (opts.getCommand() == null) {
      opts.usage();
      context.exit(-1);
    }
    try {
      DrillOnYarnConfig.load().setClientPaths();
    } catch (DoyConfigException e) {
      ClientContext.err.println(e.getMessage());
      context.exit(-1);
    }
    ClientCommand cmd;
    switch (opts.getCommand()) {
    case UPLOAD:
      cmd = new StartCommand(true, false);
      break;
    case START:
      cmd = new StartCommand(true, true);
      break;
    case DESCRIBE:
      cmd = new PrintConfigCommand();
      break;
    case STATUS:
      cmd = new StatusCommand();
      break;
    case STOP:
      cmd = new StopCommand();
      break;
    case CLEAN:
      cmd = new CleanCommand();
      break;
    case RESIZE:
      cmd = new ResizeCommand();
      break;
    default:
      cmd = new HelpCommand();
    }
    cmd.setOpts(opts);
    try {
      cmd.run();
    } catch (ClientException e) {
      displayError(opts, e);
      context.exit(1);
    }
  }
}

可以看到入口main方法,其中最关键的便是run方法,包含了很多的命令,我们重点看start命令,代码如下:

public void run() throws ClientException {
    checkExistingApp();

    dryRun = opts.dryRun;
    config = DrillOnYarnConfig.config();
    FileUploader uploader = upload();
    if (launch) {
      launch(uploader);
    }
}

概括的来说,它主要包含以下流程:

  1. 检查application是否已经存在,如果已经存在,便不允许启动,否则执行启动操作(此处检查的application是YARN的application,启动成功会将YARN的applicationId写入本地磁盘的一个文件,通过此文件来检查)。
  2. 上传Drill二方包和site目录下的内容至DFS上,其中site目录下的内容会被打包为site.tar.gz

    public void run() throws ClientException {
      setup();
      uploadDrillArchive();
      if (hasSiteDir()) {
        uploadSite();
      }
    }
  3. 启动ApplicationMaster,主要流程为:

    • 创建YARN客户端,并启动

      // AMRunner#connectToYarn
      private void connectToYarn() {
          System.out.print("Loading YARN Config...");
          client = new YarnRMClient();
          System.out.println(" Loaded.");
      }
    • 创建ApplicationMaster

      // AMRunner#createApp
      private void createApp() throws ClientException {
          try {
            appResponse = client.createAppMaster();
          } catch (YarnClientException e) {
            throw new ClientException("Failed to allocate Drill application master",
                e);
          }
          appId = appResponse.getApplicationId();
          System.out.println("Application ID: " + appId.toString());
      }
    • 设置ApplicationMaster上下文,包括:Heap memory、Class Path、启动的命令(dirll-am.sh)、启动am容器使用的资源(memory、vCores、disks)
    • 校验资源,主要是ApplicationMaster使用资源是否超过了YARN的设置
    • 提交ApplicationMaster

      private void launchApp(AppSpec master) throws ClientException {
          try {
            client.submitAppMaster(master);
          } catch (YarnClientException e) {
            throw new ClientException("Failed to start Drill application master", e);
          }
      }
    • 等待启动,并打印启动日志
    • 将ApplicationMaster的appid写入文件(在第1步,检测Application是否存在,就是使用这个文件)

ApplicationMaster启动后,会向RM申请资源,启动Drillbits,下面详细介绍ApplicationMaster启动后的操作

2.2 drill-am.sh

通过查看drill-am.sh脚本,很容易发现最终执行的java类是AMCMD="$JAVA $AM_JAVA_OPTS ${args[@]} -cp $CP org.apache.drill.yarn.appMaster.DrillApplicationMaster"org.apache.drill.yarn.appMaster.DrillApplicationMaste表示ApplicationMaster执行的入口,下面总览一下这个类:

public class DrillApplicationMaster {
  public static void main(String[] args) {
    LOG.trace("Drill Application Master starting.");
    try {
      DrillOnYarnConfig.load().setAmDrillHome();
    } catch (DoyConfigException e) {
      System.err.println(e.getMessage());
      System.exit(-1);
    }
    Dispatcher dispatcher;
    try {
      dispatcher = (new DrillControllerFactory()).build();
    } catch (ControllerFactoryException e) {
      LOG.error("Setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
      return;
    }
    try {
      if (!dispatcher.start()) {
        return;
      }
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    WebServer webServer = new WebServer(dispatcher);
    try {
      webServer.start();
    } catch (Exception e) {
      LOG.error("Web server setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    try {
      dispatcher.run();
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    } finally {
      try {
        webServer.close();
      } catch (Exception e) {
      }
    }
  }
}

概况的来说,它主要包含以下流程:

  1. 加载Drill-on-YARN的配置,并设置AM的DirllHome,比如/home/admin/tmp2/hadoop/nm-local-dir/usercache/admin/appcache/application_1534698866098_0022/container_1534698866098_0022_01_000001/drill/apache-drill-1.14.0
  2. 构造Dispatcher,Dispatcher用于分配YARN、timer、ZooKeeper事件给给集群控制器,它是轻量级多线程的,用于响应RM、NM、timer线程的事件,对于某一个事件,它是连续的,所以需要同步,但是不同类型的事件不需要同步。整个的构造流程如下:

    • 准备资源,包括:drill二方包、site压缩包的目录

      private Map<String, LocalResource> prepareResources() {
          ...
          drillArchivePath = drillConfig.getDrillArchiveDfsPath();
          siteArchivePath = drillConfig.getSiteArchiveDfsPath();
          ...
      }
    • 定义任务启动的规格(TaskSpec),包括:运行时环境、YARN container的规格、dirllbit的规格

      private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources) throws DoyConfigException {
          ...
          ContainerRequestSpec containerSpec = new ContainerRequestSpec();
          containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY);
          ...
          LaunchSpec drillbitSpec = new LaunchSpec();
          ...
          TaskSpec taskSpec = new TaskSpec();
          taskSpec.name = "Drillbit";
          taskSpec.containerSpec = containerSpec;
          taskSpec.launchSpec = drillbitSpec;
      }
    • 设置Dispatcher的控制器:实现类为ClusterControllerImpl,它主要通过状态来控制Drill集群、调整整个集群的任务(Drill启动、停止等任务)、处理container的回调

      public void setYarn(AMYarnFacade yarn) throws YarnFacadeException {
              this.yarn = yarn;
              controller = new ClusterControllerImpl(yarn);
          }
    • 为控制器注册Scheduler,比如DrillbitScheduler,此外Scheduler配置来源于之前drill-on-yarn.conf

      cluster: [
          {
            name: "drill-group1"
            type: "basic"
            count: 1
          }
      ]
      ...
      ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0);
      Scheduler testGroup = new DrillbitScheduler(pool.getName(), taskSpec,
      pool.getCount(), requestTimeoutSecs, maxExtraNodes);
      dispatcher.getController().registerScheduler(testGroup);
      ...
    • 创建ZooKeeper集群协调器

      String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT);
      String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT);
      String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID);
  3. 启动Dispatcher,主要启动AMRMClientAsync、NMClientAsync、YarnClient

    ...
    yarn.start(new ResourceCallback(), new NodeCallback());
    String url = trackingUrl.replace("<port>", Integer.toString(httpPort));
    if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) {
      url = url.replace("http:", "https:");
    }
    yarn.register(url);
    controller.started();
    ...
    ...
    resourceMgr = AMRMClientAsync.createAMRMClientAsync(pollPeriodMs, resourceCallback);
    resourceMgr.init(conf);
    resourceMgr.start();
    ...
    nodeMgr = NMClientAsync.createNMClientAsync(nodeCallback);
    nodeMgr.init(conf);
    nodeMgr.start();
    ...
    client = YarnClient.createYarnClient();
    client.init(conf);
    client.start();
    ...
  4. 启动dirll运维界面

    WebServer webServer = new WebServer(dispatcher);
    webServer.start();
  5. 运行Dispatcher,主要是启动一个线程,此线程会不断的轮询当前的任务队列中的任务情况,比如启动、停止、resize等类型的任务,然后执行相应的动作,拿启动来说

    • 添加一个启动任务,然后放入pendingTask队列中

      if (state == State.LIVE) {
        adjustTasks(curTime);
        requestContainers();
      }
    • 向RM请求container:创建一个ContainerRequest

      ContainerRequest request = containerSpec.makeRequest();
      resourceMgr.addContainerRequest(containerSpec.makeRequest());
      return request;
    • ResourceCallback监听container分配,然后启动container

      private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
          @Override
          public void onContainersAllocated(List<Container> containers) {
            controller.containersAllocated(containers);
          }
      }
      public void containerAllocated(EventContext context, Container container) {
        Task task = context.task;
        LOG.info(task.getLabel() + " - Received container: "
            + DoYUtil.describeContainer(container));
        context.group.dequeueAllocatingTask(task);
      
        // No matter what happens below, we don't want to ask for this
        // container again. The RM async API is a bit bizarre in this
        // regard: it will keep asking for container over and over until
        // we tell it to stop.
      
        context.yarn.removeContainerRequest(task.containerRequest);
      
        // The container is need both in the normal and in the cancellation
        // path, so set it here.
      
        task.container = container;
        if (task.cancelled) {
          context.yarn.releaseContainer(container);
          taskStartFailed(context, Disposition.CANCELLED);
          return;
        }
        task.error = null;
        task.completionStatus = null;
        transition(context, LAUNCHING);
      
        // The pool that manages this task wants to know that we have
        // a container. The task manager may want to do some task-
        // specific setup.
      
        context.group.containerAllocated(context.task);
        context.getTaskManager().allocated(context);
      
        // Go ahead and launch a task in the container using the launch
        // specification provided by the task group (pool).
      
        try {
          context.yarn.launchContainer(container, task.getLaunchSpec());
          task.launchTime = System.currentTimeMillis();
        } catch (YarnFacadeException e) {
          LOG.error("Container launch failed: " + task.getContainerId(), e);
      
          // This may not be the right response. RM may still think
          // we have the container if the above is a local failure.
      
          task.error = e;
          context.group.containerReleased(task);
          task.container = null;
          taskStartFailed(context, Disposition.LAUNCH_FAILED);
        }
      }
    • NodeCallback监听container启动

      public class NodeCallback implements NMClientAsync.CallbackHandler {
          @Override
          public void onStartContainerError(ContainerId containerId, Throwable t) {
            controller.taskStartFailed(containerId, t);
          }
      
          @Override
          public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
            controller.containerStarted(containerId);
          }
      
          @Override
          public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
          }
      
          @Override
          public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
          }
      
          @Override
          public void onStopContainerError(ContainerId containerId, Throwable t) {
            controller.stopTaskFailed(containerId, t);
          }
      
          @Override
          public void onContainerStopped(ContainerId containerId) {
            controller.containerStopped(containerId);
          }
      }

2.3 fail over

Drill-on-YARN除了提供start、stop、resize功能外,还提供了fail over功能,当前某个drillbit挂掉后,Drill-on-YARN会尝试再次启动drillbit,目前重试的次数为2。此外,如果一个drillbit所在的节点频繁挂掉,会被列入黑名单。

我们可以通过手动kill drillbit来模拟drillbit挂掉的情况,然后等待一会儿,可以看到,drillbit进程重新启动了。下面我们看看,代码的执行流程

  1. drillbit挂掉,container结束
private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
      controller.containersCompleted(statuses);
    }
}
  1. retry task:重新将这个task加入pendingTasks,然后轮询的线程检测到pendingTasks不为空,执行启动操作
protected void taskTerminated(EventContext context) {
    Task task = context.task;
    context.getTaskManager().completed(context);
    context.group.containerReleased(task);
    assert task.completionStatus != null;
    // container结束的状态不是0,说明不是正常结束
    if (task.completionStatus.getExitStatus() == 0) {
      taskEnded(context, Disposition.COMPLETED);
      context.group.taskEnded(context.task);
    } else {
      taskEnded(context, Disposition.RUN_FAILED);
      retryTask(context);
    }
}
private void retryTask(EventContext context) {
    Task task = context.task;
    assert task.state == END;
    if (!context.controller.isLive() || !task.retryable()) {
      context.group.taskEnded(task);
      return;
    }
    if (task.tryCount > task.taskGroup.getMaxRetries()) {
      LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount);
      task.disposition = Disposition.TOO_MANY_RETRIES;
      context.group.taskEnded(task);
      return;
    }
    LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount);
    context.group.taskRetried(task);
    task.reset();
    transition(context, START);
    context.group.enqueuePendingRequest(task);
}

3. 停止

除了前面详情介绍的start命令外,Drill-on-YARN也提供了stop命令,其中stop分两种:

  1. 强制停止:直接调用yarn客户端的killApplication api yarnClient.killApplication(appId);
  2. 优雅停止:先清理所有的任务,包括pending、running的,然后调用yarn的api杀死容器,关闭controller,然后通知am运行结束
...
for (Task task : getStartingTasks()) {
  context.setTask(task);
  context.getState().cancel(context);
}
for (Task task : getActiveTasks()) {
  context.setTask(task);
  context.getState().cancel(context);
}
...
...
context.yarn.killContainer(task.container);
...
public void run() throws YarnFacadeException {
    ...
    boolean success = controller.waitForCompletion();
    ...
    ...
    finish(success, null);
    ...
  }
public boolean waitForCompletion() {
    start();
    synchronized (completionMutex) {
      try {
        completionMutex.wait();
      } catch (InterruptedException e) {
        
      }
    }
    return succeeded();
}
public void finish(boolean succeeded, String msg) throws YarnFacadeException {
    nodeMgr.stop();
    String appMsg = "Drill Cluster Shut-Down";
    FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED;
    if (!succeeded) {
      appMsg = "Drill Cluster Fatal Error - check logs";
      status = FinalApplicationStatus.FAILED;
    }
    if (msg != null) {
      appMsg = msg;
    }
    try {
      resourceMgr.unregisterApplicationMaster(status, appMsg, "");
    } catch (YarnException | IOException e) {
      throw new YarnFacadeException("Deregister AM failed", e);
    }
    resourceMgr.stop();
  }

4. resize

resize流程为:调整quantity(保留多少个container),之后轮询线程会根据quantity,调整任务,执行resize操作

public int resize(int level) {
    int limit = quantity + state.getController().getFreeNodeCount() +maxExtraNodes;
    return super.resize( Math.min( limit, level ) );
}

5. 总结

总的来说,Drill-on-YARN分为两大模块,drill-on-yarn.sh和drill-am.sh。drill-on-yarn.sh用于启动ApplicationMaster,drill-am.sh用于向ResourceManager申请资源并启动Drill集群。其中Drill的启动、停止、缩容、扩容,都被封装为一个任务,在执行这些命令时,会构建一个任务,放入任务队列中。有一个线程会一直轮询此队列,根据队列中的任务执行不同的操作,从而达到启动、停止、缩容、扩容Drill集群的功能。此外,相比独立部署,Drill-on-YARN提供的failover功能强化了Drill的稳定性。

相关文章
|
8天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
28 2
|
8天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
21天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
39 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
56 5
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
113 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
算法 Java 程序员
Map - TreeSet & TreeMap 源码解析
Map - TreeSet & TreeMap 源码解析
34 0
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
68 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
62 0

推荐镜像

更多
下一篇
无影云桌面