Adaptive and Big Data Scale Parallel Execution in Oracle

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
简介: 在上篇文章中,主要讨论了SQL Server的MPP数仓系统PDW的分布式优化过程,PolarDB的并行优化从中有所借鉴,本篇文章主要看下这篇介绍Oracle并行执行策略的paper,因为在PolarDB的分布式执行策略中,有很多与其有所重叠。

在上篇文章中,主要讨论了SQL Server的MPP数仓系统PDW的分布式优化过程,PolarDB的并行优化从中有所借鉴,本篇文章主要看下这篇介绍Oracle并行执行策略的paper,因为在PolarDB的分布式执行策略中,有很多与其有所重叠。

总体介绍

这篇paper重要介绍了Oracle在单机/RAC环境下,对原有的并行执行策略做了一些重要的改进,包括

  1. 改进了并行执行方式,引入了multi-stage parallelization,使其具有更好的扩展性,更能抵抗data skew。
  2. 基于执行中实时收集的准确信息而自适应的调整并行执行方式,避免过于依赖优化器的estimation而产生次优的计划,提高执行效率。
  3. 对一些串行执行的算子,也实现了并行化。

paper中主要讨论了分析型查询最为常见的3个算子:group by / window function / join 。

Parallelism in Oracle

Oracle的并行执行在刚推出时是比较简单的,只是针对table scan算子实现了并行化,后续不断完善,实现了更加丰富的multiple-slice plan。(值得一提的是,PolarDB的并行查询也遵循类似的发展路线,从单机内的表级并行,到单机多slice并行,再到多机分布式并行

根据paper的描述和一些相关资料 介绍,Oracle的并行优化将operators划分为若干slice,用DFO(Data Flow Operator) 来描述plan segment之间的数据流动,一个DFO描述了一组producer -> consumer的关系。

有全局的QC(Query Coordinator)来负责生成执行计划,下发计划片段,并调度各个worker process执行,plan中串行的部分也可以在QC上完成。为了能够限制并行执行占用的CPU资源,避免并发query的相互干扰,QC在调度中会保证,任一时间只有一组producer + consumer在工作,当producer完成子计划的执行和数据传输,其worker进程将被复用作为后续新的consumer,而原来的consumer将成为新的producer向后传递数据,这样就严格限定了CPU的占用数量最大只能是DoP * 2。(由此也可以看出,在消费端是有一定的buffer机制的)

Data redistribution的方式有broadcast / hash / range / random等,一般情况下,数据如何分发在优化阶段optimizer就已静态决定。

Handle Group By

  • 原有的并行执行方式

先做Group By pushdown,在下层DFO的本地,各自做local aggregation,然后再按照group by key做redistribution,然后再做二次aggregation,这种方法称为Group By PushDown (GPD)。

image.png

是否做GPD,由优化器估计的input rows estimation和group num estimation来决定,如果local aggregation可以有比较好的reduction效果(聚合效果较好),就可以减少redistribute的数据量,且可以一定程度抵抗data skew,但代价则是多了一轮local aggregation的计算,因此这里有个trade-off,效果好坏取决于local aggregation是否能很好地缩减数据量。

同时,为了避免disk IO的代价,在做pushdown local aggregation时,一旦memory不足,就停止聚集,直接发送已有部分聚集结果,后续rows直接pass-through发送,相当于关闭了GPD,这样不会影响数据的正确性因为上面还有2次聚集。

这种方法在MPP系统中很常见,也是PolarDB的一种可选的并行执行方式,但其存在2个主要问题

  1. 依赖optimizer estimation来决定是否GPD,因此可能不准确,尤其是local aggregation后的cardinality。
  2. 判断是否关闭GPD的时机和条件不合理:如果GPD确实没有太多收益(reduction不足),但memory一直足够,则也会一直做GPD,浪费了资源。 如果GPD是有收益(reduction明显),但突然由于并发query到来内存不足,就关闭掉了GPD,后续无法恢复,也不合理。
  • 新并行方式的改进

1)不依赖于优化器估计,在执行中根据收集的input rows / group num的准确统计,动态决定是否GPD。

