别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!

简介: 别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!

是否遇到过忙碌餐厅里的场景?一群服务小哥哥和小姐姐来回奔波,却总有顾客等得不耐烦。其实数据库查询也是这样,如果不好好安排"工作流程",查询请求也会跟餐厅里的客人一样干着急。
Apache Doris的并行执行好比是餐厅里的总管家,它把大任务(Plan)分成几个区域(Fragment),每个区域都有专门的服务小组(Pipeline),每个小组又分配了具体的服务同学(Task),服务的同学再进行具体的服务行为(Operator)。
通过这种科学的分工方式,数据处理从"排队等待"变成了"多窗口并行"。这就是Doris中的Pipeline执行模型,一个让数据处理不再"望队兴叹"的方案!

image.png

背景介绍

Doris的并行执行模型是一种Pipeline 执行模型,主要参考了Hyper论文中Pipeline的实现方式:

https://db.in.tum.de/~leis/papers/morsels.pdf

Pipeline 执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。

它的具体设计、实现和效果可以参阅 DSIP-027 以及 DSIP-035。

Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML 语句的并行处理。

物理计划

为了更好的理解Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment和PlanNode。

我们使用下面这条SQL 作为例子:

SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);

FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个PlanNode:

image.png

  • ScanNode A :扫描节点,从本地磁盘读取表A的数据
  • ScanNode B :扫描节点,从本地磁盘读取表B的数据
  • JoinNode :基于A.k2 = B.k2 对A和B表进行Join计算
  • AggNode :即 GROUP BY k1的聚合计算
  • SortNode :即 ORDER BY SUM(v1)的排序计算

由于Doris 是一个MPP的架构,每个查询都会尽可能的让所有的BE 都参与进来并行执行,来降低查询的延时。

所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了DataSink(数据发送的节点)和ExchangeNode(接收Sink发送数据的节点),通过这两个Node完成了数据在多个BE 之间的Shuffle。

拆分完成后,每个PlanFragment 相当于包含了一部分PlanNode,可以作为一个独立的任务发送给BE,每个BE 完成了PlanFragment内包含的PlanNode的计算后,通过DataSink和ExchangeNode 这两个算子把数据shuffle到其他BE上来进行接下来的计算。

image.png

所以Doris的规划分为3层:

PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。

FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。

PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等

Pipeline 执行

PlanFragment 是FE 发往BE 执行任务的最小单位。

BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment 之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。

image.png

Pipeline

一个Pipeline 有一个SourceOperator 和 一个SinkOperator 以及中间的多个其他Operator组成。

SourceOperator 代表从外部读取数据,可以是一个表(OlapTable),也可以是一个Buffer(Exchange)。

SinkOperator 表示数据的输出,输出可以是通过网络shuffle到别的节点,比如DataStreamSinkOperator,也可以是输出到HashTable,比如Agg算子,JoinBuildHashTable等。

image.png

多个Pipeline 之间实际是有依赖关系的,以JoinNode为例,他实际被拆分到了2个Pipeline 里。其中Pipeline-0是读取Exchange的数据,来构建HashTable;Pipeline-1 是从表里读取数据,来进行Probe。

这2个Pipeline 之间是有关联关系的,只有Pipeline-0运行完毕之后才能执行Pipeline-1。这两者之间的依赖关系,称为Dependency。当Pipeline-0 运行完毕后,会调用Dependency的set_ready 方法通知 Pipeline-1 可执行。

PipelineTask

Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了Pipeline之后,需要进一步的把Pipeline 实例化为多个PipelineTask。将需要读取的数据分配给不同的PipelineTask 最终实现并行处理。

同一个Pipeline的多个PipelineTask 之间的Operator 完全相同,他们的区别在于Operator的状态不一样,比如读取的数据不一样,构建出的HashTable 不一样,这些不一样的状态,我们称之为LocalState。

每个PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在Dependency 这种触发机制下,可以更好的利用多核CPU,实现充分的并行。

Operator

在大多数时候,Pipeline 中的每个Operator 都对应了一个PlanNode,但是有一些特殊的算子除外:

  • JoinNode,被拆分为JoinBuildOperator和JoinProbeOperator
  • AggNode 被拆分为AggSinkOperator和AggSourceOperator
  • SortNode 被拆分为SortSinkOperator 和 SortSourceOperator

基本原理是,对于一些breaking 算子(需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为Sink,然后把从这个算子里获取数据的部分称为Source。

Scan 并行化

扫描数据是一个非常重的IO 操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从HDFS或者S3中读取,延时更长),需要比较多的时间。

所以我们在ScanOperator 中引入了并行扫描的技术,ScanOperator会动态的生成多个Scanner,每个Scanner 扫描100w-200w 行左右的数据,每个Scanner 在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个DataQueue,供ScanOperator 读取。

image.png

通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。

Local Shuffle

在Pipeline执行模型中,Local Exchange作为一个Pipeline Breaker出现,是在本地将数据重新分发至各个执行任务的技术。

它把上游Pipeline输出的全部数据以某种方式(HASH / Round Robin)均匀分发到下游Pipeline的全部Task中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及plan的限制。

