TPC-H作为一个OLAP的基准测试,相关的优化文章很多[1,2]。TPCH 100GB测试集在6种不同公有云实例规格下的性能数据:
规格 | 8c32g * 2 | 8c32g * 4 | 16c64g * 2 | 16c64g * 3 | 16c64g * 4 | 16c64g * 6 |
总耗时(秒) | 99.82 | 48.55 | 50.31 | 33.00 | 25.54 | 20.56 |
TPCH 1TB测试集在3种不同公有云实例规格下的性能数据:
规格 | 3*16C128GB | 4*16C128GB | 6*16C128GB |
总耗时(秒) | 508.76 | 408.45 | 233.93 |
在之前的文章中,我们介绍了PolarDB-X列存查询引擎[3]。尽管追求TPC-H跑分并非最终目标,但成绩过于不理想也难以接受。在调研过程中我们发现,各大数据库厂商通常会在其官方网站上公布测试步骤和结果,但很少详细介绍执行计划。大概理由也很简单:对于优化器来说,尤其是join reorder,TPC-H被公认为一个相对简单的基准测试[4]。
这个现状导致我们不得不花了大量时间在各个数据库上重复搭建数据库环境、构造TPC-H数据、导入TPC-H数据、分析TPC-H执行计划、出具调研报告的过程。为了减少大家的工作量,通过本文,我们从官方视角解读PolarDB-X的TPC-H 1T列存执行计划。 需要说明的点
- 执行计划基于polardb-2.4.0_5.4.19-20240527_xcluster5.4.19-20240527 explain analyze的结果,对应的执行计划文本均已开源[5],后续版本可能会变化,以开源文本为准。
- 图中的数字为相应算子的输出行数。有两行数字的,第二行为runtime filter过滤掉的行数。
- 除非标明reverse,hash join默认都是右孩子会建哈希表,左孩子做探测。
- 为了方便展示,省略了图中的project算子。
- TPC-H通常会有order by limit,本身没有什么计算量,图中树根处会简化成一个算子。
1. 前置知识
1.1 优化器
PolarDB-X的优化器[6]包含RBO、CBO,其中CBO是标准的Cascade style Top-Down优化器;基数估计是传统的基于独立性假设的估计模型;统计信息[7]包括直方图、TopN、NDV。当前版本的优化器架构为
PolarDB-X中的列存是索引的形态,在优化器中会以索引选择的方式进行列存索引的自动使用。当CBO优化出的最优单机物理执行计划B为AP查询且可以走列存索引时,会进入列存优化器:
- 将解析器解析出逻辑执行计划A中的行存表替换为列存索引。
- 列存RBO进行应用子查询消除、条件推导、列裁剪等规则优化执行计划。
- 列存CBO进行逻辑变换、物理变换,并基于代价产生最优分布式执行计划。
列存优化器与MPP优化器都会产生分布式执行计划,不同点在于MPP优化器在物理执行计划B的基础上插入shuffle算子产生分布式执行计划,而列存优化器基于逻辑执行计划A直接产生最优分布式执行计划。
MPP优化器基于执行计划B进行优化的原因是行存查询性能的决定性因素是DN,数据分布并不重要。同时与单机执行计划保持同样的执行路径,可以避免因执行计划差异导致MPP执行比单机执行更慢。
列存优化器基于执行计划A进行优化的原因是数据重分布作为列存查询速度的决定性因素,必须在join reorder进行考虑。
1.2 分布式执行计划
PolarDB-X的计算节点是MPP架构,执行器会将同样的数据调度到同一个计算节点处理,方便复用缓存。调度的粒度可以是文件也可以是分区。对于分区级的调度,分区键相同的数据必然落到同一个计算节点,这就是所谓的partition wise。这种调度与表的分区方式相关,可以作为表的partition wise属性被优化器感知,并利用其最小化数据的重分布。
每个分区被分配到一个计算节点,这是经典的Balls into Bins问题。为了降低maximum load,在随机调度不均时会重新采用two choices的做法[8],用两倍的空间换取指数级的倾斜率下降。
1.3 Join
PolarDB-X的列存CBO会穷举所有可能的join顺序[9],同时提供semijoin-join交换、agg-join交换等能力,确保join order合理。由于列存CBO的搜索空间为单机CBO✖️MPP CBO,为了避免优化器过于耗时,超过10张表时不会做join reorder。 Runtime filter对于Join的性能提升显著。
PolarDB-X的runtime filter并不是优化器产生的,代价模型也不会考虑runtime filter的过滤效果。
runtime filter是由执行器自适应构建的,所以只有真实执行才能知道是否使用了runtime filter,explain analyze时才会无法展示。
1.4 物理算子
对于Join算子,PolarDB-X支持Hash Join、Sort Merge Join、BKA Join和Nested loop Join。行存优化器会选择上述算子中代价最低的;列存优化器则会优先选择Hash Join,无法使用Hash Join就使用Nested loop Join。
对于Agg算子,PolarDB-X支持Hash Agg和Sort Agg。行存优化器会选择上述算子中代价最低的;列存优化器则只会用Hash Agg。
对于Window算子,PolarDB-X支持Hash Window和Sort Window。
Hash window无法覆盖所有的场景,所以列存优化器与行存优化器一样会在Hash Window和Sort Window中选择代价更低的物理算子。
2. TPC-H优化点
所有的优化点以及对应的查询罗列如下
优化点 | 可应用的查询 |
partition wise | Q2、Q3、Q4、Q9、Q10、Q12、Q13、Q14、Q16、Q18、Q21 |
列裁剪 | Q1-Q22 |
常量折叠 | Q1、Q4、Q5、Q6、Q14、Q15、Q19、Q20 |
两阶段agg | Q1、Q4、Q5、Q6、Q7、Q8、Q9、Q11、Q12、Q13、Q14、Q15、Q16、Q17、Q19、Q21、Q22 |
基数估计 | Q9、Q13 |
Reverse semi hash join | Q4、Q21 |
Reverse anti hash join | Q21、Q22 |
Join-Agg转Join-Agg-Semijoin | Q20 |
Group join | Q13 |
Hash window | Q17 |
2.1 partition wise
以Q3为例,lineitem按l_orderkey分片,orders按o_orderkey分片,customer按c_custkey分片。
select 1 from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority
对于orders join customer,将customer做广播,orders join customer可以保持o_orderkey的分片属性。进一步 lineitem join (orders join customer),lineitem有l_orderkey的分片属性,orders join customer有o_orderkey的分片属性,可以做partition wise join,并保持l_orderkey的分片属性。最后的group by l_orderkey, o_orderdate, o_shippriority,由于输入保持l_orderkey的分片属性,可以做partition wise agg。
PolarDB-X的broadcast join是基于Calcite的passThroughTrait实现的,例如广播右表,会将join以及左孩子的分布属性设置为any,右孩子的分布属性设置为broadcast。
void columnarBroadCastRight(Join join, RelNode leftChild, RelNode rightChild) { join.Distribution = ANY; leftChild.Distribution = ANY; rightChild.Distribution = BROADCAST; return; }
若右孩子的分布属性为broadcast,passThroughTrait会将需要的join分布属性传递到左孩子,从而一步步向下传递直到列存索引上。
void passThroughTrait( Distribution required, Join join, RelNode leftChild, RelNode rightChild) { RelDistribution distribution = required.Distribution; if (rightChild.Distribution == BROADCAST) { for (int key : required.getKeys()) { if (key >= leftInputFieldCount) { return; } } join.Distribution = required; leftChild.Distribution = required; } }
除了broadcast,也可以用exchange。例如Q3,将orders按o_custkey做exchange来获得o_custkey的分片属性,customer有c_custkey的分片属性,orders join customer可以做partition wise join。接着将orders join customer按o_orderkey做exchange,lineitem join (orders join customer)可以做partition wise join。
由于这个执行计划代价需要exchange两次,代价高于上述broadcast的计划,CBO并没有采用。
columnarBroadCastRight与passThroughTrait共同作用的问题是broadcast join可能PassThrough所有输出的列,导致搜索空间急剧膨胀,所以必须尽早裁剪掉不需要的broadcast join。
裁剪逻辑如下:对于A join B,A join broadcast(B)必然可以表示为shuffle(shuffle(A) join shuffle(B),所以若broadcast(B)的代价大于shuffle(A)+shuffle(B)+shuffle(A join B)的代价,就可以避免优化器产生冗余的A join broadcast(B),因此不再需要PassThrough A的任何一列。
2.2 列裁剪
为了降低大规模数据处理任务中的shuffle成本,需要在shuffle操作之前通过project算子对列进行裁剪,以减少冗余列。join操作后通常会引入冗余列,增加shuffle的数据量,从而导致性能下降。shuffle算子的NET成本公式被设计为NET_COST = rowSize * rowCount / NET_BUFFER_SIZE,其中rowSize指的是每行的数据量。显然,较大的rowSize直接增加了shuffle的代价,CBO会自然选择经过列裁剪的执行计划。
public RelOptCost computeSelfCost() { double rowCount = getRowCount(); long rowSize = estimateRowSize(getInput().getRowType()); if (distribution == BROADCAST) { rowCount = rowCount * parallelism; } cpuCost=...; netCost = rowSize * rowCount / NET_BUFFER_SIZE); return costFactory.makeCost(rowCount, cpuCost, 0, 0, Math.ceil(netCost); }
列裁剪的实现要点有三个:
- 列存RBO将project算子尽量下推。
- 列存CBO进行逻辑算子变换时保留project算子。
- 代价模型考虑shuffle时的列宽。
2.3 常量折叠
常量折叠就是优化器阶段将可以直接计算的常量提前计算出来,例如Q1的过滤条件l_shipdate <= '1998-12-01' - interval '118' day会被优化器转成l_shipdate <= '1998-08-05'。
由于常量折叠只会应用一次,对性能没有要求,直接调用标量函数的计算接口即可。 需要注意PolarDB-X的解析器会将常量进行参数化,方便plan cache进行匹配。Q1的过滤条件被参数化成l_shipdate <= ? - interval ? day,常量折叠成l_shipdate <= '1998-08-05'后执行计划无法复用,因此常量折叠与plan cache不兼容。默认情况下列存plan cache关闭,常量折叠打开。若打开了列存plan cache,常量折叠会自动关闭。
2.4 两阶段agg
Q1的group by l_returnflag, l_linestatus需要拆成两阶段,避免大量的数据shuffle。先做分片级的partial agg group by l_returnflag, l_linestatus,再做global agg group by l_returnflag, l_linestatus,shuffle数据量从58.5亿降低到了576行。但如果agg本身产生的结果很多,两阶段agg有可能无法减少数据的shuffle量,反而引入了冗余的agg计算,所以需要通过基数估计确定两阶段agg是否有收益。 由于partial agg是分片级的,会有结果集放大的问题,基数估计的公式需要做下调整:rows为输入的总行数,bins为agg的rowcount估算。考虑每个分片的情况,每个分片上有 ⌈rowsshard⌉
行数据,共有bins个不同的分组,这也是Balls into Bins,rowcount估算为
bins×[1−(1−1bins)⌈rowsshard⌉]×shard
2.5 基数估计
对于x join y on x.a = y.b and x.c = y.d,基数估计计算公式为
x.rows∗y.rows/max(max(ndv(x.a),ndv(y.b)),max(ndv(x.c),ndv(y.d)))
。Q9中的join条件 ps_suppkey = l_suppkey and ps_partkey = l_partkey涉及到partsupp的复合主键,需要在join的基数估计时改为
x.rows∗y.rows/max(ndv(x.a,x.c),ndv(y.b,y.d))
。 Q9的过滤条件p_name like '%goldenrod%',目前没有专门的处理,采用heuristic,过滤性设置为5%。 Q13的group by c_count,c_count是count(o_orderkey)group by c_custkey的结果,无法估计ndv,采用heuristic,ndv设置为100。
2.6 Reverse semi hash join
Q4中exists子查询转成semi join后变成了orders semi join lineitem。 用orders semi hash join lineitem,流程为
- 用lineitem构建38亿行的哈希表。
- orders的5700万数据探查哈希表,输出匹配的行,共5200万。
用orders reverse semi hash join lineitem,流程为
- 用orders构建5700万行哈希表。
- lineitem的38亿行数据探查哈希表,输出未被标记的哈希表记录并标记其已被匹配,共5200万。
由于探查哈希表效率远高于构建哈希表,当前场景下reverse semi hash join效率更高。
对于 B semi hash join A ,显然可以使用A构建runtime filter过滤B的元素。
对于 B reverse semi hash join A ,也可以利用B构建runtime filter过滤A的元素。
对此给出证明。 证明 设A用runtime filter过滤后的结果集为C,要证明
B reverse semi hash join A⇔B reverse semi hash join C
,只需要证明 B∩A⇔B∩C 。 由于 C⊂A ,反向显然成立。 考虑正向, ∀x,x∈B∩A ,由runtime filter的性质, x∈B⇒filterB(x)=true ,所以 x∈C ,即 ∀x,x∈B∩A⇒x∈B∩C 。
引入runtime filter的orders reverse semi hash join lineitem的流程为
- 用orders构建5700万行哈希表。
- 用orders的runtime filter过滤lineitem,得到2.4亿行。
- lineitem的2.4亿行数据探查哈希表,输出未被标记的哈希表记录并标记其已被匹配,共输出5200万。
由于runtime filter探查效率远高于探查哈希表,当前场景下引入runtime filter后效率更高。
2.7 Reverse anti hash join
Q21中not exists子查询转成anti join,变成了xx anti join lineitem,这里的xx为linitem、supplier、nation三表join的结果。 用xx anti hash join lineitem,流程为
- 用lineitem构建38亿行的哈希表。
- xx的1.5亿行数据探查哈希表并输出不匹配的数据,共1350万行。
用xx reverse anti hash join lineitem,流程为
- 用xx构建1.5亿行的哈希表。
- lineitem的38亿数据探查哈希表并标记匹配。
- 遍历哈希表最后输出哈希表中未被标记的数据,共1350万行。
由于探查哈希表效率远高于构建哈希表,当前场景下reverse anti hash join效率更高。
对于 B reverse anti hash join A ,可以利用B的runtime filter过滤A的元素。对此给出证明。 证明 设过滤后的结果集为C,要证明 B reverse anti hash join A⇔B reverse anti hash join C ,只需要证明
B∖A⇔B∖C 。 由于 C⊂A ,正向显然成立。 考虑反向, 且∀x,x∈B∖C,x∈B且x∉C , 由runtime filter的性质, x∈B⇒filterB(x)=true , 或x∉C⇔x∉A或filterB(x)=false ,即 且或x∈B∖C⇒filterB(x)=true且(x∉A或filterB(x)=false)⇒x∉A , ∀x,x∈B∖C⇒x∈B∖A 。
引入runtime filter的xx reverse anti hash join lineitem的流程为
- 用xx构建1.5亿行的哈希表。
- 用xx的runtime filter过滤lineitem,得到5.7亿行。
- lineitem的5.7亿数据探查哈希表并标记匹配。
- 遍历哈希表最后输出哈希表中未被标记的数据,共1350万行。
由于runtime filter探查效率远高于探查哈希表,当前场景下引入runtime filter后效率更高。
2.8 Join-Agg转Join-Agg-SemiJoin
目前,PolarDB-X的runtime filter在join的probe端无法穿透shuffle操作。这一限制对Q20查询的性能带来了负面影响:在lineitem表上的agg计算出大量冗余结果,在agg之前使用runtime filter将其筛选掉可以提升性能。然而,lineitem表和agg操作之间的shuffle阻碍了runtime filter的下推。为了解决此问题,我们引入了一个CBO规则:将join的右孩子节点复制,并与agg的输入进行semi join,从而起到runtime filter的作用。
2.9 Group join
group join限制过于严苛,且可能有负面作用,导致实际应用并不多,具体可参考这篇文章[10]。PolarDB-X当前对group join限制为
- 只支持等值join。
- 对于inner join、left join,group by的列完全匹配左孩子的join key,agg运算的列属于右孩子。
- 对于right join,group by的列完全匹配右孩子的join key,agg运算的列属于左孩子。
- group by的列全局唯一。
全局唯一的限制主要是为了避免group join导致中间结果集放大的问题。对于Q13,可以应用group join。
select c_custkey, count(o_orderkey) as c_count from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%packages%' group by c_custkey
group join的执行流程
- 对customer表用c_custkey建哈希表。
- orders的数据探查哈希表并计算count(o_orderkey)。
- 遍历哈希表,用null填充未匹配记录的agg结果,输出agg结果。
2.10 Hash window
将Q17的子查询转window节省一次lineitem的扫描,进一步可以使用hash window。由于并行度的区别,PolarDB-X上的hash window比sort window更高效。window转hash window需要满足的三个条件为
- Unbounded window。
- 没有order by。
- 不包含window agg,如ROW_NUMBER、RANK。
3. 总结
本文从官方的角度逐条解析PolarDB-X在TPC-H列存执行计划的设计要点。这些要点不仅包含了各项优化的原理,还提供了相关的证明与代码实现,希望帮助读者更深入地理解PolarDB-X的列存优化器。 在下一期中,我们将探讨PolarDB-X的行列混合执行计划。
4. 引用
[1] 链接1
[2] 链接2
[3] 链接3
[4] 链接4
[5] 链接5
[6] 链接6
[7] 链接7
[8] 链接8
[9] 链接9
[10] 链接10
5. 附录
TPCH执行计划汇总 Q1
Q2
Q3
Q4
Q5
Q6
Q7
Q8
Q9
Q10
Q11
Q12
Q13
Q14
Q15
Q16
Q17
Q18
Q19
Q20
Q21
Q22
作者:升雨