2)只要reduction比较有效,就继续进行GPD。

3)如果内存不足,采用batch flushing,将已在内存中的group batch发送出去,清空内存后,继续build新的patch。

4)针对高并发的场景(memory可用资源会变化),引入hybrid batch flushing(HBF),当内存不足时,不立即发送已有batch,而是保持在内存中,对后续data row,如果仍在batch group中,则继续更新这组的aggregation结果,否则直接发送出去。同时实时跟踪batch的reduction情况,如果reduction不好,则直接发送该batch,构造新batch,也可以类似LRU,替换batch中命中低的group,保留最频繁的group,达到好的reduction。

这样面对内存波动时,即使突然内存不足也不会立即停止GPD,而且保存batch观察一阵再决定。

5)判断是否开始调整GPD的时机,不再是之前的memory不足时,而是更早,当batch大小达到L2 cache size时,根据动态收集的reduction情况,如果效果好,则不做限制,否则进入HBF mode,不再增加占用内存,如果HBF后观察一段发现reduction效果始终不好,则停止GPD。

可以看到执行方式灵活了很多,有了很强的基于数据的自适应能力。

Handle Window Function

在很多分析场景中,window function被越来越多的使用,其基本概念可以参考MySQL手册

12.21.2 Window Function Concepts and Syntax

而且,目前大多数commercial optimizer和一些优秀的开源系统中,都提供了用window function做query transformation的能力,Oracle自然不例外,内部用wf做self-join elimination和subquery unnesting。(PolarDB MySQL在MySQL传统优化器上进行了增强,也实现了subquery unnesting to window function)

和MySQL类似,window function在Oracle中也是通过sorting来实现的,这里称为window sorting。根据其语义特点,wf被分为了3个类别:

reporting window function

针对一个partition内做聚集计算,例如

SELECT /*y:year q:quarter m:month d:day*/
y, q, m, d, sales,
SUM(sales) OVER (PBY y,q,m) msales,
SUM(sales) OVER (PBY y,q) qsales,
SUM(sales) OVER (PBY y) ysales
FROM fact f;
  • 原有并行方式

基于common partition by key做data distribution,在各个worker process中独立计算window sort。

image.png

其主要问题是,扩展性受限于partition key NDV限制,如果NDV < DoP,则会有些worker process没有事做。

  • 新的并行方式:

加入了2种新的并行执行方式,都会将wf的并行变为多阶段模式。

1)extended partition key

由于执行是利用common partition key做分布,可以通过扩展加入更多partition key来增加NDV,从而增加并行度(也可以一定程度抵抗data skew),利用extended partition key做redistribution之后,在各个worker上则只有原partition的部分数据,但仍按原partition做window sort计算,得到局部聚集的结果。

image.png

上图中Window sort表示了基于部分数据聚集的过程,由于扩展了partition key,每个window sort进程只能看到属于一个partition的一部分聚集结果,这是不完整的,需要在上层的Window Consolidator获取全局结果,这个汇总过程分为2个步骤:

step.1 各个Window sort process将本地聚集结果broadcast到上层的window consolidator进程组中,consolidator processes各自计算全量的聚集结果,并构建hash table : partition key -> 聚集结果,注意这里是指发送聚集结果(partition key + 局部聚集结果),而不是全部表数据。

step.2 将原始data row,random分发到上层window consolidator中,由于每个consolidator进程都已经有了最终的结果,只要到hash table中look up一下,拼接上即可。

2)window pushdown

与extended partition key类似,只是各个worker process先在table scan的同时,做window sort计算,这时也仍然得到的是部分partition数据的local aggregation结果。

然后broadcast -> consolidate -> random 分发 -> lookup拼接最终结果,步骤相同。

image.png

新的并行方式增加了一次数据distribution ,和hash table lookup的成本,但可以扩展更高的并行度,从而获得更好性能。

