浅谈 Apache Doris FE 处理查询 SQL 源码解析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 浅谈 Apache Doris FE 处理查询 SQL 源码解析

一、前言

在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment

二、名词解释

  • FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
  • BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
  • slot:计算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个 task 才可以运行
  • planNode : 逻辑算子
  • planNodeTree: 逻辑执行计划

三、执行流程

640.jpg

四、Apache Doris 查询原理

(一)SQL 接收

本文只说 mysql 协议如何接收 SQL 语句, 如果感兴趣的同学可以看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和其他支持 Mysql 协议的工具向 Doris 发送查询请求。MysqlServer Listener() 负责监听客户端发送来的 Mysql 连接请求,每个连接请求都被封装成一个 ConnectContext 对象,并被提交给 ConnectScheduler。ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。

  • MysqlServer 类 Listener 处理:
private class Listener implements Runnable {
        @Override
        public void run(){while (running && serverChannel.isOpen()) {
                SocketChannel clientChannel;
                try {clientChannel = serverChannel.accept();
                    if (clientChannel == null) {continue;}
                    // 构建 ConnectContext 对象
                    ConnectContext context = new ConnectContext(clientChannel);
                    // catelog 日志
                    context.setCatalog(Catalog.getCurrentCatalog());
                    // 向 ExecutorService 提交 new LoopHandler(context) ==>(源码)executor.submit(new LoopHandler(context))
                    if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
                        // clear up context
                        context.cleanup();}
                } catch (IOException e) {
                    // ClosedChannelException
                    // AsynchronousCloseException
                    // ClosedByInterruptException
                    // Other IOException, for example "to many open files" ...
                    LOG.warn("Query server encounter exception.", e);
                    try {Thread.sleep(100);
                    } catch (InterruptedException e1) {// Do nothing}
                } catch (Throwable e) {
                    // NotYetBoundException
                    // SecurityException
                    LOG.warn("Query server failed when calling accept.", e);
                }
            }
        }
    }
  • ExecutorService 线程 LoopHandler 处理:
@Override
        public void run() {
            try {
                // Set thread local info
                context.setThreadLocalInfo();
                context.setConnectScheduler(ConnectScheduler.this);
                // authenticate check failed.
                if (!MysqlProto.negotiate(context)) {return;}
                if (registerConnection(context)) {MysqlProto.sendResponsePacket(context);
                } else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
                    MysqlProto.sendResponsePacket(context);
                    return;
                }
                context.setStartTime();
                ConnectProcessor processor = new ConnectProcessor(context);
                processor.loop();} catch (Exception e) {
                // for unauthorized access such lvs probe request, may cause exception, just log it in debug level
                if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e);
                } else {LOG.debug("connect processor exception because", e);
                }
            } finally {unregisterConnection(context);
                context.cleanup();}
        }
  • processOnce(读取 Mysql 客户端的 sql) 方法
// 处理 mysql 的请求
    public void processOnce()throws IOException {ctx.getState().reset();
        executor = null;
        // 重置 MySQL 协议的序列号
        final MysqlChannel channel = ctx.getMysqlChannel();
        channel.setSequenceId(0);
        // 从通道读取数据包 ==>SQL
        try {packetBuf = channel.fetchOnePacket();
            if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
                throw new IOException("Error happened when receiving packet.");
            }
        } catch (AsynchronousCloseException e) {
            // when this happened, timeout checker close this channel
            // killed flag in ctx has been already set, just return
            return;
        }
        // 下发 SQL 
        dispatch();
        // finalize
        finalizeCommand();
        ctx.setCommand(MysqlCommand.COM_SLEEP);
    }

(二)Parse

ConnectProcessor 接收到 SQL 之后会进行 analyze ,Apache Doris SQL 解析使用的 Parse 是 Java CUP Parser,语法规则 定义的文件在 sql_parser.cup。

感兴趣的同学可以详细看一下 StatementBase 类

  • analyze 方法, 返回 List(这里主要是语法解析)
// 解析 origin,返回 list<stmt>
    private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt);
        // 使用 CUP&FLEX 生成的解析器解析语句
        SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
        SqlParser parser = new SqlParser(input);
        try {return SqlParserUtils.getMultiStmts(parser);
        } catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
        } catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt);
            LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
            if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e);
            }
        } catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
            // should be removed this try-catch clause future.
            throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
        }
    }

因为本文讲述的是查询语句(不同类型会转换成不通 Stmt,比如 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),最后我们会得到 QueryStmt,originStmt 会转换成 QueryStmt,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成

(三)Analyze

SQL 语句被解析成 AST 之后,会被交给 StmtExecutor 。StmtExecutor 会首先对 AST 进行语法和语义分析,大概会做下面的事情:

  1. 检查并绑定 Cluster, Database, Table, Column 等元信息。
  2. SQL 的合法性检查:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等。
  3. SQL 重写:比如将 select * 扩展成 select 所有列,count distinct 查询重写等。
  4. Table 与 Column 别名处理。
  5. 为 Tuple, Slot, Expr 等分配唯一 ID。
  6. 函数参数的合法性检测。
  7. 表达式替换。
  8. 类型检查,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。

主要代码:

analyzeAndGenerateQueryPlan 方法 -->  parsedStmt.analyze(analyzer);

(四)Rewrite

  • analyzeAndGenerateQueryPlan 方法(部分代码,此处不做重点讲解)

StmtExecutor 在对 AST 进行语法和语义分析后,会让 ExprRewriter 根据 ExprRewriteRule 进行一次 Rewrite。目前 Doris 的重写规则比较简单,主要是进行了常量表达式的化简和谓词的简单处理。常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。

如果重写后,有部分节点被成功改写,比如, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义分析的过程。

