一、简介
Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。
Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!
二、名词解释
- FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
- BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
- bdbje:Oracle Berkeley DB Java Edition (opens new window)。在 Doris 中,我们使用 bdbje 完成元数据操作日志的持久化、FE 高可用等功能。
三、流程图
四、源码分析
下载Doris源码详细步骤:https://doris.apache.org/zh-CN/developer-guide/fe-idea-dev.html#_1-%E7%8E%AF%E5%A2%83%E5%87%86%E5%A4%87
Doris FE启动步骤
我们先看看 FE启动类代码:
if (Strings.isNullOrEmpty(dorisHomeDir)) { System.err.println("env DORIS_HOME is not set."); return; } if (Strings.isNullOrEmpty(pidDir)) { System.err.println("env PID_DIR is not set."); return; } CommandLineOptions cmdLineOpts = parseArgs(args); try { // 创建 pid 文件 if (!createAndLockPidFile(pidDir + "/fe.pid")) { throw new IOException("pid file is already locked."); } // 初始化 config文件 Config config = new Config(); config.init(dorisHomeDir + "/conf/fe.conf"); // Must init custom config after init config, separately. // Because the path of custom config file is defined in fe.conf config.initCustom(Config.custom_config_dir + "/fe_custom.conf"); LdapConfig ldapConfig = new LdapConfig(); if (new File(dorisHomeDir + "/conf/ldap.conf").exists()) { ldapConfig.init(dorisHomeDir + "/conf/ldap.conf"); } // check it after Config is initialized, otherwise the config 'check_java_version' won't work. if (!JdkUtils.checkJavaVersion()) { throw new IllegalArgumentException("Java version doesn't match"); } Log4jConfig.initLogging(dorisHomeDir + "/conf/"); // set dns cache ttl java.security.Security.setProperty("networkaddress.cache.ttl", "60"); // check command line options checkCommandLineOptions(cmdLineOpts); LOG.info("Palo FE starting..."); //FE Address 初始化 FrontendOptions.init(); // 检查端口是否正常 checkAllPorts(); if (Config.enable_bdbje_debug_mode) { // Start in BDB Debug mode BDBDebugger.get().startDebugMode(dorisHomeDir); return; } // 初始化 Catelog 并且等待加载完成 Catalog.getCurrentCatalog().initialize(args); Catalog.getCurrentCatalog().waitForReady(); // 第一步 启动 HttpServer 类 // 第二步 启动 FeServer 类 // 第三步 启动 QeService QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled, ExecuteEnv.getInstance().getScheduler()); FeServer feServer = new FeServer(Config.rpc_port); feServer.start(); HttpServer httpServer = new HttpServer(); httpServer.setPort(Config.http_port); httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size); httpServer.setAcceptors(Config.jetty_server_acceptors); httpServer.setSelectors(Config.jetty_server_selectors); httpServer.setWorkers(Config.jetty_server_workers); httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads); httpServer.setMaxThreads(Config.jetty_threadPool_minThreads); httpServer.start(); qeService.start(); ThreadPoolManager.registerAllThreadPoolMetric(); while (true) { Thread.sleep(2000); } } catch (Throwable e) { e.printStackTrace(); }
通过上面代码,我们可以清楚了解FE启动时主要执行以下过程:
- 初始化 Catalog ,并且等待Catelog加载完成
- 创建 QeServer ,负责与mysql client 通信
- 创建 FeServer ,由Thrift Server组成,负责 FE 和 BE 通信
- 创建 HttpServer ,负责提供Rest API以及Doris FE前端页面接口
CataLog 源码解析
CataLog 主要职责是维护FE 元数据,接下来我们看看FE启动时,CataLog初始化时,做什么处理:
// 获取本地节点和helper节点信息 getSelfHostPort(); getHelperNodes(args); // 检查meta文件目录是否创建 File meta = new File(metaDir); if (!meta.exists()) { LOG.warn("Doris' meta dir {} does not exist. You need to create it before starting FE", meta.getAbsolutePath()); throw new Exception(meta.getAbsolutePath() + " does not exist, will exit"); } //检查 BDB和Image目录是否创建 if (Config.edit_log_type.equalsIgnoreCase("bdb")) { File bdbDir = new File(this.bdbDir); if (!bdbDir.exists()) { bdbDir.mkdirs(); } File imageDir = new File(this.imageDir); if (!imageDir.exists()) { imageDir.mkdirs(); } } else { throw new Exception("Invalid edit log type: " + Config.edit_log_type); } // 初始化插件管理 pluginMgr.init(); auditEventProcessor.start(); // 2.获取集群ID和角色(Observer or Follower) getClusterIdAndRole(); // 3. 首次加载image文件和回放Elog日志 this.editLog = new EditLog(nodeName); loadImage(this.imageDir); // 加载image文件 editLog.open(); // 夹杂bdb环境配置 this.globalTransactionMgr.setEditLog(editLog); this.idGenerator.setEditLog(editLog); // 4. 创建加载和导出作业标签清理Daemon线程 createLabelCleaner(); // 5. 创建事务清理Daemon线程 createTxnCleaner(); // 6. 开始监听线程状态(MASTER/FOLLOWER/OBSERVER状态转换,以及leader选举工作和元数据同步工作) createStateListener(); listener.start();
通过上面源码,我们可以发现,CateLog初始化时,执行以下操作:
- 首先对Image镜像文件读取数据,和对Elog进行回放操作。
- 创建加载和导出作业标签清理Daemon线程
- 创建事务清理Daemon线程
- 开始监听线程状态(MASTER/FOLLOWER/OBSERVER状态转换,以及leader选举工作和元数据同步工作)「(后续更新一篇文章,专门说元数据同步和Leader选举流程源码解析)」
QeServer 源码解析
QeServer职责是与Mysql Client进行通讯,支持Socket和Nio连接,具体源码:
try { HelpModule.getInstance().setUpModule(); } catch (Exception e) { LOG.error("Help module failed, because:", e); } this.port = port; if (nioEnabled) { mysqlServer = new NMysqlServer(port, scheduler); } else { mysqlServer = new MysqlServer(port, scheduler); }
当nioEnabled(可配置) 为true时,使用Nio进行通讯,采用这种方式通信的好处是:
- 同步非阻塞IO
- IO是面向流的,NIO是面向缓冲区的
- NIO引入了选择器的概念,选择器用于监听多个通道的事件
FeServer 源码解析
FeServer职责是负责FE和BE之间通信。
try { switch (type) { case SIMPLE: createSimpleServer(); break; case THREADED_SELECTOR: createThreadedServer(); break; default: createThreadPoolServer(); } } catch (TTransportException ex) { LOG.warn("create thrift server failed.", ex); throw new IOException("create thrift server failed.", ex); } ThriftServerEventProcessor eventProcessor = new ThriftServerEventProcessor(this); server.setServerEventHandler(eventProcessor); serverThread = new Thread(new Runnable() { @Override public void run() { server.serve(); } }); serverThread.setDaemon(true); serverThread.start();
FE的Thrift使用的服务模型分为三种:
- SIMPLE:一般不适用于生产环境,仅限于测试使用。
- THREADED_SELECTOR:非阻塞式I/O模型,即主从 Reactor 模型,该模型能及时响应大量的并发连接请求,在多数场景下有较好的表现。
- THREAD_POOL:阻塞式I/O模型,使用线程池处理用户连接,并发连接数受限于线程池的数量,如果能提前预估并发请求的数量,并且能容忍足够多的线程资源开销,该模型会有较好的性能表现,默认使用该服务模型
HttpServer 源码解析
HttpServer职责主要是为Rest API和doris Web页面提供接口服务,源码如下:
Map<String, Object> properties = new HashMap<>(); properties.put("server.port", port); properties.put("server.servlet.context-path", "/"); properties.put("spring.resources.static-locations", "classpath:/static"); properties.put("spring.http.encoding.charset", "UTF-8"); properties.put("spring.http.encoding.enabled", true); properties.put("spring.http.encoding.force", true); //enable jetty config properties.put("server.jetty.acceptors", this.acceptors); properties.put("server.jetty.max-http-post-size", this.maxHttpPostSize); properties.put("server.jetty.selectors", this.selectors); //Worker thread pool is not set by default, set according to your needs if(this.workers > 0) { properties.put("server.jetty.workers", this.workers); } // This is to disable the spring-boot-devtools restart feature. // To avoid some unexpected behavior. System.setProperty("spring.devtools.restart.enabled", "false"); // Value of `DORIS_HOME_DIR` is null in unit test. if (PaloFe.DORIS_HOME_DIR != null) { System.setProperty("spring.http.multipart.location", PaloFe.DORIS_HOME_DIR); } System.setProperty("spring.banner.image.location", "doris-logo.png"); if (FeConstants.runningUnitTest) { // this is currently only used for unit test properties.put("logging.config", getClass().getClassLoader().getResource("log4j2.xml").getPath()); } else { properties.put("logging.config", Config.custom_config_dir + "/" + SpringLog4j2Config.SPRING_LOG_XML_FILE); } new SpringApplicationBuilder() .sources(HttpServer.class) .properties(properties) .run(new String[]{});
HttpServer继承了SpringBootServletInitializer,同时使用了SpringApplicationBuilder类,那么我们就可以很清楚知道,使用Springboot框架提供Rest Api服务。
五、问题思考
- CataLog 如何对Elog进行回放?
- ELog 日志如何实现数据同步?
- BDB 如何存储元数据的?
- 说说Doris FE Leader选举流程
- Doris FE Leader节点如何判断non-Leader节点是否在线?leader和non-leader长时间失联会导致non-leader提供过期元数据,此问题如何解决?