Apache Zeppelin系列教程第五篇——Interpreter原理分析

简介: Apache Zeppelin系列教程第五篇——Interpreter原理分析

Apache Zeppelin系列教程第四篇——JDBCInterpreter原理分析

以JDBCInterpreter为例讲解了实际jdbc的执行过程。下面是一个整体的架构图

其实就是web 向server 发送请求,然后调用zengine,再到interpreter,最后到实际的执行模块,比如上文中介绍的JDBCInterpreter

本篇文章重点分析下Interpreter模块,重点来看下测试类

zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java

@Test
  public void testInterpreter2() throws Exception {
    final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
            RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
    server.init(new HashMap<>());
    server.intpEventClient = mock(RemoteInterpreterEventClient.class);
    Map<String, String> intpProperties = new HashMap<>();
    intpProperties.put("property_1", "value_1");
    intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");
    // create Test1Interpreter in session_1
    server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
            intpProperties, "user_1");
    Test1Interpreter interpreter1 = (Test1Interpreter)
            ((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0))
                    .getInnerInterpreter();
    assertEquals(1, server.getInterpreterGroup().getSessionNum());
    assertEquals(1, server.getInterpreterGroup().get("session_1").size());
    assertEquals(2, interpreter1.getProperties().size());
    assertEquals("value_1", interpreter1.getProperty("property_1"));
    // create Test2Interpreter in session_1
    server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),
            intpProperties, "user_1");
    assertEquals(2, server.getInterpreterGroup().get("session_1").size());
    final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
    intpContext.setNoteId("note_1");
    intpContext.setParagraphId("paragraph_1");
    intpContext.setGui("{}");
    intpContext.setNoteGui("{}");
    intpContext.setLocalProperties(new HashMap<>());
    // single output of SUCCESS
    RemoteInterpreterResult result = server.interpret("session_1", Test2Interpreter.class.getName(),
            "COMBO_OUTPUT_SUCCESS", intpContext);
    System.out.println(new Gson().toJson(result));
    //List<InterpreterResultMessage> resultMessages = intpContext.out.toInterpreterResultMessage();
    //System.out.println(new Gson().toJson(resultMessages));
    /*assertEquals("SUCCESS", result.code);
    assertEquals(2, result.getMsg().size());
    assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData());
    assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());*/
  }

这边简单修改了这个测试类的代码

createInterpreter 是采用反射的方式构建进行实例化Interpreter,核心代码如下:

      Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
      Properties p = new Properties();
      p.putAll(properties);
      setSystemProperty(p);
      Constructor<Interpreter> constructor =
              replClass.getConstructor(new Class[]{Properties.class});
      Interpreter interpreter = constructor.newInstance(p);
      interpreter.setClassloaderUrls(new URL[]{});
      interpreter.setInterpreterGroup(interpreterGroup);
      interpreter.setUserName(userName);
      interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);

interpreter 方法,就是实际执行具体的interpreter了,通过放入队列然后去执行job,最终实际执行代码InterpretJob 里面的jobRun()方法