接下来我们举例来说明Local Exchange的工作逻辑。

我们用上述例子中的Pipeline-1为例子进一步阐述Local Exchange如何可以避免数据倾斜。

image.png

如上图所示,首先,通过在Pipeline 1中插入Local Exchange,我们把Pipeline 1进一步拆分成Pipeline 1-0和Pipeline 1-1。

此时,我们不妨假设当前并发等于3(每个Pipeline有3个task),每个task读取存储层的一个bucket,而3个bucket中数据行数分别是1,1,7。

则插入Local Exchange前后的执行变化如下:

image.png

从图右可以看出,HashJoin和Agg算子需要处理的数据量从(1,1,7)变成了(3,3,3)从而避免了数据倾斜。

在Doris中,Local Exchange根据一系列规则来决定是否被规划,例如当查询耗时比较大的Join、聚合、窗口函数等算子需要被执行时,我们就需要使用Local Exchange来尽可能避免数据倾斜。

结语

观Doris并行之道,如观一出精妙戏剧。

Pipeline化整为零,却又聚零为整;看似繁复,实则有序。CPU多核齐飞,似群贤毕至;数据分工有序,如众星拱月。这不正印证了"人心齐,泰山移"的古训?Pipeline执行模型,不过是让数据流转遵循自然之道!

下期,我们将一起探讨其它更有趣有用有价值的内容,敬请期待!

相关文章
|
7月前
|
消息中间件 OLAP Kafka
Apache Doris 实时更新技术揭秘:为何在 OLAP 领域表现卓越?
Apache Doris 为何在 OLAP 领域表现卓越?凭借其主键模型、数据延迟、查询性能、并发处理、易用性等多方面特性的表现,在分析领域展现了独特的实时更新能力。
677 9
|
6月前
|
存储 自然语言处理 分布式计算
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
Apache Doris 3.1 正式发布!全面升级半结构化分析,支持 VARIANT 稀疏列与模板化 Schema,提升湖仓一体能力,增强 Iceberg/Paimon 集成,优化存储引擎与查询性能,助力高效数据分析。
829 4
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
|
7月前
|
存储 分布式计算 Apache
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
小米通过将 Apache Doris(数据库)与 Apache Paimon(数据湖)深度融合,不仅解决了数据湖分析的性能瓶颈,更实现了 “1+1>2” 的协同效应。在这些实践下,小米在湖仓数据分析场景下获得了可观的业务收益。
1266 9
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
|
7月前
|
人工智能 运维 监控
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
本文基于 Apache Doris 数据运维治理 Agent 展开讨论,如何让 AI 成为 Doris 数据运维工程师和数据治理专家的智能助手,并在某些场景下实现对人工操作的全面替代。这种变革不仅仅是技术层面的进步,更是数据运维治理思维方式的根本性转变:从“被动响应”到“主动预防”,从“人工判断”到“智能决策”,从“孤立处理”到“协同治理”。
1158 11
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
|
6月前
|
SQL 人工智能 数据挖掘
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
Apache Doris 4.0 原生集成 LLM 函数,将大语言模型能力深度融入 SQL 引擎,实现文本处理智能化与数据分析一体化。通过十大函数,支持智能客服、内容分析、金融风控等场景,提升实时决策效率。采用资源池化管理,保障数据一致性,降低传输开销,毫秒级完成 AI 分析。结合缓存复用、并行执行与权限控制,兼顾性能、成本与安全,推动数据库向 AI 原生演进。
605 0
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
|
7月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
446 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
|
7月前
|
SQL 存储 JSON
Apache Doris 2.1.10 版本正式发布
亲爱的社区小伙伴们,Apache Doris 2.1.10 版本已正式发布。2.1.10 版本对湖仓一体、半结构化数据类型、查询优化器、执行引擎、存储管理进行了若干改进优化。欢迎大家下载使用。
327 5
|
7月前
|
人工智能 自然语言处理 数据挖掘
Apache Doris 4.0 AI 能力揭秘(一):AI 函数之 LLM 函数介绍
在即将发布的 Apache Doris 4.0 版本中,我们正式引入了一系列 LLM 函数,将前沿的 AI 能力与日常的数据分析相结合,无论是精准提取文本信息,还是对评论进行情感分类,亦或生成精炼的文本摘要,皆可在数据库内部无缝完成。
496 0
Apache Doris 4.0 AI 能力揭秘(一):AI 函数之 LLM 函数介绍
|
8月前
|
SQL 人工智能 数据挖掘
Apache Doris + MCP:Agent 时代的实时数据分析底座
数据不再是静态的存储对象,而是流动的智能资源;数据库不再是单纯的存储系统,而是智能化的服务平台。Apache Doris 以其在 AI 方向的深度布局和技术创新,正在成为连接数据与智能的重要桥梁。
1598 0
Apache Doris + MCP:Agent 时代的实时数据分析底座
|
7月前
|
存储 人工智能 Apache
ApacheCon 2025中国开源年度报告:Apache Doris 国内第一
在 Apache 基金会管理的近 300 个顶级项目中,Doris 已经成为仅次于 Apache Airflow 的全球第二大影响力项目。
402 0

推荐镜像

更多