浅谈 Apache Doris FE 处理查询 SQL 源码解析

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 浅谈 Apache Doris FE 处理查询 SQL 源码解析

一、前言

在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment

二、名词解释

  • FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
  • BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
  • slot:计算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个 task 才可以运行
  • planNode : 逻辑算子
  • planNodeTree: 逻辑执行计划

三、执行流程

640.jpg

四、Apache Doris 查询原理

(一)SQL 接收

本文只说 mysql 协议如何接收 SQL 语句, 如果感兴趣的同学可以看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和其他支持 Mysql 协议的工具向 Doris 发送查询请求。MysqlServer Listener() 负责监听客户端发送来的 Mysql 连接请求,每个连接请求都被封装成一个 ConnectContext 对象,并被提交给 ConnectScheduler。ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。

  • MysqlServer 类 Listener 处理:
private class Listener implements Runnable {
        @Override
        public void run(){while (running && serverChannel.isOpen()) {
                SocketChannel clientChannel;
                try {clientChannel = serverChannel.accept();
                    if (clientChannel == null) {continue;}
                    // 构建 ConnectContext 对象
                    ConnectContext context = new ConnectContext(clientChannel);
                    // catelog 日志
                    context.setCatalog(Catalog.getCurrentCatalog());
                    // 向 ExecutorService 提交 new LoopHandler(context) ==>(源码)executor.submit(new LoopHandler(context))
                    if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
                        // clear up context
                        context.cleanup();}
                } catch (IOException e) {
                    // ClosedChannelException
                    // AsynchronousCloseException
                    // ClosedByInterruptException
                    // Other IOException, for example "to many open files" ...
                    LOG.warn("Query server encounter exception.", e);
                    try {Thread.sleep(100);
                    } catch (InterruptedException e1) {// Do nothing}
                } catch (Throwable e) {
                    // NotYetBoundException
                    // SecurityException
                    LOG.warn("Query server failed when calling accept.", e);
                }
            }
        }
    }
  • ExecutorService 线程 LoopHandler 处理:
@Override
        public void run() {
            try {
                // Set thread local info
                context.setThreadLocalInfo();
                context.setConnectScheduler(ConnectScheduler.this);
                // authenticate check failed.
                if (!MysqlProto.negotiate(context)) {return;}
                if (registerConnection(context)) {MysqlProto.sendResponsePacket(context);
                } else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
                    MysqlProto.sendResponsePacket(context);
                    return;
                }
                context.setStartTime();
                ConnectProcessor processor = new ConnectProcessor(context);
                processor.loop();} catch (Exception e) {
                // for unauthorized access such lvs probe request, may cause exception, just log it in debug level
                if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e);
                } else {LOG.debug("connect processor exception because", e);
                }
            } finally {unregisterConnection(context);
                context.cleanup();}
        }
  • processOnce(读取 Mysql 客户端的 sql) 方法
// 处理 mysql 的请求
    public void processOnce()throws IOException {ctx.getState().reset();
        executor = null;
        // 重置 MySQL 协议的序列号
        final MysqlChannel channel = ctx.getMysqlChannel();
        channel.setSequenceId(0);
        // 从通道读取数据包 ==>SQL
        try {packetBuf = channel.fetchOnePacket();
            if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
                throw new IOException("Error happened when receiving packet.");
            }
        } catch (AsynchronousCloseException e) {
            // when this happened, timeout checker close this channel
            // killed flag in ctx has been already set, just return
            return;
        }
        // 下发 SQL 
        dispatch();
        // finalize
        finalizeCommand();
        ctx.setCommand(MysqlCommand.COM_SLEEP);
    }

(二)Parse

ConnectProcessor 接收到 SQL 之后会进行 analyze ,Apache Doris SQL 解析使用的 Parse 是 Java CUP Parser,语法规则 定义的文件在 sql_parser.cup。

感兴趣的同学可以详细看一下 StatementBase 类

  • analyze 方法, 返回 List(这里主要是语法解析)
