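In Part 4 of the Apache Zeppelin tutorial series, "JDBCInterpreter Principle Analysis", we used JDBCInterpreter as an example to walk through how JDBC code is actually executed. Below is an overall architecture diagram.

In essence, the web front end sends a request to the server, the server calls zengine, zengine calls the interpreter, and the interpreter finally hands the work to the actual execution module, such as the JDBCInterpreter introduced in the previous article.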
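The layering described above can be modeled as a chain of objects, each delegating to the next. This is a minimal sketch under stated assumptions: `ServerLayer`, `EngineLayer`, `InterpreterLayer` and `CodeExecutor` are hypothetical names used only for illustration, not Zeppelin classes.

```java
// Hypothetical model of the request path: web -> server -> zengine -> interpreter -> executor.
// These types are illustrative only; they are NOT Zeppelin's real classes.
interface CodeExecutor {
    String execute(String code);                 // e.g. the layer that talks to JDBC
}

final class InterpreterLayer {                   // plays the role of e.g. JDBCInterpreter
    private final CodeExecutor executor;
    InterpreterLayer(CodeExecutor executor) { this.executor = executor; }
    String interpret(String script) { return executor.execute(script); }
}

final class EngineLayer {                        // plays the role of zengine
    private final InterpreterLayer interpreter;
    EngineLayer(InterpreterLayer interpreter) { this.interpreter = interpreter; }
    String runParagraph(String paragraphText) { return interpreter.interpret(paragraphText); }
}

final class ServerLayer {                        // receives requests from the web front end
    private final EngineLayer engine;
    ServerLayer(EngineLayer engine) { this.engine = engine; }
    String handleRequest(String paragraphText) { return engine.runParagraph(paragraphText); }
}
```

In the real system the last hops cross a process boundary (RemoteInterpreterServer, discussed below, is the Thrift server side of that boundary); this sketch keeps everything in one process to show only the delegation order.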
This article focuses on the Interpreter module, mainly by examining the following test class:
zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@Test
public void testInterpreter2() throws Exception {
  final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
      RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
  server.init(new HashMap<>());
  server.intpEventClient = mock(RemoteInterpreterEventClient.class);

  Map<String, String> intpProperties = new HashMap<>();
  intpProperties.put("property_1", "value_1");
  intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");

  // create Test1Interpreter in session_1
  server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
      intpProperties, "user_1");
  Test1Interpreter interpreter1 = (Test1Interpreter)
      ((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0))
          .getInnerInterpreter();
  assertEquals(1, server.getInterpreterGroup().getSessionNum());
  assertEquals(1, server.getInterpreterGroup().get("session_1").size());
  assertEquals(2, interpreter1.getProperties().size());
  assertEquals("value_1", interpreter1.getProperty("property_1"));

  // create Test2Interpreter in session_1
  server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),
      intpProperties, "user_1");
  assertEquals(2, server.getInterpreterGroup().get("session_1").size());

  final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
  intpContext.setNoteId("note_1");
  intpContext.setParagraphId("paragraph_1");
  intpContext.setGui("{}");
  intpContext.setNoteGui("{}");
  intpContext.setLocalProperties(new HashMap<>());

  // single output of SUCCESS
  RemoteInterpreterResult result = server.interpret("session_1", Test2Interpreter.class.getName(),
      "COMBO_OUTPUT_SUCCESS", intpContext);
  System.out.println(new Gson().toJson(result));
  //List<InterpreterResultMessage> resultMessages = intpContext.out.toInterpreterResultMessage();
  //System.out.println(new Gson().toJson(resultMessages));
  /*assertEquals("SUCCESS", result.code);
  assertEquals(2, result.getMsg().size());
  assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData());
  assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());*/
}
The code above is a lightly modified version of this test.
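The assertions in the test above (`getSessionNum()` equals 1, and `get("session_1").size()` grows from 1 to 2) describe an interpreter group that keeps a list of interpreters per session. The sketch below models only that bookkeeping; `SimpleInterpreterGroup` is a hypothetical class, not Zeppelin's `InterpreterGroup` implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-session interpreter bookkeeping.
// It mirrors the test's assertions but is NOT Zeppelin's InterpreterGroup.
final class SimpleInterpreterGroup {
    private final Map<String, List<Object>> sessions = new HashMap<>();

    // Add an interpreter to a session, creating the session's list on first use.
    synchronized void addInterpreterToSession(Object interpreter, String sessionId) {
        sessions.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(interpreter);
    }

    // Interpreters registered in one session (null if the session does not exist).
    synchronized List<Object> get(String sessionId) {
        return sessions.get(sessionId);
    }

    // Number of distinct sessions, not number of interpreters.
    synchronized int getSessionNum() {
        return sessions.size();
    }
}
```

This is why creating Test2Interpreter in the same `session_1` leaves the session count at 1 while the session's list grows to 2.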
createInterpreter instantiates the Interpreter via reflection. The core code is as follows:
Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
Properties p = new Properties();
p.putAll(properties);
setSystemProperty(p);

Constructor<Interpreter> constructor =
    replClass.getConstructor(new Class[]{Properties.class});
Interpreter interpreter = constructor.newInstance(p);
interpreter.setClassloaderUrls(new URL[]{});
interpreter.setInterpreterGroup(interpreterGroup);
interpreter.setUserName(userName);

interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);
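The reflective pattern used by createInterpreter (load a class by name, then call its `Properties` constructor) can be reproduced in isolation. A minimal sketch, assuming hypothetical `BaseInterpreter`, `EchoInterpreter` and `InterpreterFactory` classes in place of Zeppelin's real types:

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Properties;

// Hypothetical base class: like Zeppelin's Interpreter, it is constructed from Properties.
abstract class BaseInterpreter {
    protected final Properties properties;
    BaseInterpreter(Properties properties) { this.properties = properties; }
    String getProperty(String key) { return properties.getProperty(key); }
    abstract String interpret(String script);
}

// Hypothetical concrete interpreter; it must expose a public (Properties) constructor.
class EchoInterpreter extends BaseInterpreter {
    public EchoInterpreter(Properties properties) { super(properties); }
    @Override String interpret(String script) { return "echo: " + script; }
}

final class InterpreterFactory {
    // Load the class by name and invoke its (Properties) constructor reflectively.
    static BaseInterpreter create(String className, Map<String, String> props) throws Exception {
        Class<? extends BaseInterpreter> cls =
            Class.forName(className).asSubclass(BaseInterpreter.class);
        Properties p = new Properties();
        p.putAll(props);
        Constructor<? extends BaseInterpreter> ctor = cls.getConstructor(Properties.class);
        return ctor.newInstance(p);
    }
}
```

Using `asSubclass` fails fast with a ClassCastException if the named class is not an interpreter, instead of the unchecked cast in the original. The real method additionally wraps the new instance in LazyOpenInterpreter, so `open()` is deferred until the first run (see `jobRun()` below).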
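The interpret method is what actually runs a specific interpreter: the request is wrapped in a job and put into a queue, the job is then executed, and the code that finally runs is the jobRun() method of InterpretJob: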
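The "enqueue a job, then a worker runs `jobRun()`" flow can be sketched with a blocking queue and a single worker thread. Everything here is hypothetical (`SketchJob`, `SketchFifoScheduler`); it illustrates FIFO job execution and is not Zeppelin's scheduler API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical job: subclasses put the real work in jobRun(), in the spirit of InterpretJob.
abstract class SketchJob {
    final CompletableFuture<Object> result = new CompletableFuture<>();
    abstract Object jobRun() throws Exception;
}

// Hypothetical FIFO scheduler: submit() enqueues, one worker thread runs jobs in order.
final class SketchFifoScheduler {
    private final BlockingQueue<SketchJob> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    SketchFifoScheduler() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    SketchJob job = queue.take();          // blocks until a job is queued
                    try {
                        job.result.complete(job.jobRun());
                    } catch (Exception e) {
                        job.result.completeExceptionally(e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();        // stop() ends the worker loop
            }
        }, "sketch-scheduler");
        worker.setDaemon(true);
        worker.start();
    }

    CompletableFuture<Object> submit(SketchJob job) {
        queue.add(job);
        return job.result;
    }

    void stop() { worker.interrupt(); }
}
```

A single worker guarantees that paragraphs sharing this scheduler run one after another, which is the property a FIFO queue is chosen for.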
public InterpreterResult jobRun() throws Throwable {
  ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader();
  try {
    InterpreterContext.set(context);
    // clear the result of last run in frontend before running this paragraph.
    context.out.clear();

    InterpreterResult result = null;

    // Open the interpreter instance prior to calling interpret().
    // This is necessary because the earliest we can register a hook
    // is from within the open() method.
    LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
    if (!lazy.isOpen()) {
      lazy.open();
      result = lazy.executePrecode(context);
    }

    if (result == null || result.code() == Code.SUCCESS) {
      // Add hooks to script from registry.
      // note scope first, followed by global scope.
      // Here's the code after hooking:
      //     global_pre_hook
      //     note_pre_hook
      //     script
      //     note_post_hook
      //     global_post_hook
      processInterpreterHooks(context.getNoteId());
      processInterpreterHooks(null);
      LOGGER.debug("Script after hooks: {}", script);
      result = interpreter.interpret(script, context);
    }

    // data from context.out is prepended to InterpreterResult if both defined
    context.out.flush();
    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();

    for (InterpreterResultMessage resultMessage : result.message()) {
      // only add non-empty InterpreterResultMessage
      if (!StringUtils.isBlank(resultMessage.getData())) {
        resultMessages.add(resultMessage);
      }
    }

    List<String> stringResult = new ArrayList<>();
    for (InterpreterResultMessage msg : resultMessages) {
      if (msg.getType() == InterpreterResult.Type.IMG) {
        LOGGER.debug("InterpreterResultMessage: IMAGE_DATA");
      } else {
        LOGGER.debug("InterpreterResultMessage: {}", msg);
      }
      stringResult.add(msg.getData());
    }

    // put result into resource pool
    if (context.getLocalProperties().containsKey("saveAs")) {
      if (stringResult.size() == 1) {
        LOGGER.info("Saving result into ResourcePool as single string: " +
            context.getLocalProperties().get("saveAs"));
        context.getResourcePool().put(
            context.getLocalProperties().get("saveAs"), stringResult.get(0));
      } else {
        LOGGER.info("Saving result into ResourcePool as string list: " +
            context.getLocalProperties().get("saveAs"));
        context.getResourcePool().put(
            context.getLocalProperties().get("saveAs"), stringResult);
      }
    }
    return new InterpreterResult(result.code(), resultMessages);
  } catch (Throwable e) {
    return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
  } finally {
    Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);
    InterpreterContext.remove();
  }
}
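Two steps of `jobRun()` are easy to isolate: wrapping the script with hooks in the order listed in its comments, and building the final message list with `context.out` output first and blank interpreter messages dropped. The helpers below are a hypothetical sketch of those two rules over plain strings; joining hook code with `"\n"` is an assumption, and `JobRunSketch` is not a Zeppelin class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers modeling two rules from jobRun(); NOT Zeppelin code.
final class JobRunSketch {
    // Resulting order: global_pre, note_pre, script, note_post, global_post.
    // Absent hooks (null or empty) are skipped; "\n" as separator is an assumption.
    static String applyHooks(String globalPre, String notePre, String script,
                             String notePost, String globalPost) {
        List<String> parts = new ArrayList<>();
        for (String s : new String[]{globalPre, notePre, script, notePost, globalPost}) {
            if (s != null && !s.isEmpty()) {
                parts.add(s);
            }
        }
        return String.join("\n", parts);
    }

    // Output written to context.out comes first; then non-blank messages returned
    // by interpret() (trim().isEmpty() roughly approximates StringUtils.isBlank).
    static List<String> mergeOutputs(List<String> contextOut, List<String> interpretResult) {
        List<String> merged = new ArrayList<>(contextOut);
        for (String msg : interpretResult) {
            if (msg != null && !msg.trim().isEmpty()) {
                merged.add(msg);
            }
        }
        return merged;
    }
}
```

The commented-out assertions in the test (`INTERPRETER_OUT` followed by `SINGLE_OUTPUT_SUCCESS`) reflect the second rule: text written to the context's output stream precedes the interpreter's own result messages.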
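At this point the code ties back to the JDBC Interpreter code from the previous article: the call interpreter.interpret(script, context) is where the Interpreter layer hands execution to a concrete interpreter such as the JDBC Interpreter.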
The main method of RemoteInterpreterServer starts the server thread, and the Thrift server itself is actually started inside the run method:
public static void main(String[] args) throws Exception {
  String zeppelinServerHost = null;
  int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
  String portRange = ":";
  String interpreterGroupId = null;
  if (args.length > 0) {
    zeppelinServerHost = args[0];
    port = Integer.parseInt(args[1]);
    interpreterGroupId = args[2];
    if (args.length > 3) {
      portRange = args[3];
    }
  }
  RemoteInterpreterServer remoteInterpreterServer =
      new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
  remoteInterpreterServer.start();

  /*
   * Registration of a ShutdownHook in case of an unpredictable system call
   * Examples: STRG+C, SIGTERM via kill
   */
  shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK);
  Runtime.getRuntime().addShutdownHook(shutdownThread);

  remoteInterpreterServer.join();
  LOGGER.info("RemoteInterpreterServer thread is finished");

  /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully.
   * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd
   * should be part of the next release and solve the problem.
   * We may have other threads that are not terminated successfully.
   */
  if (remoteInterpreterServer.isForceShutdown) {
    LOGGER.info("Force shutting down");
    System.exit(0);
  }
}
@Override
public void run() {
  RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
      new RemoteInterpreterService.Processor<>(this);
  try (TServerSocket tSocket = new TServerSocket(port)) {
    server = new TThreadPoolServer(
        new TThreadPoolServer.Args(tSocket)
            .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT)
            .stopTimeoutUnit(TimeUnit.MILLISECONDS)
            .processor(processor));

    if (null != intpEventServerHost && !isTest) {
      Thread registerThread = new Thread(new RegisterRunnable());
      registerThread.setName("RegisterThread");
      registerThread.start();
    }
    LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
    server.serve();
  } catch (TTransportException e) {
    LOGGER.error("Failure in TTransport", e);
  }
  LOGGER.info("RemoteInterpreterServer-Thread finished");
}
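The lifecycle in main() and run() follows a common pattern: start a thread whose run() blocks while serving, register a JVM shutdown hook that stops it (for Ctrl+C or SIGTERM), then join the thread. A minimal sketch of that pattern, with a hypothetical `SketchServerThread` whose `CountDownLatch` wait stands in for `server.serve()`; no Thrift is involved.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for RemoteInterpreterServer's thread lifecycle; NOT Zeppelin code.
final class SketchServerThread extends Thread {
    private final CountDownLatch stopped = new CountDownLatch(1);

    SketchServerThread() { super("SketchServerThread"); }

    @Override
    public void run() {
        try {
            stopped.await();                       // blocks like server.serve() would
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stands in for stopping the Thrift server; safe to call more than once.
    void shutdown() { stopped.countDown(); }

    // Same order as main(): start the thread, then register a JVM shutdown hook.
    static SketchServerThread launch() {
        SketchServerThread server = new SketchServerThread();
        server.start();
        Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown, "sketch-shutdown-hook"));
        return server;
    }
}
```

A caller would then block in `join()` exactly as main() does; once `shutdown()` releases the latch, run() returns and `join()` completes.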
References:
Using mock: https://zhuanlan.zhihu.com/p/51673406
Apache Thrift Series Explained (Part 1): Overview and Getting Started (Juejin)