Apache Zeppelin系列教程第六篇——Zengine调用Interpreter原理分析

简介: Apache Zeppelin系列教程第六篇——Zengine调用Interpreter原理分析

前文介绍jdbc interpreter和interpreter模块交互代码,本篇文章主要分析Zengine调用Interpreter模块代码。

介绍完这篇文章之后,我们即可将paragraph run的流程串起来(后面会将整个流程进行串讲)

同样,来看下这个测试类

zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java

  @Test
  public void testFIFOScheduler() throws InterruptedException, InterpreterException {
    LOGGER.info("===testFIFOScheduler====");
    interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
    // by default SleepInterpreter would use FIFOScheduler
    LOGGER.info("===getInterpreter====");
    final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", note1Id, "sleep");
    LOGGER.info("===createDummyInterpreterContext====");
    final InterpreterContext context1 = createDummyInterpreterContext();
    // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
    // time overhead of launching the process.
    LOGGER.info("111");
    LOGGER.info("=====name:{}=======",interpreter1.getClassName());
    System.out.println(interpreter1.getClassName());
    interpreter1.interpret("10101", context1);
    LOGGER.info("222");
    Thread thread1 = new Thread() {
      @Override
      public void run() {
        try {
          assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
        } catch (InterpreterException e) {
          e.printStackTrace();
          fail();
        }
      }
    };
    Thread thread2 = new Thread() {
      @Override
      public void run() {
        try {
          assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
        } catch (InterpreterException e) {
          e.printStackTrace();
          fail();
        }
      }
    };
    long start = System.currentTimeMillis();
    thread1.start();
    thread2.start();
    thread1.join();
    thread2.join();
    long end = System.currentTimeMillis();
    assertTrue((end - start) >= 200);
  }

可以看下这个测试方法,这边加了一些日志

RemoteInterpreterTest 继承 AbstractInterpreterTest 里面的抽象类,会先执行setUp方法对读取配置文件信息interpreter 进行初始化

核心主要是执行RemoteInterpreter里面的 interpret 方法,

  @Override
  public InterpreterResult interpret(final String st, final InterpreterContext context)
      throws InterpreterException {
    LOGGER.info("st:\n{}", st);
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("st:\n{}", st);
    }
    final FormType form = getFormType();
    RemoteInterpreterProcess interpreterProcess = null;
    try {
      interpreterProcess = getOrCreateInterpreterProcess();
    } catch (IOException e) {
      throw new InterpreterException(e);
    }
    if (!interpreterProcess.isRunning()) {
      return new InterpreterResult(InterpreterResult.Code.ERROR,
              "Interpreter process is not running\n" + interpreterProcess.getErrorMessage());
    }
    return interpreterProcess.callRemoteFunction(client -> {
          RemoteInterpreterResult remoteResult = client.interpret(
              sessionId, className, st, convert(context));
          Map<String, Object> remoteConfig = (Map<String, Object>) GSON.fromJson(
              remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
              }.getType());
          context.getConfig().clear();
          if (remoteConfig != null) {
            context.getConfig().putAll(remoteConfig);
          }
          GUI currentGUI = context.getGui();
          GUI currentNoteGUI = context.getNoteGui();
          if (form == FormType.NATIVE) {
            GUI remoteGui = GUI.fromJson(remoteResult.getGui());
            GUI remoteNoteGui = GUI.fromJson(remoteResult.getNoteGui());
            currentGUI.clear();
            currentGUI.setParams(remoteGui.getParams());
            currentGUI.setForms(remoteGui.getForms());
            currentNoteGUI.setParams(remoteNoteGui.getParams());
            currentNoteGUI.setForms(remoteNoteGui.getForms());
          } else if (form == FormType.SIMPLE) {
            final Map<String, Input> currentForms = currentGUI.getForms();
            final Map<String, Object> currentParams = currentGUI.getParams();
            final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
            final Map<String, Input> remoteForms = remoteGUI.getForms();
            final Map<String, Object> remoteParams = remoteGUI.getParams();
            currentForms.putAll(remoteForms);
            currentParams.putAll(remoteParams);
          }
          return convert(remoteResult);
        }
    );
  }