// 解析 origin,返回 list<stmt>
    private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt);
        // 使用 CUP&FLEX 生成的解析器解析语句
        SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
        SqlParser parser = new SqlParser(input);
        try {return SqlParserUtils.getMultiStmts(parser);
        } catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
        } catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt);
            LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
            if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e);
            }
        } catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
            // should be removed this try-catch clause future.
            throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
        }
    }

因为本文讲述的是查询语句(不同类型会转换成不通 Stmt,比如 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),最后我们会得到 QueryStmt,originStmt 会转换成 QueryStmt,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成

(三)Analyze

SQL 语句被解析成 AST 之后,会被交给 StmtExecutor 。StmtExecutor 会首先对 AST 进行语法和语义分析,大概会做下面的事情:

  1. 检查并绑定 Cluster, Database, Table, Column 等元信息。
  2. SQL 的合法性检查:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等。
  3. SQL 重写:比如将 select * 扩展成 select 所有列,count distinct 查询重写等。
  4. Table 与 Column 别名处理。
  5. 为 Tuple, Slot, Expr 等分配唯一 ID。
  6. 函数参数的合法性检测。
  7. 表达式替换。
  8. 类型检查,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。

主要代码:

analyzeAndGenerateQueryPlan 方法 -->  parsedStmt.analyze(analyzer);

(四)Rewrite

  • analyzeAndGenerateQueryPlan 方法(部分代码,此处不做重点讲解)

StmtExecutor 在对 AST 进行语法和语义分析后,会让 ExprRewriter 根据 ExprRewriteRule 进行一次 Rewrite。目前 Doris 的重写规则比较简单,主要是进行了常量表达式的化简和谓词的简单处理。常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。

如果重写后,有部分节点被成功改写,比如, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义分析的过程。

对于有子查询的 SQL,StmtRewriter 会进行重写,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。

if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter();
            rewriter.reset();
            if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter);
            }
            // explan 标签
            ExplainOptions explainOptions = parsedStmt.getExplainOptions();
            boolean reAnalyze = false;
            parsedStmt.rewriteExprs(rewriter);
            reAnalyze = rewriter.changed();
            if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
                reAnalyze = true;
            }
            if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;}
            }
            if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
                for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;}
                }
            }
            if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
                if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;}
            }
            if (reAnalyze) {
                // 对重写语句进行处理
                List<Type> origResultTypes = Lists.newArrayList();
                for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType());
                }
                List<String> origColLabels =
                        Lists.newArrayList(parsedStmt.getColLabels());
                // 重写语句进行 analyzer
                analyzer = new Analyzer(context.getCatalog(), context);
                // 重写语句 analyzer 信息
                parsedStmt.reset();
                parsedStmt.analyze(analyzer);
                // 恢复原始结果类型和列标签
                parsedStmt.castResultExprs(origResultTypes);
                parsedStmt.setColLabels(origColLabels);
                if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
                }
                if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions);
                }
            }
        }

(五)SingleNodePlan

经过 parse、Analyze、Rewrite 阶段后,AST 会生成 singleNodePlanner,源码如下:

singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();

单机 Plan 由 SingleNodePlanner 执行,输入是 AST,输出是单机物理执行 Plan, Plan 中每个节点是一个 PlanNode。

SingleNodePlanner 核心任务就是根据 AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。

Doris 在生成单机 Plan 的时候主要进行了以下工作或优化

  1. Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化
//Slot物化,处理 Base表
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot物化 处理 where 语句的子查询
selectStmt.materializeRequiredSlots(analyzer);
  1. 投影下推:BE 在 Scan 时只会 Scan 必须读取的列
projectPlanNode(resultSlotIds, root);
  1. 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点
pushDownPredicates(analyzer, selectStmt);
  1. 分区,分桶裁剪:比如建表时按照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,可以大大减少不必要的数据读取
  2. Join Reorder:对于 join操作,在保证结果不变的情况,通过规则计算最优(最少资源)join 操作。
createCheapestJoinPlan(analyzer, refPlans);
  1. Sort + Limit 优化成 TopN(FE 进行useTopN标识,BE标识执行)
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
  1. MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的 MaterializedView
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
  1. 向量化执行引擎选择:基于现代CPU的特点与火山模型的执行特点,重新设计列式存储系统的SQL执行引擎,从而提高了CPU在SQL执行时的效率,提升了SQL查询的性能。
