Incorporating Partitioning and Parallel Plans into the SCOPE optimizer

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 这篇paper中讨论是的Microsoft的cosmos DB,其本身是一个海量数据的大规模计算平台,有些类似hadoop,使用的是一种类SQL的脚本,叫做SCOPE,针对SCOPE的优化器负责生成最优的执行计划。在1998年前后Microsoft基本丢弃了Sybase原有的优化器实现,并由Graefe主导重写了基于cascades的优化器。因此和Microsoft所有其他的数据库产品一样,SCOPE optimizer也是基于Cascades的transformation-based的优化器。

这篇paper中讨论是的Microsoft的cosmos DB,其本身是一个海量数据的大规模计算平台,有些类似hadoop,使用的是一种类SQL的脚本,叫做SCOPE,针对SCOPE的优化器负责生成最优的执行计划。在1998年前后Microsoft基本丢弃了Sybase原有的优化器实现,并由Graefe主导重写了基于cascades的优化器。因此和Microsoft所有其他的数据库产品一样,SCOPE optimizer也是基于Cascades的transformation-based的优化器。

本paper介绍的是如何在SCOPE的优化过程中,无缝接入对于并行计划的考虑,同时利用functional dependency +等价列等概念,利用partitioning/grouping/ordering等信息尽量减少/避免分区,排序,分组等操作,提高执行效率。PolarDB的并行优化方案中,在属性统一描述 + 兼容性判断 + 属性推导等方面,参考了这篇paper的思路。

综述

概略来说,它扩展了cascades中property的概念,把partitioning/grouping/sorting,用一个统一的structural property来进行描述,并利用属性的兼容性来生成更高效的并行plan。例如下面这个简单的例子:

image.png

对于上图中的SQL query,最基础直观的并行执行方式如(a)所示,两个分区表先通过在join key上的repartition操作,形成co-locate join,并行join后根据后续group by key做二次repartition,完成aggregation的并行计算。这是一个合理的plan,但由于有多次的repartition而不那么高效。但如果基于R.a = S.a这个join条件,只在R.a / S.a上做第一轮repartition,且存在这样的functional dependency: R.c -> R.a(R.c是R的主键),则并行join后的结果也在R.c上分布,可以省去第二次repartition直接完成join。

可以看到通过利用FD等来推导分布的兼容性,就可以生成更简化的plan。

这篇paper就是以一种系统化的方式,统一且规范的描述了partitioning/grouping/sorting三方面的属性极其兼容性推导,然后融合到cascades的optimization rules中。此外,还引入了一系列利用FD / 数据约束的inference rules

并行plan + exchange算子

data exchange operator是一个描述数据重新分布的算子,在并行/MPP环境下,exchange是保证数据并行处理的基本逻辑操作,具体实现中,exchange包含发(partition) + 收(merge)两个操作,在多台机器上同时执行。

分发拓扑

image.png

这里主要考虑Initial Partitioning(1->n) , Full Repartitioning(m -> n), Full Merge (n -> 1) 这三种

Partitioning 方案

一个公共前提是,Paritition本身是FIFO的方式,所以如果原来前后的两个tuple r1, r2,在分区之后如果还在一个分区内,则仍然保持这种前后关系。

  1. Hash Partitioning:基于partition key的hash value进行分发,partition之间无序,且不保证数据的均匀。
  2. Range Partitioning: 将partition key的domain分成若干不相交的range,partition之间整体有序,也不保证数据均匀。
  3. Non-deterministic Partitioning: 例如Round-robin/random分发,可以较好应对data skew。
  4. Broadcast: 将全量数据分发到所有目标节点,适合数据量较小的场景。

Merging方案

