这是MaxCompute有关SQL优化器原理的系列文章之一。我们会陆续推出SQL优化器有关优化规则和框架的其他文章。添加钉钉群“关系代数优化技术”(群号11719083)可以获取最新文章发布动态(二维码在文章末尾)。
本文主要描述MaxCompute优化器实现的Auto Hash Join的功能。
简介
在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 map 阶段直接扫描大表数据与内存中的小表数据进行匹配。Hash join执行方式效率很高,但是要求小表数据足够小以便放到内存中,假如小表数据太大,则任务在执行过程中会报OutOfMemory错误。
在MapCompute中,可以使用MapJoin关键字来实现Hash join,如下所示:
select /* + mapjoin(b) */ a.* from table1 a join table2 b on a.col1 = b.col2;
// b表为小表
但是这种通过使用hint的方式还是不够智能。另外对于query复杂的情况,用户很可能因为无法确定join的某一路数据量大小而放弃使用mapjoin。在最新的MaxCompute SQL 2.0中,基于代价的优化器(Cost Based Optimizer,CBO)包含了一个自动优化join为hash join的优化规则。
实现原理
在CBO中会对所有的operator的cost进行估计,这个cost包含rowcount、cpu、内存等等。有了各个operator的cost,就能估计其对应输出数据量的大小,公式可以简单的认为是: data_size = rowcount * averageRowSize
。有了dataSize之后,就可以很容易知道这个任务是否适合使用HashJoin,其判定方法就是计算各个parent operator的data size之和是否小于某个阈值。假如估算出的data size在阈值范围之内,则会产生一个包含HashJoin的计划。同时对于Join,CBO也会产生一个普通的包含MergeJoin的计划,最后在这两个计划中选择cost最小的作为最优计划。
简单说来,在CBO中是否选择HashJoin作为最优计划的步骤有两个:
- Step1:估算join的输入数据量大小,判定是否产生一个包含HashJoin的计划
- Step2:对比HashJoin、MergeJoin相关计划的cost,选择cost最小的计划作为最优计划
举例,对如下sql进行优化:
select t1.name from
(select dt_bad_linenum as name from bad_tpch_customer) t1
join
(select c_name from tpch_customer) t2
on t1.name = t2.c_name;
上述sql在CBO中会翻译生成如下operator tree:
OdpsLogicalProject(name=[$0]): rowcount = 9000000.0, cumulative cost = {48000008.0 rows, 39000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 5
LogicalJoin(condition=[EQ($0, $1)], joinType=[inner]): rowcount = 9000000.0, cumulative cost = {39000008.0 rows, 30000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 4
OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1
OdpsLogicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_file,dt_bad_linenum,dt_bad_msg,dt_bad_code,dt_bad_data(5) {0, 1, 2, 3, 4}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 5.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 0
OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
OdpsLogicalTableScan(table=[[tpch_100gb.tpch_customer, c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment(8) {0, 1, 2, 3, 4, 5, 6, 7}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 15000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 2
从上可以看到,join的parent operator有两个:
OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1
OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
其中id为1的project其输出记录数是4行,且其输出列只有1列(bad_tpch_customer表中有5列),估算其输出数据量,认为其适合使用HashJoin,因此其产生的计划中包含两种:
- 计划1:HashJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {28500024.88 rows, 28500013.222723687326862 cpu, 270001607.0 io, 496.0 memory, 378.0 network}, id = 109
OdpsPhysicalHashJoin(type=[INNER], equi=[[($0,$1)]], mainstream=[1]): rowcount = 3.24, cumulative cost = {28500021.64 rows, 28500013.222723687326862 cpu, 270001548.0 io, 496.0 memory, 378.0 network}, id = 108
OdpsPhysicalStreamlineRead(order=[[]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 0.0 network}, id = 106
OdpsPhysicalStreamlineWrite(shuffle=[broadcast], order=[[]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 105
OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 104
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 103
OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 102
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 107
OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 99
- 计划2:MergeJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {55500024.88 rows, 471791423.394757487326862 cpu, 756001229.0 io, 336.0 memory, 270459000360.0 network}, id = 104
OdpsPhysicalMergeJoin(type=[INNER], equi=[[($0,$1)]]): rowcount = 3.24, cumulative cost = {55500021.64 rows, 471791423.394757487326862 cpu, 756001170.0 io, 336.0 memory, 270459000360.0 network}, id = 103
OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 360.0 network}, id = 99
OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 98
OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 97
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 96
OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 95
OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 1.35E7, cumulative cost = {5.55E+7 rows, 458291406.5720338 cpu, 756000000.0 io, 18.0 memory, 270459000000.0 network}, id = 102
OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 1.35E7, cumulative cost = {4.20E+7 rows, 236645703.2860169 cpu, 513000000.0 io, 18.0 memory, 0.0 network}, id = 101
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 100
OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 92
比较上述两个计划的cost,明显计划1的cost更小,因此选择包含HashJoin的计划1作为最优计划。
总结
AutoHashJoin的一个很大的好处是能让用户免参与的进行这个优化,同时对于一些复杂的query也更有可能使用HashJoin。但是,因为CBO无法完美估计数据量,会出现误判从而导致任务OOM的情况。针对这种情况,MaxCompute也进行了相应的调整,对于CBO误判导致HashJoin OOM的任务会关闭HashJoin rule来重试。
目前CBO中使用HashJoin的阈值比较保守,默认是25MB。主要原因是CBO对于数据量的估计有偏差,无法完美估计数据量,而估计不准的原因有两个:
- 数据是压缩存储的,CBO拿到的statistics不准
- CBO的估计算法有偏差
这两个问题也是CBO致力解决的问题。