public InterpreterResult jobRun() throws Throwable {
      ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader();
      try {
        InterpreterContext.set(context);
        // clear the result of last run in frontend before running this paragraph.
        context.out.clear();
        InterpreterResult result = null;
        // Open the interpreter instance prior to calling interpret().
        // This is necessary because the earliest we can register a hook
        // is from within the open() method.
        LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
        if (!lazy.isOpen()) {
          lazy.open();
          result = lazy.executePrecode(context);
        }
        if (result == null || result.code() == Code.SUCCESS) {
          // Add hooks to script from registry.
          // note scope first, followed by global scope.
          // Here's the code after hooking:
          //     global_pre_hook
          //     note_pre_hook
          //     script
          //     note_post_hook
          //     global_post_hook
          processInterpreterHooks(context.getNoteId());
          processInterpreterHooks(null);
          LOGGER.debug("Script after hooks: {}", script);
          result = interpreter.interpret(script, context);
        }
        // data from context.out is prepended to InterpreterResult if both defined
        context.out.flush();
        List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
        for (InterpreterResultMessage resultMessage : result.message()) {
          // only add non-empty InterpreterResultMessage
          if (!StringUtils.isBlank(resultMessage.getData())) {
            resultMessages.add(resultMessage);
          }
        }
        List<String> stringResult = new ArrayList<>();
        for (InterpreterResultMessage msg : resultMessages) {
          if (msg.getType() == InterpreterResult.Type.IMG) {
            LOGGER.debug("InterpreterResultMessage: IMAGE_DATA");
          } else {
            LOGGER.debug("InterpreterResultMessage: {}", msg);
          }
          stringResult.add(msg.getData());
        }
        // put result into resource pool
        if (context.getLocalProperties().containsKey("saveAs")) {
          if (stringResult.size() == 1) {
            LOGGER.info("Saving result into ResourcePool as single string: " +
                    context.getLocalProperties().get("saveAs"));
            context.getResourcePool().put(
                    context.getLocalProperties().get("saveAs"), stringResult.get(0));
          } else {
            LOGGER.info("Saving result into ResourcePool as string list: " +
                    context.getLocalProperties().get("saveAs"));
            context.getResourcePool().put(
                    context.getLocalProperties().get("saveAs"), stringResult);
          }
        }
        return new InterpreterResult(result.code(), resultMessages);
      } catch (Throwable e) {
        return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
      } finally {
        Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);
        InterpreterContext.remove();
      }
    }

至此这个代码已经和上一篇文章的jdbc Interpreter代码呼应了,也就是Interpreter 执行具体的jdbc Interpreter 的过程

RemoteInterpreterServer里面的main 方法启动线程实际在run方法 里面启动thrift server 端服务

public static void main(String[] args) throws Exception {
    String zeppelinServerHost = null;
    int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
    String portRange = ":";
    String interpreterGroupId = null;
    if (args.length > 0) {
      zeppelinServerHost = args[0];
      port = Integer.parseInt(args[1]);
      interpreterGroupId = args[2];
      if (args.length > 3) {
        portRange = args[3];
      }
    }
    RemoteInterpreterServer remoteInterpreterServer =
        new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
    remoteInterpreterServer.start();
    /*
     * Registration of a ShutdownHook in case of an unpredictable system call
     * Examples: STRG+C, SIGTERM via kill
     */
    shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK);
    Runtime.getRuntime().addShutdownHook(shutdownThread);
    remoteInterpreterServer.join();
    LOGGER.info("RemoteInterpreterServer thread is finished");
    /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully.
     * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd
     * should be part of the next release and solve the problem.
     * We may have other threads that are not terminated successfully.
     */
    if (remoteInterpreterServer.isForceShutdown) {
      LOGGER.info("Force shutting down");
      System.exit(0);
    }
  }
  @Override
  public void run() {
    RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
      new RemoteInterpreterService.Processor<>(this);
    try (TServerSocket tSocket = new TServerSocket(port)){
      server = new TThreadPoolServer(
      new TThreadPoolServer.Args(tSocket)
        .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT)
        .stopTimeoutUnit(TimeUnit.MILLISECONDS)
        .processor(processor));
      if (null != intpEventServerHost && !isTest) {
        Thread registerThread = new Thread(new RegisterRunnable());
        registerThread.setName("RegisterThread");
        registerThread.start();
      }
      LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
      server.serve();
    } catch (TTransportException e) {
      LOGGER.error("Failure in TTransport", e);
    }
    LOGGER.info("RemoteInterpreterServer-Thread finished");
  }

参考:

mock 使用:https://zhuanlan.zhihu.com/p/51673406

Apache Thrift系列详解(一) - 概述与入门 - 掘金


相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
217 2
|
13天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
14天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
2月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
1月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
52 3
|
1月前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
36 2
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
32 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
52 1
|
1月前
|
负载均衡 应用服务中间件 Apache
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
37 3
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
72 0

相关实验场景

更多

推荐镜像

更多