将来自多个数据源的分发数据,汇总到单一节点。

  1. Random Merge: 从任一Input上获取数据,各input内部的顺序可以保留,各input之间的顺序无法保留。
  2. Sort Merge: 只有当各输入,在sort列上各自有序时,输出可以保证全体有序,使用例如多路归并的排序算法。
  3. Concat Merge: 一个input一个input的处理,各个input内部顺序可以保留,各input之间无法保证。
  4. Sort Concat Merge: 先确定各个input之间的顺序,然后按序对各input做处理,各个input内部顺序可以保留,input之间也可以全局有序。

optimizer中利用property

在框架中,query expression用来描述某个特定的算子子树,其中的算子包括physical / logical operator,logical operator描述算子的操作类型,而physical operator则确定算子使用的物理算法。优化过程分为2种特定操作,Logical exploration和physical optimization。logical exploration应用transformation rules生成新的logical expressions,而physical optimization应用implementation rules将logical operator转换为physical operator。

如下算法描述了给定一个初始query expression以及对最终输出的property requirements后,超级简化的递归优化过程:

image.png

上图中有下划线的几个函数是优化中和physical property最为相关的几个操作:

Determining child required properties

parent物理算子会对当前算子的输出施加某种property requirement,这个req必须被满足。例如如果parent是使用ordered group by的计算方式,会要求当前算子在group by key上具有有序性。同时当前算子的物理实现也会对其input children算子的输出提出某种property requirement,也同样需要被满足。

这个函数就是用来决定children的property requirement,它由父算子对当前算子的requirment和当前算子的物理实现决定。

Deriving delivered properties

这个函数根据输入数据的物理属性和当前算子的物理实现算法,推导出其输出数据的物理属性。

Property matching

一旦输出物理属性被推导出来,就需要判断其与当前算子的property requirement是否match,如果无法match则当前plan就是无效的,需要被丢弃。

注意所谓match并不要求完全一致,这里有一定的兼容性规则,后面会具体说明。

规范化描述

Functional dependency / 约束 / 等价性

FD的含义是: 一组column set R与一组column set S,如果对其中任意两个tuple,其在R上的值相同,则在S上的值一定相同,则R -> S。

FD可能来源于几个方面:

  1. R -> R’ ,只要R’是R的子集
  2. key约束,一个relation的主键可以FD决定relation的所有其他列
  3. 等值谓词 col1 = col2 ,意味着 col1 -> col2 并且 col2 -> col1
  4. 等值常量 col1 = const,意味着 image.png
  5. grouping column,在做完group by后,grouping column成为结果的key

column等价类的含义在之前的文章 已经提及了,如果对于一个relation中的所有tuple,在某些column set上都具有相同的值,则这组column set构成column等价类,等价类中也可以包含常量,这和MySQL中的MEP概念一致。

structural property

用一个统一结构来描述partitioning ,grouping, sorting这三方面的物理属性,属性根据其作用域分为了2个类别:parititioning是全局属性,描述全局的数据如何分布;grouping/sorting则是局部属性,描述了每个partition内部,数据的物理特性。(paper中使用了很多复杂的数学符号,其实概念是很简单的,为了便于说明这里就不一一列举了,只是口述下概念)

因此是property是从global/local两个维度,综合起来考虑。其中grouping是一个列的集合,分组列之间没有顺序要求,sorting则是一个列的列表,列之间的前后顺序不可变。

  1. 局部的property使用一组固定顺序的Action序列 {A1, A2 ... Am} 来描述,表示一个Action一个Action的进行操作,每个action都是基于前面action序列的结果进行操作,不破坏前面序列的属性:比如A1是分组{C1, C2},表示在C1,C2上分组,A2是排序C3,表示在每个C1,C2的分组内,进行排序。其中每个action要么是针对一组column做分组操作,要么针对单个column做排序操作。
  2. 全局的property描述partitioning方式,主要包含2种:ordered / non-ordered,non-ordered partition只能保证,在partition column上具有相同value的tuple会在同一个partition中;而ordered partition还可以额外保证,不同partition会覆盖disjoint的key range且partition间有序,也就是说,一个partition内tuple全部都小于另一个partition中的tuple。

