浅谈 Apache Doris FE 处理查询 SQL 源码解析

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 浅谈 Apache Doris FE 处理查询 SQL 源码解析

一、前言

在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment

二、名词解释

  • FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
  • BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
  • slot:计算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个 task 才可以运行
  • planNode : 逻辑算子
  • planNodeTree: 逻辑执行计划

三、执行流程

640.jpg

四、Apache Doris 查询原理

(一)SQL 接收

本文只说 mysql 协议如何接收 SQL 语句, 如果感兴趣的同学可以看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和其他支持 Mysql 协议的工具向 Doris 发送查询请求。MysqlServer Listener() 负责监听客户端发送来的 Mysql 连接请求,每个连接请求都被封装成一个 ConnectContext 对象,并被提交给 ConnectScheduler。ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。

  • MysqlServer 类 Listener 处理:
private class Listener implements Runnable {
        @Override
        public void run(){while (running && serverChannel.isOpen()) {
                SocketChannel clientChannel;
                try {clientChannel = serverChannel.accept();
                    if (clientChannel == null) {continue;}
                    // 构建 ConnectContext 对象
                    ConnectContext context = new ConnectContext(clientChannel);
                    // catelog 日志
                    context.setCatalog(Catalog.getCurrentCatalog());
                    // 向 ExecutorService 提交 new LoopHandler(context) ==>(源码)executor.submit(new LoopHandler(context))
                    if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
                        // clear up context
                        context.cleanup();}
                } catch (IOException e) {
                    // ClosedChannelException
                    // AsynchronousCloseException
                    // ClosedByInterruptException
                    // Other IOException, for example "to many open files" ...
                    LOG.warn("Query server encounter exception.", e);
                    try {Thread.sleep(100);
                    } catch (InterruptedException e1) {// Do nothing}
                } catch (Throwable e) {
                    // NotYetBoundException
                    // SecurityException
                    LOG.warn("Query server failed when calling accept.", e);
                }
            }
        }
    }
  • ExecutorService 线程 LoopHandler 处理:
@Override
        public void run() {
            try {
                // Set thread local info
                context.setThreadLocalInfo();
                context.setConnectScheduler(ConnectScheduler.this);
                // authenticate check failed.
                if (!MysqlProto.negotiate(context)) {return;}
                if (registerConnection(context)) {MysqlProto.sendResponsePacket(context);
                } else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
                    MysqlProto.sendResponsePacket(context);
                    return;
                }
                context.setStartTime();
                ConnectProcessor processor = new ConnectProcessor(context);
                processor.loop();} catch (Exception e) {
                // for unauthorized access such lvs probe request, may cause exception, just log it in debug level
                if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e);
                } else {LOG.debug("connect processor exception because", e);
                }
            } finally {unregisterConnection(context);
                context.cleanup();}
        }
  • processOnce(读取 Mysql 客户端的 sql) 方法
// 处理 mysql 的请求
    public void processOnce()throws IOException {ctx.getState().reset();
        executor = null;
        // 重置 MySQL 协议的序列号
        final MysqlChannel channel = ctx.getMysqlChannel();
        channel.setSequenceId(0);
        // 从通道读取数据包 ==>SQL
        try {packetBuf = channel.fetchOnePacket();
            if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
                throw new IOException("Error happened when receiving packet.");
            }
        } catch (AsynchronousCloseException e) {
            // when this happened, timeout checker close this channel
            // killed flag in ctx has been already set, just return
            return;
        }
        // 下发 SQL 
        dispatch();
        // finalize
        finalizeCommand();
        ctx.setCommand(MysqlCommand.COM_SLEEP);
    }

(二)Parse

ConnectProcessor 接收到 SQL 之后会进行 analyze ,Apache Doris SQL 解析使用的 Parse 是 Java CUP Parser,语法规则 定义的文件在 sql_parser.cup。

感兴趣的同学可以详细看一下 StatementBase 类

  • analyze 方法, 返回 List(这里主要是语法解析)
// 解析 origin,返回 list<stmt>
    private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt);
        // 使用 CUP&FLEX 生成的解析器解析语句
        SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
        SqlParser parser = new SqlParser(input);
        try {return SqlParserUtils.getMultiStmts(parser);
        } catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
        } catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt);
            LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
            if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e);
            }
        } catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
            // should be removed this try-catch clause future.
            throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
        }
    }

因为本文讲述的是查询语句(不同类型会转换成不通 Stmt,比如 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),最后我们会得到 QueryStmt,originStmt 会转换成 QueryStmt,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成