具体采用original / extended partition key / window pushdown的哪一种策略,取决于优化器的estimation。如果原始partition key具有足够的NDV值,则采用原始并行方式,否则尝试扩展partition key并估算NDV,如果满足要求则采用extended partition key方式,如果扩展后,仍无法产生足够的NDV值,采用window pushdown 方式。

  • 对optimizer error的处理

如果optimizer estimation不准确,可能会产生严重性能问题(worker空闲),因此需要执行中的自适应能力:

初始时仍根据NDV估计,决定做extended / pushdown,但后续执行中实时收集partition key的NDV,如果实际值足够大,则调整为原始的基于repartition key做data distribution的方式,这时原来的window sort步骤(extended/pushdown)变为pass-through,原来的window consolidator则完成实际的计算工作。

cumulative/ranking window function

类似MySQL window function中frame的概念,计算是在一个partition内的frame中进行的,frame定义了以当前行为基准的滑动窗口。

SELECT prod_id, date, sales,
SUM(sales) OVER (PBY prod_id OBY date),
RANK() OVER (PBY prod_id OBY sales)
FROM fact f;
  • 原有并行方式

Oracle在选择wf并行方式时有一条heuristic rule,将具有common partition key的wf放在一个slice内计算,这样,以上的2个wf的执行方式如下:

image.png

也是基于partition key做data distribution,在各个worker process中独立依次计算window sort,因此也有扩展性受限于NDV的问题。

  • 新并行方式

和extended partition by的思路类似,现在有了现成的order by key,可以基于order by来扩展distribution key。决策也是在optimize阶段,基于NDV做出。以上述SQL为例,假设NDV(prod_id) < DoP,则计算NDV(prod_id, date)和NDV(prod_id, sales)。如果后两个满足DoP要求,则并行计划变为:

image.png

和reporting wf不同,这种wf要求输入数据的有序性,因此在第一步根据[prodid,date]进行partition时,需要做range partition!从而保证从全局来看,各个local window sort中的数据整体形成一个按[prod_id + date ]有序的序列,类似下图(假设dop = 4)

image.png

各个window sort进程中,数据按prod_id + date有序

为了后续的全局有序性,这里需要一个关键的同步过程:

1)每个window sort process先对负责的局部partition数据,计算first partition first row的结果 + last partition last row的结果,并发送给QC。

2)QC收集每个window sort的信息做consolidate,由于是range的,所以每个worker process的首/尾信息就足够,然后将合并之后的信息(如下图)发送回各个worker process。

image.png

3)每个worker process根据QC发回的每个worker process的首/尾信息,就可以汇总得到完整的partition聚集结果,以上图process 2为例,它收到的首/尾信息中包括了process1对于P1的汇总结果100和process 3/4中对于P2的汇总结果50/150,就能得到P1 , P2两个分组全局的聚集结果了。

paper中没有提到这种类型的wf怎么做adaptive的并行执行,应该思路和reporting wf类似,如果发现partition key的NDV足够大,就可以退回原始的执行方式,distribution采用partition by key做range partition,然后各个window sort直接计算整个partition结果,不需要QC参与。

Handle Join

SELECT t.year, t.quarter, f.sales
FROM time_dim t, fact f
WHERE t.time_key = f.time_key;
  • 原始并行方式

通过optimizer对于Input两侧的cardinality estimation,决定是做hash-hash方式(数据量较大),还是broadcast-random方式(一侧数据量较少)。

image.png

image.png

但这种对于optimizer依赖较大,如果估计不准,比如高估了card,则会采用hash-hash,导致某些worker process工作量很小,如果低估了card,则采用broadcast-random,导致分发大量数据。

  • 新并行方式

思路仍然是adaptive,在build侧加入"statistics collector"算子。这个算子会buffer住一定的rows(此时还没有分发+join),当rows数量到达一定阈值(2 * DOP )或input结束时,则通知QC数据量,QC根据各个worker process汇总的数据量,决定是做broadcast + random 还是hash + hash。

image.png

QC决策后会通知各个worker process,此后,statistics collector算子将变为pass-through,不起作用,这时就不需要依赖于optimizer estimation了。