把以上2个方面结合起来就构成了对structural property的规范描述:

image.png

前面是分区操作,其中 image.png表示有序/无序的全局分区,后面则是一系列操作序列,用来构建分区内局部属性。而相关处理就在这2个正交的维度上,各自独立进行。

例如一个relation有C1, C2, C3列以及结构化属性{ image.png },也就是说在C1上分区,而在每个分区内,数据首先在{C1, C2} 列上分组,而在每个分组内,按C3列有序。如下数据

image.png

Inference Rules

Rule 1. 局部属性可以从尾部truncate掉,这个很容易理解,因为每个action总是基于前序的action来操作,因此前序部分总可以被保留。

Rule 2. 全局属性可以expand,如果数据在C1列上做partition,它同样在{C1, C2}列上做partition,因为具有相同(c1,c2)组合值的元素具有相同c1值,因此必然在同一分区。

image.png

更广泛来说,在C1…Cm上分布,可以推导出在C1…Cm,Cm+1分布。

Rule 3. 在C列上有序,可以推导出在C列上分组,反过来不成立

Rule 4. 利用FD,可以尽量化简grouping列或者order列

image.png

对于group列集合,如果其中一些列可以functional的决定其他一些列,被决定的列可以从group列集合中去除。这里没有对列的顺序要求。

image.png

对于order列的集合,如果Cj的前缀部分可以functional的决定Cj,则Cj可以从order列集合中去除。注意这里是有列的顺序要求的,必须是前缀。

Rule 5. 对于local属性,还有一种化简方式

image.png

如果A1…Ai-1的操作序列所影响的列,可以完全决定Ai操作所影响的列,则Ai操作是无用的,可以去掉。

生成structural property

Partition operator产生的property

前面已经提到,Partition本身是FIFO的方式,因此任一个分区内,tuple的顺序在partition前后是不会改变的,也就是partition只会影响全局属性,不会影响局部属性。exchange的output property会集成input property的局部属性,而改变其全局属性。

image.png

  1. 如果做hash distribution,则输出在partition列上呈现分组特定(相同数据位于同一分区)。
  2. 如果做range partition,则partition间整体有序,即ordered partition。
  3. 如果是随机分发,则 image.png表示没有特定属性
  4. Broadcast时, image.png 表示所有数据重复

Merge operator产生的property

Merge操作会产生针对多个输入产生单一输出,其局部属性取决于input的局部属性和merge操作的类型。

image.png

  1. 对于non-ordered partitioning输入

random merge无法保留任何局部属性

Sort merge当输入的局部属性的有序列和全局的排序列一致,可以保证全局有序,否则无序

Concat merge当输入的局部属性已经在分区列上进行了分组,concat之后这个分组可以保留

Sort Concat merge同上

2. 对于ordered partitioning (range)输入:

random merge无法保留任何局部属性

Sort merge当输入的局部属性的有序列和全局的排序列一致,可以保证全局有序,否则无序

Concat merge当输入的局部属性已经在分区列上进行了分组,concat之后这个分组可以保留

Sort Concat merge,如果局部属性的排序列就是全局的分区列,则可以保证全局有序

Repartition operator产生的property

repartition就是partition + merge的组合操作,是一个完整的exchange算子实现,因此其产生的属性也是前面2项的各种组合,如下图所示

image.png

生成property requirement

不同的算子的不同物理实现算法,会根据算子本身是串行或并行执行,对其输入的数据流的 property有不同的要求,列举在下图中

image.png

从上图可以看出,table scan/select/project对于输入无要求,主要是order by/group by/join。

串行输入

  1. hash group by对输入无要求
  2. stream group by要求输入在分组列上已完成具有分组属性
  3. NL/HS join对输入无要求
  4. MergeJoin 要求输入在join列上有序

