在上篇文章中,主要讨论了SQL Server的MPP数仓系统PDW的分布式优化过程,PolarDB的并行优化从中有所借鉴,本篇文章主要看下这篇介绍Oracle并行执行策略的paper,因为在PolarDB的分布式执行策略中,有很多与其有所重叠。
总体介绍
这篇paper重要介绍了Oracle在单机/RAC环境下,对原有的并行执行策略做了一些重要的改进,包括
- 改进了并行执行方式,引入了multi-stage parallelization,使其具有更好的扩展性,更能抵抗data skew。
- 基于执行中实时收集的准确信息而自适应的调整并行执行方式,避免过于依赖优化器的estimation而产生次优的计划,提高执行效率。
- 对一些串行执行的算子,也实现了并行化。
paper中主要讨论了分析型查询最为常见的3个算子:group by / window function / join 。
Parallelism in Oracle
Oracle的并行执行在刚推出时是比较简单的,只是针对table scan算子实现了并行化,后续不断完善,实现了更加丰富的multiple-slice plan。(值得一提的是,PolarDB的并行查询也遵循类似的发展路线,从单机内的表级并行,到单机多slice并行,再到多机分布式并行)
根据paper的描述和一些相关资料 介绍,Oracle的并行优化将operators划分为若干slice,用DFO(Data Flow Operator) 来描述plan segment之间的数据流动,一个DFO描述了一组producer -> consumer的关系。
有全局的QC(Query Coordinator)来负责生成执行计划,下发计划片段,并调度各个worker process执行,plan中串行的部分也可以在QC上完成。为了能够限制并行执行占用的CPU资源,避免并发query的相互干扰,QC在调度中会保证,任一时间只有一组producer + consumer在工作,当producer完成子计划的执行和数据传输,其worker进程将被复用作为后续新的consumer,而原来的consumer将成为新的producer向后传递数据,这样就严格限定了CPU的占用数量最大只能是DoP * 2。(由此也可以看出,在消费端是有一定的buffer机制的)
Data redistribution的方式有broadcast / hash / range / random等,一般情况下,数据如何分发在优化阶段optimizer就已静态决定。
Handle Group By
- 原有的并行执行方式
先做Group By pushdown,在下层DFO的本地,各自做local aggregation,然后再按照group by key做redistribution,然后再做二次aggregation,这种方法称为Group By PushDown (GPD)。
是否做GPD,由优化器估计的input rows estimation和group num estimation来决定,如果local aggregation可以有比较好的reduction效果(聚合效果较好),就可以减少redistribute的数据量,且可以一定程度抵抗data skew,但代价则是多了一轮local aggregation的计算,因此这里有个trade-off,效果好坏取决于local aggregation是否能很好地缩减数据量。
同时,为了避免disk IO的代价,在做pushdown local aggregation时,一旦memory不足,就停止聚集,直接发送已有部分聚集结果,后续rows直接pass-through发送,相当于关闭了GPD,这样不会影响数据的正确性因为上面还有2次聚集。
这种方法在MPP系统中很常见,也是PolarDB的一种可选的并行执行方式,但其存在2个主要问题
- 依赖optimizer estimation来决定是否GPD,因此可能不准确,尤其是local aggregation后的cardinality。
- 判断是否关闭GPD的时机和条件不合理:如果GPD确实没有太多收益(reduction不足),但memory一直足够,则也会一直做GPD,浪费了资源。 如果GPD是有收益(reduction明显),但突然由于并发query到来内存不足,就关闭掉了GPD,后续无法恢复,也不合理。
- 新并行方式的改进
1)不依赖于优化器估计,在执行中根据收集的input rows / group num的准确统计,动态决定是否GPD。
2)只要reduction比较有效,就继续进行GPD。
3)如果内存不足,采用batch flushing,将已在内存中的group batch发送出去,清空内存后,继续build新的patch。
4)针对高并发的场景(memory可用资源会变化),引入hybrid batch flushing(HBF),当内存不足时,不立即发送已有batch,而是保持在内存中,对后续data row,如果仍在batch group中,则继续更新这组的aggregation结果,否则直接发送出去。同时实时跟踪batch的reduction情况,如果reduction不好,则直接发送该batch,构造新batch,也可以类似LRU,替换batch中命中低的group,保留最频繁的group,达到好的reduction。
这样面对内存波动时,即使突然内存不足也不会立即停止GPD,而且保存batch观察一阵再决定。
5)判断是否开始调整GPD的时机,不再是之前的memory不足时,而是更早,当batch大小达到L2 cache size时,根据动态收集的reduction情况,如果效果好,则不做限制,否则进入HBF mode,不再增加占用内存,如果HBF后观察一段发现reduction效果始终不好,则停止GPD。
可以看到执行方式灵活了很多,有了很强的基于数据的自适应能力。
Handle Window Function
在很多分析场景中,window function被越来越多的使用,其基本概念可以参考MySQL手册
12.21.2 Window Function Concepts and Syntax
而且,目前大多数commercial optimizer和一些优秀的开源系统中,都提供了用window function做query transformation的能力,Oracle自然不例外,内部用wf做self-join elimination和subquery unnesting。(PolarDB MySQL在MySQL传统优化器上进行了增强,也实现了subquery unnesting to window function)
和MySQL类似,window function在Oracle中也是通过sorting来实现的,这里称为window sorting。根据其语义特点,wf被分为了3个类别:
reporting window function :
针对一个partition内做聚集计算,例如
SELECT /*y:year q:quarter m:month d:day*/ y, q, m, d, sales, SUM(sales) OVER (PBY y,q,m) msales, SUM(sales) OVER (PBY y,q) qsales, SUM(sales) OVER (PBY y) ysales FROM fact f;
- 原有并行方式
基于common partition by key做data distribution,在各个worker process中独立计算window sort。
其主要问题是,扩展性受限于partition key NDV限制,如果NDV < DoP,则会有些worker process没有事做。
- 新的并行方式:
加入了2种新的并行执行方式,都会将wf的并行变为多阶段模式。
1)extended partition key
由于执行是利用common partition key做分布,可以通过扩展加入更多partition key来增加NDV,从而增加并行度(也可以一定程度抵抗data skew),利用extended partition key做redistribution之后,在各个worker上则只有原partition的部分数据,但仍按原partition做window sort计算,得到局部聚集的结果。
上图中Window sort表示了基于部分数据聚集的过程,由于扩展了partition key,每个window sort进程只能看到属于一个partition的一部分聚集结果,这是不完整的,需要在上层的Window Consolidator获取全局结果,这个汇总过程分为2个步骤:
step.1 各个Window sort process将本地聚集结果broadcast到上层的window consolidator进程组中,consolidator processes各自计算全量的聚集结果,并构建hash table : partition key -> 聚集结果,注意这里是指发送聚集结果(partition key + 局部聚集结果),而不是全部表数据。
step.2 将原始data row,random分发到上层window consolidator中,由于每个consolidator进程都已经有了最终的结果,只要到hash table中look up一下,拼接上即可。
2)window pushdown
与extended partition key类似,只是各个worker process先在table scan的同时,做window sort计算,这时也仍然得到的是部分partition数据的local aggregation结果。
然后broadcast -> consolidate -> random 分发 -> lookup拼接最终结果,步骤相同。
新的并行方式增加了一次数据distribution ,和hash table lookup的成本,但可以扩展更高的并行度,从而获得更好性能。
具体采用original / extended partition key / window pushdown的哪一种策略,取决于优化器的estimation。如果原始partition key具有足够的NDV值,则采用原始并行方式,否则尝试扩展partition key并估算NDV,如果满足要求则采用extended partition key方式,如果扩展后,仍无法产生足够的NDV值,采用window pushdown 方式。
- 对optimizer error的处理
如果optimizer estimation不准确,可能会产生严重性能问题(worker空闲),因此需要执行中的自适应能力:
初始时仍根据NDV估计,决定做extended / pushdown,但后续执行中实时收集partition key的NDV,如果实际值足够大,则调整为原始的基于repartition key做data distribution的方式,这时原来的window sort步骤(extended/pushdown)变为pass-through,原来的window consolidator则完成实际的计算工作。
cumulative/ranking window function
类似MySQL window function中frame的概念,计算是在一个partition内的frame中进行的,frame定义了以当前行为基准的滑动窗口。
SELECT prod_id, date, sales, SUM(sales) OVER (PBY prod_id OBY date), RANK() OVER (PBY prod_id OBY sales) FROM fact f;
- 原有并行方式
Oracle在选择wf并行方式时有一条heuristic rule,将具有common partition key的wf放在一个slice内计算,这样,以上的2个wf的执行方式如下:
也是基于partition key做data distribution,在各个worker process中独立依次计算window sort,因此也有扩展性受限于NDV的问题。
- 新并行方式
和extended partition by的思路类似,现在有了现成的order by key,可以基于order by来扩展distribution key。决策也是在optimize阶段,基于NDV做出。以上述SQL为例,假设NDV(prod_id) < DoP,则计算NDV(prod_id, date)和NDV(prod_id, sales)。如果后两个满足DoP要求,则并行计划变为:
和reporting wf不同,这种wf要求输入数据的有序性,因此在第一步根据[prodid,date]进行partition时,需要做range partition!从而保证从全局来看,各个local window sort中的数据整体形成一个按[prod_id + date ]有序的序列,类似下图(假设dop = 4)
各个window sort进程中,数据按prod_id + date有序
为了后续的全局有序性,这里需要一个关键的同步过程:
1)每个window sort process先对负责的局部partition数据,计算first partition first row的结果 + last partition last row的结果,并发送给QC。
2)QC收集每个window sort的信息做consolidate,由于是range的,所以每个worker process的首/尾信息就足够,然后将合并之后的信息(如下图)发送回各个worker process。
3)每个worker process根据QC发回的每个worker process的首/尾信息,就可以汇总得到完整的partition聚集结果,以上图process 2为例,它收到的首/尾信息中包括了process1对于P1的汇总结果100和process 3/4中对于P2的汇总结果50/150,就能得到P1 , P2两个分组全局的聚集结果了。
paper中没有提到这种类型的wf怎么做adaptive的并行执行,应该思路和reporting wf类似,如果发现partition key的NDV足够大,就可以退回原始的执行方式,distribution采用partition by key做range partition,然后各个window sort直接计算整个partition结果,不需要QC参与。
Handle Join
SELECT t.year, t.quarter, f.sales FROM time_dim t, fact f WHERE t.time_key = f.time_key;
- 原始并行方式
通过optimizer对于Input两侧的cardinality estimation,决定是做hash-hash方式(数据量较大),还是broadcast-random方式(一侧数据量较少)。
但这种对于optimizer依赖较大,如果估计不准,比如高估了card,则会采用hash-hash,导致某些worker process工作量很小,如果低估了card,则采用broadcast-random,导致分发大量数据。
- 新并行方式
思路仍然是adaptive,在build侧加入"statistics collector"算子。这个算子会buffer住一定的rows(此时还没有分发+join),当rows数量到达一定阈值(2 * DOP )或input结束时,则通知QC数据量,QC根据各个worker process汇总的数据量,决定是做broadcast + random 还是hash + hash。
QC决策后会通知各个worker process,此后,statistics collector算子将变为pass-through,不起作用,这时就不需要依赖于optimizer estimation了。
之所以选择2* DOP的rows阈值,是为了让小表足够小,可以build cache-resident hash table做join。根据paper的介绍,目前自适应的方式是从hash - hash 调整为 broadcast - random,因为这样只需要去掉一个DFO,比较简单,反向调整则需要增加一个DFO,相对困难。(对比Figure 10/11)
Handle small tables
对于broadcast的方式,小表还是要发送数据的,而且随着DoP的升高,网络负载也成比例上涨,为了减少网络开销,利用Oracle RAC本身share disk的能力,将小表读入buffer pool并pin住禁止其换出,这样后续join可以在一个slice内完成,减少了一次网络分发和一组worker process。
Handle serialization point
在分布式计划中,可能会存在一些需要串行执行才能保证正确性的算子,称为serialization point,例如Top-N,UDF计算。。
- 原有处理方式
老版本中,对于这种serialization point,需要在QC上完成。这样有2个缺点
- 执行本身会消耗资源,因此这会影响QC调度整个并行计划
- 后续的operator即使可以并行,也只能在QC上串行执行,影响扩展性
- 新并行方式
新的并行方式引入了Back-to-parallel + Single-Server DFO的功能
1)Single-Server DFO:对于需要串行执行的算子,单独off-load到一个单process的DFO中执行,不影响QC。
2)Back-to-parallel: 如果serialization point之后的算子,optimizer估计可以做并行执行,则仍然分发数据到多个并行process中,后续仍可以并行执行。
总结
paper中讲解了Oracle优化后的并行执行策略,核心在于如何做自适应的调整 + 如何扩展并行度。但没有涉及太多细节,比如怎么在算子内buffer数据,具体如何调度,调整中的算法等。(吐槽一下Oracle的paper,历来只粗浅的描述原理,估计也是实现非常复杂难以展开)
在PolarDB多机并行(多RO节点,资源充足)执行环境下,paper中针对window function提高扩展性的思路还是很具有参考意义的。此外,PolarDB的并行执行目前还缺乏自适应调整的能力,这需要执行器具有统计feedback的能力。由于query feedback loop是查询优化器的一个重要组件,对于statistic维护,plan management,自适应计划调整都有重要意义,PolarDB优化器层已经在这个方面具有了一定能力,后续会不断完善。