一、前言
在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment
二、名词解释
- FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
- BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
- slot:计算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个 task 才可以运行
- planNode : 逻辑算子
- planNodeTree: 逻辑执行计划
三、执行流程
四、Apache Doris 查询原理
(一)SQL 接收
本文只说 mysql 协议如何接收 SQL 语句, 如果感兴趣的同学可以看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和其他支持 Mysql 协议的工具向 Doris 发送查询请求。MysqlServer Listener() 负责监听客户端发送来的 Mysql 连接请求,每个连接请求都被封装成一个 ConnectContext 对象,并被提交给 ConnectScheduler。ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。
- MysqlServer 类 Listener 处理:
private class Listener implements Runnable { @Override public void run(){while (running && serverChannel.isOpen()) { SocketChannel clientChannel; try {clientChannel = serverChannel.accept(); if (clientChannel == null) {continue;} // 构建 ConnectContext 对象 ConnectContext context = new ConnectContext(clientChannel); // catelog 日志 context.setCatalog(Catalog.getCurrentCatalog()); // 向 ExecutorService 提交 new LoopHandler(context) ==>(源码)executor.submit(new LoopHandler(context)) if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString()); // clear up context context.cleanup();} } catch (IOException e) { // ClosedChannelException // AsynchronousCloseException // ClosedByInterruptException // Other IOException, for example "to many open files" ... LOG.warn("Query server encounter exception.", e); try {Thread.sleep(100); } catch (InterruptedException e1) {// Do nothing} } catch (Throwable e) { // NotYetBoundException // SecurityException LOG.warn("Query server failed when calling accept.", e); } } } }
- ExecutorService 线程 LoopHandler 处理:
@Override public void run() { try { // Set thread local info context.setThreadLocalInfo(); context.setConnectScheduler(ConnectScheduler.this); // authenticate check failed. if (!MysqlProto.negotiate(context)) {return;} if (registerConnection(context)) {MysqlProto.sendResponsePacket(context); } else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections"); MysqlProto.sendResponsePacket(context); return; } context.setStartTime(); ConnectProcessor processor = new ConnectProcessor(context); processor.loop();} catch (Exception e) { // for unauthorized access such lvs probe request, may cause exception, just log it in debug level if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e); } else {LOG.debug("connect processor exception because", e); } } finally {unregisterConnection(context); context.cleanup();} }
- processOnce(读取 Mysql 客户端的 sql) 方法
// 处理 mysql 的请求 public void processOnce()throws IOException {ctx.getState().reset(); executor = null; // 重置 MySQL 协议的序列号 final MysqlChannel channel = ctx.getMysqlChannel(); channel.setSequenceId(0); // 从通道读取数据包 ==>SQL try {packetBuf = channel.fetchOnePacket(); if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString()); throw new IOException("Error happened when receiving packet."); } } catch (AsynchronousCloseException e) { // when this happened, timeout checker close this channel // killed flag in ctx has been already set, just return return; } // 下发 SQL dispatch(); // finalize finalizeCommand(); ctx.setCommand(MysqlCommand.COM_SLEEP); }
(二)Parse
ConnectProcessor 接收到 SQL 之后会进行 analyze ,Apache Doris SQL 解析使用的 Parse 是 Java CUP Parser,语法规则 定义的文件在 sql_parser.cup。
感兴趣的同学可以详细看一下 StatementBase 类
- analyze 方法, 返回 List(这里主要是语法解析)
// 解析 origin,返回 list<stmt> private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt); // 使用 CUP&FLEX 生成的解析器解析语句 SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); try {return SqlParserUtils.getMultiStmts(parser); } catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e); } catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt); LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e); if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e); } } catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error, // should be removed this try-catch clause future. throw new AnalysisException("Internal Error, maybe syntax error or this is a bug"); } }
因为本文讲述的是查询语句(不同类型会转换成不通 Stmt,比如 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),最后我们会得到 QueryStmt,originStmt 会转换成 QueryStmt,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成
(三)Analyze
SQL 语句被解析成 AST 之后,会被交给 StmtExecutor 。StmtExecutor 会首先对 AST 进行语法和语义分析,大概会做下面的事情:
- 检查并绑定 Cluster, Database, Table, Column 等元信息。
- SQL 的合法性检查:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等。
- SQL 重写:比如将 select * 扩展成 select 所有列,count distinct 查询重写等。
- Table 与 Column 别名处理。
- 为 Tuple, Slot, Expr 等分配唯一 ID。
- 函数参数的合法性检测。
- 表达式替换。
- 类型检查,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。
主要代码:
analyzeAndGenerateQueryPlan 方法 --> parsedStmt.analyze(analyzer);
(四)Rewrite
- analyzeAndGenerateQueryPlan 方法(部分代码,此处不做重点讲解)
StmtExecutor 在对 AST 进行语法和语义分析后,会让 ExprRewriter 根据 ExprRewriteRule 进行一次 Rewrite。目前 Doris 的重写规则比较简单,主要是进行了常量表达式的化简和谓词的简单处理。常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。
如果重写后,有部分节点被成功改写,比如, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义分析的过程。
对于有子查询的 SQL,StmtRewriter 会进行重写,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。
if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter(); rewriter.reset(); if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter); } // explan 标签 ExplainOptions explainOptions = parsedStmt.getExplainOptions(); boolean reAnalyze = false; parsedStmt.rewriteExprs(rewriter); reAnalyze = rewriter.changed(); if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt); reAnalyze = true; } if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;} } if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands(); for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;} } } if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt(); if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;} } if (reAnalyze) { // 对重写语句进行处理 List<Type> origResultTypes = Lists.newArrayList(); for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType()); } List<String> origColLabels = Lists.newArrayList(parsedStmt.getColLabels()); // 重写语句进行 analyzer analyzer = new Analyzer(context.getCatalog(), context); // 重写语句 analyzer 信息 parsedStmt.reset(); parsedStmt.analyze(analyzer); // 恢复原始结果类型和列标签 parsedStmt.castResultExprs(origResultTypes); parsedStmt.setColLabels(origColLabels); if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql()); } if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions); } } }
(五)SingleNodePlan
经过 parse、Analyze、Rewrite 阶段后,AST 会生成 singleNodePlanner,源码如下:
singleNodePlanner = new SingleNodePlanner(plannerContext); PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
单机 Plan 由 SingleNodePlanner 执行,输入是 AST,输出是单机物理执行 Plan, Plan 中每个节点是一个 PlanNode。
SingleNodePlanner 核心任务就是根据 AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。
Doris 在生成单机 Plan 的时候主要进行了以下工作或优化 :
- Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化
//Slot物化,处理 Base表 analyzer.materializeSlots(queryStmt.getBaseTblResultExprs()); // Slot物化 处理 where 语句的子查询 selectStmt.materializeRequiredSlots(analyzer);
- 投影下推:BE 在 Scan 时只会 Scan 必须读取的列
projectPlanNode(resultSlotIds, root);
- 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点
pushDownPredicates(analyzer, selectStmt);
- 分区,分桶裁剪:比如建表时按照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,可以大大减少不必要的数据读取
- Join Reorder:对于 join操作,在保证结果不变的情况,通过规则计算最优(最少资源)join 操作。
createCheapestJoinPlan(analyzer, refPlans);
- Sort + Limit 优化成 TopN(FE 进行useTopN标识,BE标识执行)
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
- MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的 MaterializedView
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
- 向量化执行引擎选择:基于现代CPU的特点与火山模型的执行特点,重新设计列式存储系统的SQL执行引擎,从而提高了CPU在SQL执行时的效率,提升了SQL查询的性能。
if (VectorizedUtil.isVectorized()) { singleNodePlan.convertToVectoriezd(); }
- Runtime Filter Join:Doris 在进行 Hash Join 计算时会在右表构建一个哈希表,左表流式的通过右表的哈希表从而得出 Join 结果。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的时,同时生成一个基于哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
创建 singleNodePlanner 主要代码:createSingleNodePlan()
(六)DistributedPlan
分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由 PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。
每个 PlanFragment 由 PlanNodeTree 和 Data Sink 组成,我们从上图的 Plan Fragment 2 可以看出,由 AggregationNode、HashJoinNode 和 DataSink。Plan 分布式化的方法是增加 ExchangeNode,执行计划树会以 ExchangeNode 为边界拆分为 PlanFragment。
ExchangeNode 主要是用于 BE 之间的数据交换与共享,类似 Spark 和 MR 中的 Shuffle。
各个 Fragment 的数据流转和最终的结果发送依赖:DataSink。比如 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的结果集发送到 FE。
每个 PlanFragment 可以在每个 BE 节点生成 1 个或多个执行实例,不同执行实例处理不同的数据集,通过并发来提升查询性能。
DistributedPlanner 中最主要的工作是决定 Join 的分布式执行策略:Shuffle Join,Bucket Join,Broadcast Join,Colocate Join,和增加 Aggregation 的 Merge 阶段。
决定 Join 的分布式执行策略的逻辑如下:
如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。
如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。
(七)Schedule
生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。
- computeScanRangeAssignment():主要逻辑对fragment合理分配,尽可能保证每个BE节点的请求都是平均。
- computeFragmentExecParams():处理Fragment执行参数。
- sendFragment():发送Fragment至BE节点,
(八)Execute
Doris 的查询执行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。
BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。
PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。
五、参考献文
- Apache Doris Join原理
- Apache Doris 存储层设计
https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html
- Apache Doris 元数据设计
- Apache Doris 查询原理
https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258
六、实践分享
- Apache Doris 在网易互娱的应用实践
- Apache Doris 在知乎用户画像与实时数据的架构与实践
- Apache Doris 物化视图与索引在京东的典型应用
- Apache Doris Join 实现与调优实践
七、总结
本文主要介绍查询 SQL 在 Apache Doris Fe 节点经历 parse、analyze、rewrite、GenerateQueryPlan、schedule、send 等阶段处理。Apache Doris Fe 的 parse、analyze、rewrite 阶段和其他数据库处理过程差不多,本文主要讲解的核心是 GenerateQueryPlan、schedule、send 阶段的原理。我们可以深度了解 Apache Doris Fe 节点对查询 SQL 的优化操作,以及未来遇到相关性能问题不会无从下手。