并行输入

  1. hash group by要求输入的分区列是分组列的子集或相同。
  2. streaming group by除了这个要求外,还要求各个局部输入在分组列上已具有分组属性。
  3. NL/HS join要求输入的分区列,是join列的子集,且两侧的输入列要对应。
  4. MergeJoin 除了以上要求,还要求两侧的输入,在各个分区内按照join列有序

Property matching

有了output property和property requirement,下面就是两者之间如何匹配了。在评估output和requirement时,可以在global/local两个正交维度分别比较即可,方法类似于DB2中的order optimization的思路:

  1. 利用FD + 等价列,转换为最简且统一的normalized形式,具体就是首先用等价列中的head来统一替换其他列。
  2. 利用前面提到的各种推断规则,进行转换(truncate/expand/Co->Cg)和推导,判断是否可以从输出property => 要求property,如果可以,则说明两者match。

举个例子更容易说明白:

输出属性是

image.png

属性要求是

image.png

给定的FD是 { C6, C2 } -> { C3 },此外有两个等价列 { "C1", C6 }, { "C2", C7 },引号内的是head列。

先用等价列替换,变为

image.png

可以看到,这里不仅替换了属性描述本身,也替换了FD中的列信息!

然后应用{ C1, C2 } -> { C3 }这个FD,输出属性变为

image.png

先看全局分区属性,由于其可以expand,因此{C2,C1} => {C1,C2,C4}。

再看局部属性,由于其可以truncate,可以将尾部的C5排序去除掉,因此得到 image.png,同时由于inference rule 3,有序可以得到分组,因此得到 image.png

全局和局部属性都可以match,因此requirement可以被output满足。

Enforcer rules

在有了这些Property/Rules/Matching等之后,在Cascades的优化过程中,去集成parallel/serial的plan选项,就比较直接了:

  • 在枚举每个物理算子实现时,既要枚举串行的实现,也要枚举不同的并行实现,不同的并行方式,可以产生不同的输出property,也会对输入有不同的property要求
  • 利用enforcer rules,引入不同的enforcer (sort/exchange),并进行requirement的传导

enforcer rules包括对sorting/grouping/partition不同属性的处理,但思路都是一样的:

  1. 如果当前operator的物理实现,可以产生要求的property,则直接枚举该operator,然后递归到其child上,对child的要求就是该物理实现对于input的要求
  2. 如果当前operator的物理实现,只能维持输入的property,则枚举该operator,并递归到其child上,对child的要求,除了物理实现本身对于input的要求,还有透传下来的上层要求
  3. 如果当前operator的物理实现,无法产生要求的property,则枚举该operator,并在其上加入一个enforcer!这样改变了对该operator的输出要求,重新对该operator进行判断

以上过程其实就是cascades framework的处理流程的一部分,所有这些选择都要枚举到,然后基于cost选择最优。不过由于选项太多,会有一些heuristic,比如不同rules之间的优先级,从而做一些pruning。

总结

这篇paper提出了这种统一的property处理框架,并集成到了cascades的优化流程中。

核心要素就是物理属性的推断规则 + 算子输出属性的生成 + 对输入属性的要求 + match推导。PolarDB的并行优化器参考了cascades的设计思路,因此也具备多属性的统一描述结构,其处理方式很多都参考了这篇paper,不过目前还没有这么完善的推断规则,后续有持续改进的空间。

目录
相关文章
|
分布式计算 Java Spark
Optimizing Spark job parameters
Optimizing Spark job parameters
74 0
|
存储 Shell
Understanding parameters:理解参数(Parameter)
Understanding parameters:理解参数(Parameter)
90 0
|
分布式计算 Apache Spark
|
算法 数据挖掘 开发者
Partitioning Methods|学习笔记(二)
快速学习 Partitioning Methods
89 0
Partitioning Methods|学习笔记(二)
|
算法 数据挖掘 开发者
Partitioning Methods|学习笔记(一)
快速学习 Partitioning Methods
163 0
Partitioning  Methods|学习笔记(一)
|
关系型数据库 MySQL
|
MySQL 关系型数据库