之所以选择2* DOP的rows阈值,是为了让小表足够小,可以build cache-resident hash table做join。根据paper的介绍,目前自适应的方式是从hash - hash 调整为 broadcast - random,因为这样只需要去掉一个DFO,比较简单,反向调整则需要增加一个DFO,相对困难。(对比Figure 10/11)

Handle small tables

对于broadcast的方式,小表还是要发送数据的,而且随着DoP的升高,网络负载也成比例上涨,为了减少网络开销,利用Oracle RAC本身share disk的能力,将小表读入buffer pool并pin住禁止其换出,这样后续join可以在一个slice内完成,减少了一次网络分发和一组worker process。 

Handle serialization point

在分布式计划中,可能会存在一些需要串行执行才能保证正确性的算子,称为serialization point,例如Top-N,UDF计算。。

  • 原有处理方式

老版本中,对于这种serialization point,需要在QC上完成。这样有2个缺点

  1. 执行本身会消耗资源,因此这会影响QC调度整个并行计划
  2. 后续的operator即使可以并行,也只能在QC上串行执行,影响扩展性
  • 新并行方式

新的并行方式引入了Back-to-parallel + Single-Server DFO的功能 

1)Single-Server DFO:对于需要串行执行的算子,单独off-load到一个单process的DFO中执行,不影响QC。

2)Back-to-parallel: 如果serialization point之后的算子,optimizer估计可以做并行执行,则仍然分发数据到多个并行process中,后续仍可以并行执行。

总结

paper中讲解了Oracle优化后的并行执行策略,核心在于如何做自适应的调整 + 如何扩展并行度。但没有涉及太多细节,比如怎么在算子内buffer数据,具体如何调度,调整中的算法等。(吐槽一下Oracle的paper,历来只粗浅的描述原理,估计也是实现非常复杂难以展开)

在PolarDB多机并行(多RO节点,资源充足)执行环境下,paper中针对window function提高扩展性的思路还是很具有参考意义的。此外,PolarDB的并行执行目前还缺乏自适应调整的能力,这需要执行器具有统计feedback的能力。由于query feedback loop是查询优化器的一个重要组件,对于statistic维护,plan management,自适应计划调整都有重要意义,PolarDB优化器层已经在这个方面具有了一定能力,后续会不断完善。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
SQL 负载均衡 并行计算
Parallel SQL Execution in Oracle 10g 论文解读
这篇简短的paper从非常high level的角度描述了下Oracle 10g对于parallel query所做的重新设计和其中的一些优化,由于Oracle RAC特殊的share-disk架构,使其在并行计算上与普通的MPP数据库有一些不同,例如对于worker的调度和分配方式以及对于资源/数据的动态调整。
284 0
Parallel SQL Execution in Oracle 10g 论文解读
|
SQL Oracle 关系型数据库
|
SQL 监控 关系型数据库
ORACLE ORA-00020与parallel并行
  6月份巡检中,石家庄Oracle数据库告警日志发现ORA-00020: maximum number of processes (300) exceeded,提取告警时间段的AWR,观察发现:PX Deq: Slave Session Stats等待事件   观察sql统计信息发现insert-select并行的SQL语句,如下所示:  经过与开发核实,发现在一个存储过程包中,有10条insert-select并行sql语句,并且在存储过程的最后一个insert完成做提交。
1329 0
|
Oracle 关系型数据库 数据处理
通过图表分析oracle的parallel性能
并行特性在数据库里对于性能的提升很有帮助,尤其是大批量的数据处理。今天对于并行的性能情况进行了简单的图表分析。 为了能够比较合理的比较数据,对数据库里的2张大表进行了比对分析。
824 0
|
SQL Oracle 关系型数据库
【SQL】Using Oracle's Parallel Execution Features
The full list of Oracle parallel execution features currently includes the following Parallel Query Parallel DML P...
660 0
|
2月前
|
存储 Oracle 关系型数据库
Oracle数据库的应用场景有哪些?
【10月更文挑战第15天】Oracle数据库的应用场景有哪些?
200 64

推荐镜像

更多