(三)Analyze

SQL 语句被解析成 AST 之后,会被交给 StmtExecutor 。StmtExecutor 会首先对 AST 进行语法和语义分析,大概会做下面的事情:

  1. 检查并绑定 Cluster, Database, Table, Column 等元信息。
  2. SQL 的合法性检查:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等。
  3. SQL 重写:比如将 select * 扩展成 select 所有列,count distinct 查询重写等。
  4. Table 与 Column 别名处理。
  5. 为 Tuple, Slot, Expr 等分配唯一 ID。
  6. 函数参数的合法性检测。
  7. 表达式替换。
  8. 类型检查,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。

主要代码:

analyzeAndGenerateQueryPlan 方法 -->  parsedStmt.analyze(analyzer);

(四)Rewrite

  • analyzeAndGenerateQueryPlan 方法(部分代码,此处不做重点讲解)

StmtExecutor 在对 AST 进行语法和语义分析后,会让 ExprRewriter 根据 ExprRewriteRule 进行一次 Rewrite。目前 Doris 的重写规则比较简单,主要是进行了常量表达式的化简和谓词的简单处理。常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。

如果重写后,有部分节点被成功改写,比如, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义分析的过程。

对于有子查询的 SQL,StmtRewriter 会进行重写,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。

if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter();
            rewriter.reset();
            if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter);
            }
            // explan 标签
            ExplainOptions explainOptions = parsedStmt.getExplainOptions();
            boolean reAnalyze = false;
            parsedStmt.rewriteExprs(rewriter);
            reAnalyze = rewriter.changed();
            if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
                reAnalyze = true;
            }
            if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;}
            }
            if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
                for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;}
                }
            }
            if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
                if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;}
            }
            if (reAnalyze) {
                // 对重写语句进行处理
                List<Type> origResultTypes = Lists.newArrayList();
                for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType());
                }
                List<String> origColLabels =
                        Lists.newArrayList(parsedStmt.getColLabels());
                // 重写语句进行 analyzer
                analyzer = new Analyzer(context.getCatalog(), context);
                // 重写语句 analyzer 信息
                parsedStmt.reset();
                parsedStmt.analyze(analyzer);
                // 恢复原始结果类型和列标签
                parsedStmt.castResultExprs(origResultTypes);
                parsedStmt.setColLabels(origColLabels);
                if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
                }
                if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions);
                }
            }
        }

(五)SingleNodePlan

经过 parse、Analyze、Rewrite 阶段后,AST 会生成 singleNodePlanner,源码如下:

singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();

单机 Plan 由 SingleNodePlanner 执行,输入是 AST,输出是单机物理执行 Plan, Plan 中每个节点是一个 PlanNode。

SingleNodePlanner 核心任务就是根据 AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。

Doris 在生成单机 Plan 的时候主要进行了以下工作或优化

  1. Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化
//Slot物化,处理 Base表
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot物化 处理 where 语句的子查询
selectStmt.materializeRequiredSlots(analyzer);
  1. 投影下推:BE 在 Scan 时只会 Scan 必须读取的列
projectPlanNode(resultSlotIds, root);
  1. 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点
pushDownPredicates(analyzer, selectStmt);
  1. 分区,分桶裁剪:比如建表时按照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,可以大大减少不必要的数据读取
  2. Join Reorder:对于 join操作,在保证结果不变的情况,通过规则计算最优(最少资源)join 操作。
createCheapestJoinPlan(analyzer, refPlans);
  1. Sort + Limit 优化成 TopN(FE 进行useTopN标识,BE标识执行)
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
  1. MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的 MaterializedView
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
  1. 向量化执行引擎选择:基于现代CPU的特点与火山模型的执行特点,重新设计列式存储系统的SQL执行引擎,从而提高了CPU在SQL执行时的效率,提升了SQL查询的性能。
if (VectorizedUtil.isVectorized()) {
            singleNodePlan.convertToVectoriezd();
    }
  1. Runtime Filter Join:Doris 在进行 Hash Join 计算时会在右表构建一个哈希表,左表流式的通过右表的哈希表从而得出 Join 结果。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的时,同时生成一个基于哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());

创建 singleNodePlanner 主要代码:createSingleNodePlan()

(六)DistributedPlan

分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由 PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。

640.jpg

每个 PlanFragment 由 PlanNodeTree 和 Data Sink 组成,我们从上图的 Plan Fragment 2 可以看出,由 AggregationNode、HashJoinNode 和 DataSink。Plan 分布式化的方法是增加 ExchangeNode,执行计划树会以 ExchangeNode 为边界拆分为 PlanFragment。