对于有子查询的 SQL,StmtRewriter 会进行重写,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。

if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter();
            rewriter.reset();
            if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter);
            }
            // explan 标签
            ExplainOptions explainOptions = parsedStmt.getExplainOptions();
            boolean reAnalyze = false;
            parsedStmt.rewriteExprs(rewriter);
            reAnalyze = rewriter.changed();
            if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
                reAnalyze = true;
            }
            if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;}
            }
            if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
                for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;}
                }
            }
            if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
                if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;}
            }
            if (reAnalyze) {
                // 对重写语句进行处理
                List<Type> origResultTypes = Lists.newArrayList();
                for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType());
                }
                List<String> origColLabels =
                        Lists.newArrayList(parsedStmt.getColLabels());
                // 重写语句进行 analyzer
                analyzer = new Analyzer(context.getCatalog(), context);
                // 重写语句 analyzer 信息
                parsedStmt.reset();
                parsedStmt.analyze(analyzer);
                // 恢复原始结果类型和列标签
                parsedStmt.castResultExprs(origResultTypes);
                parsedStmt.setColLabels(origColLabels);
                if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
                }
                if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions);
                }
            }
        }

(五)SingleNodePlan

经过 parse、Analyze、Rewrite 阶段后,AST 会生成 singleNodePlanner,源码如下:

singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();

单机 Plan 由 SingleNodePlanner 执行,输入是 AST,输出是单机物理执行 Plan, Plan 中每个节点是一个 PlanNode。

SingleNodePlanner 核心任务就是根据 AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。

Doris 在生成单机 Plan 的时候主要进行了以下工作或优化

  1. Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化
//Slot物化,处理 Base表
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot物化 处理 where 语句的子查询
selectStmt.materializeRequiredSlots(analyzer);
  1. 投影下推:BE 在 Scan 时只会 Scan 必须读取的列
projectPlanNode(resultSlotIds, root);
  1. 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点
pushDownPredicates(analyzer, selectStmt);
  1. 分区,分桶裁剪:比如建表时按照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,可以大大减少不必要的数据读取
  2. Join Reorder:对于 join操作,在保证结果不变的情况,通过规则计算最优(最少资源)join 操作。
createCheapestJoinPlan(analyzer, refPlans);
  1. Sort + Limit 优化成 TopN(FE 进行useTopN标识,BE标识执行)
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
  1. MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的 MaterializedView
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
  1. 向量化执行引擎选择:基于现代CPU的特点与火山模型的执行特点,重新设计列式存储系统的SQL执行引擎,从而提高了CPU在SQL执行时的效率,提升了SQL查询的性能。
if (VectorizedUtil.isVectorized()) {
            singleNodePlan.convertToVectoriezd();
    }
  1. Runtime Filter Join:Doris 在进行 Hash Join 计算时会在右表构建一个哈希表,左表流式的通过右表的哈希表从而得出 Join 结果。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的时,同时生成一个基于哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());

创建 singleNodePlanner 主要代码:createSingleNodePlan()

(六)DistributedPlan

分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由 PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。

640.jpg

每个 PlanFragment 由 PlanNodeTree 和 Data Sink 组成,我们从上图的 Plan Fragment 2 可以看出,由 AggregationNode、HashJoinNode 和 DataSink。Plan 分布式化的方法是增加 ExchangeNode,执行计划树会以 ExchangeNode 为边界拆分为 PlanFragment。

ExchangeNode 主要是用于 BE 之间的数据交换与共享,类似 Spark 和 MR 中的 Shuffle。

各个 Fragment 的数据流转和最终的结果发送依赖:DataSink。比如 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的结果集发送到 FE。

每个 PlanFragment 可以在每个 BE 节点生成 1 个或多个执行实例,不同执行实例处理不同的数据集,通过并发来提升查询性能。

DistributedPlanner 中最主要的工作是决定 Join 的分布式执行策略:Shuffle Join,Bucket Join,Broadcast Join,Colocate Join,和增加 Aggregation 的 Merge 阶段。

决定 Join 的分布式执行策略的逻辑如下:

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

640.jpg

(七)Schedule

生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。

640.jpg

  • computeScanRangeAssignment():主要逻辑对fragment合理分配,尽可能保证每个BE节点的请求都是平均。
  • computeFragmentExecParams():处理Fragment执行参数。
  • sendFragment():发送Fragment至BE节点,

(八)Execute

Doris 的查询执行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。

BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。

PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。

五、参考献文

  • Apache Doris Join原理

https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F

  • Apache Doris 存储层设计

https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html

  • Apache Doris 元数据设计

https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584

  • Apache Doris 查询原理

https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258

六、实践分享

七、总结

本文主要介绍查询 SQL 在 Apache Doris Fe 节点经历 parse、analyze、rewrite、GenerateQueryPlan、schedule、send 等阶段处理。Apache Doris Fe 的 parse、analyze、rewrite 阶段和其他数据库处理过程差不多,本文主要讲解的核心是 GenerateQueryPlan、schedule、send 阶段的原理。我们可以深度了解 Apache Doris Fe 节点对查询 SQL 的优化操作,以及未来遇到相关性能问题不会无从下手。

相关文章
|
1天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
2天前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
21 10
|
2天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
2天前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
6天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
28 2
|
10天前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
17天前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
17天前
|
SQL 监控 数据库
SQL语句是否都需要解析及其相关技巧和方法
在数据库管理中,SQL(结构化查询语言)语句的使用无处不在,它们负责数据的查询、插入、更新和删除等操作
|
6天前
|
SQL 关系型数据库 MySQL
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
14 0
|
17天前
|
SQL 数据可视化 BI
SQL语句及查询结果解析:技巧与方法
在数据库管理和数据分析中,SQL语句扮演着至关重要的角色