if (VectorizedUtil.isVectorized()) {
            singleNodePlan.convertToVectoriezd();
    }
  1. Runtime Filter Join:Doris 在进行 Hash Join 计算时会在右表构建一个哈希表,左表流式的通过右表的哈希表从而得出 Join 结果。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的时,同时生成一个基于哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());

创建 singleNodePlanner 主要代码:createSingleNodePlan()

(六)DistributedPlan

分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由 PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。

640.jpg

每个 PlanFragment 由 PlanNodeTree 和 Data Sink 组成,我们从上图的 Plan Fragment 2 可以看出,由 AggregationNode、HashJoinNode 和 DataSink。Plan 分布式化的方法是增加 ExchangeNode,执行计划树会以 ExchangeNode 为边界拆分为 PlanFragment。

ExchangeNode 主要是用于 BE 之间的数据交换与共享,类似 Spark 和 MR 中的 Shuffle。

各个 Fragment 的数据流转和最终的结果发送依赖:DataSink。比如 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的结果集发送到 FE。

每个 PlanFragment 可以在每个 BE 节点生成 1 个或多个执行实例,不同执行实例处理不同的数据集,通过并发来提升查询性能。

DistributedPlanner 中最主要的工作是决定 Join 的分布式执行策略:Shuffle Join,Bucket Join,Broadcast Join,Colocate Join,和增加 Aggregation 的 Merge 阶段。

决定 Join 的分布式执行策略的逻辑如下:

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

640.jpg

(七)Schedule

生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。

640.jpg

  • computeScanRangeAssignment():主要逻辑对fragment合理分配,尽可能保证每个BE节点的请求都是平均。
  • computeFragmentExecParams():处理Fragment执行参数。
  • sendFragment():发送Fragment至BE节点,

(八)Execute

Doris 的查询执行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。

BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。

PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。

五、参考献文

  • Apache Doris Join原理

https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F

  • Apache Doris 存储层设计

https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html

  • Apache Doris 元数据设计

https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584

  • Apache Doris 查询原理

https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258

六、实践分享

七、总结

本文主要介绍查询 SQL 在 Apache Doris Fe 节点经历 parse、analyze、rewrite、GenerateQueryPlan、schedule、send 等阶段处理。Apache Doris Fe 的 parse、analyze、rewrite 阶段和其他数据库处理过程差不多,本文主要讲解的核心是 GenerateQueryPlan、schedule、send 阶段的原理。我们可以深度了解 Apache Doris Fe 节点对查询 SQL 的优化操作,以及未来遇到相关性能问题不会无从下手。

相关文章
|
8天前
|
存储 SQL Apache
Apache Doris 创始人:何为“现代化”的数据仓库?
3.0 版本是 Apache Doris 研发路程中的重要里程碑,他将这一进展总结为“实时之路”、“统一之路”和“弹性之路”,详细介绍了所对应的核心特性的设计思考与应用价值,揭晓了 2025 年社区发展蓝图
Apache Doris 创始人:何为“现代化”的数据仓库?
|
10天前
|
SQL 存储 数据处理
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
53 1
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
|
9天前
|
SQL NoSQL Java
Java使用sql查询mongodb
通过使用 MongoDB Connector for BI 和 JDBC,开发者可以在 Java 中使用 SQL 语法查询 MongoDB 数据库。这种方法对于熟悉 SQL 的团队非常有帮助,能够快速实现对 MongoDB 数据的操作。同时,也需要注意到这种方法的性能和功能限制,根据具体应用场景进行选择和优化。
36 9
|
30天前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
105 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
2月前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
25天前
|
SQL 存储 Apache
Apache Doris 3.0.3 版本正式发布
亲爱的社区小伙伴们,Apache Doris 3.0.3 版本已于 2024 年 12 月 02 日正式发布。该版本进一步提升了系统的性能及稳定性,欢迎大家下载体验。
|
2月前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
39 8
|
2月前
|
SQL 安全 PHP
PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全
本文深入探讨了PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全。
65 4
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
10天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析

热门文章

最新文章

推荐镜像

更多