ExchangeNode 主要是用于 BE 之间的数据交换与共享,类似 Spark 和 MR 中的 Shuffle。

各个 Fragment 的数据流转和最终的结果发送依赖:DataSink。比如 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的结果集发送到 FE。

每个 PlanFragment 可以在每个 BE 节点生成 1 个或多个执行实例,不同执行实例处理不同的数据集,通过并发来提升查询性能。

DistributedPlanner 中最主要的工作是决定 Join 的分布式执行策略:Shuffle Join,Bucket Join,Broadcast Join,Colocate Join,和增加 Aggregation 的 Merge 阶段。

决定 Join 的分布式执行策略的逻辑如下:

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

640.jpg

(七)Schedule

生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。

640.jpg

  • computeScanRangeAssignment():主要逻辑对fragment合理分配,尽可能保证每个BE节点的请求都是平均。
  • computeFragmentExecParams():处理Fragment执行参数。
  • sendFragment():发送Fragment至BE节点,

(八)Execute

Doris 的查询执行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。

BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。

PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。

五、参考献文

  • Apache Doris Join原理

https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F

  • Apache Doris 存储层设计

https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html

  • Apache Doris 元数据设计

https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584

  • Apache Doris 查询原理

https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258

六、实践分享

七、总结

本文主要介绍查询 SQL 在 Apache Doris Fe 节点经历 parse、analyze、rewrite、GenerateQueryPlan、schedule、send 等阶段处理。Apache Doris Fe 的 parse、analyze、rewrite 阶段和其他数据库处理过程差不多,本文主要讲解的核心是 GenerateQueryPlan、schedule、send 阶段的原理。我们可以深度了解 Apache Doris Fe 节点对查询 SQL 的优化操作,以及未来遇到相关性能问题不会无从下手。

相关文章
|
19天前
|
SQL 关系型数据库 MySQL
数据库开发之SQL简介以及DDL的详细解析
数据库开发之SQL简介以及DDL的详细解析
27 0
|
18天前
|
关系型数据库 Apache 流计算
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
本文介绍了如何将数据从 OceanBase 迁移到阿里云数据库 SelectDB 内核版 Apache Doris。提供 3 种数据同步方法 1. 使用 DataX,下载 DataX 并编写配置文件,通过 OceanBaseReader 和 DorisWriter 进行数据迁移。 2. 利用 Apache Doris 的 Catalog功 能,将 OceanBase 表映射到 Doris 并插入数据。 3. 通过Flink CDC,设置 OceanBase 环境,配置 Flink 连接器,实现实时数据同步。
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
|
5天前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
11天前
|
存储 SQL Apache
阿里云数据库内核 Apache Doris 基于 Workload Group 的负载隔离能力解读
阿里云数据库内核 Apache Doris 基于 Workload Group 的负载隔离能力解读
阿里云数据库内核 Apache Doris 基于 Workload Group 的负载隔离能力解读
|
18天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
103482 1
|
23天前
|
SQL 人工智能 编解码
NL2SQL实践系列(1):深入解析Prompt工程在text2sql中的应用技巧
NL2SQL实践系列(1):深入解析Prompt工程在text2sql中的应用技巧
NL2SQL实践系列(1):深入解析Prompt工程在text2sql中的应用技巧
|
23天前
|
Kubernetes 关系型数据库 Apache
Apache Doris 2.1.2 版本正式发布!
Apache Doris 2.1.2 版本正式发布!该版本提交了若干改进项以及问题修复,进一步提升了系统的性能及稳定性,欢迎大家下载体验!
|
1月前
|
Java 数据处理 调度
更高效准确的数据库内部任务调度实践,阿里云数据库SelectDB 内核 Apache Doris 内置 Job Scheduler 的实现与应用
Apache Doris 2.1 引入了内置的 Job Scheduler,旨在解决依赖外部调度系统的问题,提供秒级精确的定时任务管理。
|
1天前
|
SQL 存储 数据库连接
LabVIEW与SQL Server 2919 Express通讯
LabVIEW与SQL Server 2919 Express通讯
|
1天前
|
SQL Windows
安装SQL Server 2005时出现对性能监视器计数器注册表值执行系统配置检查失败的解决办法...
安装SQL Server 2005时出现对性能监视器计数器注册表值执行系统配置检查失败的解决办法...
11 4

推荐镜像

更多