其中getOrCreateInterpreterProcess()一路点下去 最终是去调用zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java 里面的start 方法,通过 commons-exec命令执行shell 或者cmd 脚本(bin/interpreter.sh) 启动一个独立的进程,shell 脚本里面具体执行的类(org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer),和前一篇文章interpreter 原理分析相呼应

  @Override
  public void start(String userName) throws IOException {
    // start server process
    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
    cmdLine.addArgument("-d", false);
    cmdLine.addArgument(getInterpreterDir(), false);
    cmdLine.addArgument("-c", false);
    cmdLine.addArgument(getIntpEventServerHost(), false);
    cmdLine.addArgument("-p", false);
    cmdLine.addArgument(String.valueOf(intpEventServerPort), false);
    cmdLine.addArgument("-r", false);
    cmdLine.addArgument(getInterpreterPortRange(), false);
    cmdLine.addArgument("-i", false);
    cmdLine.addArgument(getInterpreterGroupId(), false);
    if (isUserImpersonated() && !userName.equals("anonymous")) {
      cmdLine.addArgument("-u", false);
      cmdLine.addArgument(userName, false);
    }
    cmdLine.addArgument("-l", false);
    cmdLine.addArgument(getLocalRepoDir(), false);
    cmdLine.addArgument("-g", false);
    cmdLine.addArgument(getInterpreterSettingName(), false);
    interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, getEnv());
    interpreterProcessLauncher.launch();
    interpreterProcessLauncher.waitForReady(getConnectTimeout());
    if (interpreterProcessLauncher.isLaunchTimeout()) {
      throw new IOException(
          String.format("Interpreter Process creation is time out in %d seconds", getConnectTimeout() / 1000) + "\n"
              + "You can increase timeout threshold via "
              + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n"
              + interpreterProcessLauncher.getErrorMessage());
    }
    if (!interpreterProcessLauncher.isRunning()) {
      throw new IOException("Fail to launch interpreter process:\n" + interpreterProcessLauncher.getErrorMessage());
    }
    if (isHadoopClientAvailable()) {
      String launchOutput = interpreterProcessLauncher.getProcessLaunchOutput();
      Matcher m = YARN_APP_PATTER.matcher(launchOutput);
      if (m.find()) {
        String appId = m.group(1);
        LOGGER.info("Detected yarn app: {}, add it to YarnAppMonitor", appId);
        YarnAppMonitor.get().addYarnApp(ConverterUtils.toApplicationId(appId), this);
      }
    }
  }

而实际调用thrift server 端服务的client 端代码

上述图片是代码运行的log,可以帮助我们定位代码的运行顺序

({FIFO-RemoteInterpreter-python-shared_process-shared_session-1} ProcessLauncher.java[launch]:96) - Process is launched: [.\\bin\interpreter.cmd, -d, ./\interpreter/python, -c, 10.4.144.223, -p, 52945, -r, :, -i, python-shared_process, -l, ./\local-repo\python, -g, python]
({FIFO-RemoteInterpreter-md-shared_process-shared_session-1} ProcessLauncher.java[launch]:96) - Process is launched: [.\\bin\interpreter.cmd, -d, ./\interpreter/md, -c, 10.4.144.223, -p, 52945, -r, :, -i, md-shared_process, -l, ./\local-repo\md, -g, md]
({FIFO-RemoteInterpreter-jdbc-shared_process-shared_session-1} ProcessLauncher.java[launch]:96) - Process is launched: [.\\bin\interpreter.cmd, -d, ./\interpreter/jdbc, -c, 10.4.144.223, -p, 52945, -r, :, -i, jdbc-shared_process, -l, ./\local-repo\jdbc, -g, jdbc]

参考

程序员的福音 - Apache Commons Exec - 知乎

Apache Thrift系列详解(一) - 概述与入门 - 掘金


相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
188 2
|
1天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
2月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
25天前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
44 3
|
25天前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
27 2
|
25天前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
28 2
|
25天前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
47 1
|
28天前
|
负载均衡 应用服务中间件 Apache
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
32 3
|
29天前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
56 0
|
2月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
131 11

推荐镜像

更多