作者:越寒
简介
背景
Hash join是解决复杂join的一个重要手段,但其无法避免拉取左右两端的数据到计算层进行计算,导致某些场景下执行效率不高。作为一种补充,bka join则可以利用OLTP数据库中的索引,通过join构造inner表的predicate命中表索引,在某些场景下有比较好的join效率。PolarDB-X是面向HTAP设计的分布式数据库,在复杂查询时也会重点考虑利用数据库的索引信息来提升join的查询效率,因此有了本文的semi bka join。
要解决的问题
在这篇源码解读中,我们要解决的是类似这样一条SQL的执行效率问题,select * from t1 where c1 in (select c1 from t2);其中t2为一张大表,t2表的c2列上有索引,t1为一张小表,且匹配后数据量很小。
Q:这个问题重要么?
A:其实还挺重要的,这代表了一类经典的TP型SQL,客户用的也比较多。
经典执行模式的不足
比如hash join,sort merge join,nested loop join,都需要把两张表的数据都拉取到计算节点,而t2是一张大表,导致执行效率不高。
解决方案
避免拉取t2这张大表的数据,引入semi bka join的执行模式,后面会详细展开。
讲述方式
为了避免大家在阅读源码时迷失在大量的细节之中,我们会用一种重构的形式来逐步构建semi bka join。同时,有些内容对于理解该部分设计,可能并没有太大的作用,比如代码中对于新分区表与老分区表的if eles判断,因此在重构中删除了该部分;最后,我们对于一些逻辑进行了重构,以便能够更加清晰的进行讲述。 综上,感兴趣的朋友可以参考我在https://github.com/wcf2333/galaxysql.git的代码提交记录,结合本篇文章阅读,体验会更好。
在分支refactor_semi_join分支上,移除了对于全局二级索引和新分区表的支持,精简了执行器的一些处理;
在wcf2333_build_semi_bka_join分支上,基于refactor_semi_join分支,首先移除了semi bka join的实现,然后逐步进行了丰富和实现,步骤可见提交记录,如下。
remove semi bka join and materialized semi join
add the simplest optimizer rule for semi bka join
add the simplest executor for semi bka join
support multi column in lookup predicate
enhance optimizer
support stream lookup join
support dynamic pruning
support single sharding key when table rule is not simple
support lookup predicate with multi-column when pruning
注:本文中bka(batch key access)和lookup表达相同的含义
目标
在写这篇文章时,是期望大家可以在学到一些东西之后,能够在一个暂时没有实现该功能的系统上真正实现一个semi bka join。它足够鲁棒,包含大量的细节,理想情况下足以在生产环境中使用。这也是为什么会花挺大力气采用重构的方式来写这篇文章的原因。 显然,一篇文章显然是不足以实现上述目标的,所以不出意外的话陆陆续续还会有几篇,我们期望回答所有重要的细节。因此,如果大家有什么问题的话,欢迎提问,无论是留言还是直接联系我,我们会在后续的文章中把大家的问题都囊括其中。 提醒:我们会在本篇中介绍一些细节,有些细节不进行debug可能不太容易理解,所以感兴趣的朋友可以搭建debug环境边阅读边调试。
前置知识
在理解semi bka join的详细设计和实现时,不可避免的要对其相关组件进行介绍,从中我们挑了两个重要的来展开介绍一下,hash join的核心设计与如何接入异步执行框架。
**
HashJoin的核心设计**
我们之所以要讲解hash join的核心设计,是因为bka join与hash join有很多非常像的地方。理解了hash join,有助于我们更好的理解bka join。同时,这样的对比学习,可能会让大家有更多的收获。
hash join执行流程
hash join的左边是Outer端(探测hash table的一端),右边是inner端(构建hash table的一端),执行流程如下所示。
流程中的几个核心问题
1.保存在hash table中的是什么?
public class ConcurrentRawHashTable implements Hash {
public static final int NOT_EXISTS = -1;
/**
* The array of keys (buckets)
*/
private final AtomicIntegerArray keys;
/**
* The mask for wrapping a position counter
*/
private final int mask;
/**
* The current table size.
*/
private final int n;
}
该行在缓存数据中的位置(keys中保存的int值),而非join key的值或者记录本身(所以为了输出完整数据,我们需要缓存build端的数据,即ChunksIndex buildChunks)
**
2.如何解决哈希碰撞?**
拉链的形式来解决哈希碰撞(所以你会看到positionLinks和hashTable总是如影随形)。根据保存在哈希表中的位置,获取build端的相应行,进而检测记录是否真正匹配。
ConcurrentRawHashTable hashTable;
int[] positionLinks;
int matchedPosition = hashTable.get(hashCode);
while (matchedPosition != LIST_END) {
if (buildKeyChunks.equals(matchedPosition, keyChunk, position)) {
break;
}
matchedPosition = positionLinks[matchedPosition];
}
3.如何匹配结果并输出?
build端构建hash table之后,probe端开始流式的探测并输出,可见AbstractBufferedJoinExec的doNextChunk和nextRow方法,整体流程如下:
遍历probe端的结果,拿到probe端join key的hash code,首先去hash table中找到对应的位置(matchInit方法)
判断position是否有效(matchValid方法),并检测join条件是否匹配(checkJoinCondition)
若匹配,则输出相应结果(buildJoinRow方法)
随后一直从链表中获取同一hash code对应的build chunk中的位置(matchNext方法),跳转至步骤2直至拿到的位置不合法
注:在wcf2333_build_semi_bka_join分支上对AbstractBufferedJoinExec进行了部分重构,逻辑可能会更清晰一些,有兴趣的朋友可以参考。
matchedPosition = matchInit(probeJoinKeyChunk, probeKeyHashCode, probePosition);
for (; matchValid(matchedPosition);
matchedPosition = matchNext(matchedPosition, probeJoinKeyChunk, probePosition))
{
if (!checkJoinCondition(buildChunks, probeChunk, probePosition, matchedPosition)) {
continue;
}
buildJoinRow();
}
细节的强化:我们现在对上述流程中的步骤三(输出结果)进行审视,原因是join的类型并非只有inner,还有outer join,semi join与anti join,其各自的特点分别如下:
outer join:如果进行probe的这行无法匹配右表,则右表中相应字段填充为null
semi join:只输出左表内容,且一旦匹配,则无需继续探测链表中余下的记录是否能够继续匹配
anti join:同semi join,但需要注意的是condition中不包含anti的语义,因此一旦condition匹配,则anti join一定不匹配,但是condition不匹配时,anti join未必匹配,此时需要进一步判断condition的结果是否为null。例如where a not in (select b from t2),则condition为a = b,null = 3的结果为null,这会导致condition不匹配,但是却不应该输出。此外,还需要注意not exists与not in转换而来的anti join的区别,整体来讲anti join的这部分并不太容易理解,非必要可暂时不对该部分进行理解,下次文章期望能够对该部分进行重构。
4. 对于semi和anti时的一点优化,即doSpecialCheckForSemiJoin方法 如果构建哈希表的结果为空,则semi join的输出结果为空,anti join的输出结果为probe端。 如果anti join中为一列且build chunk中该列含有null值时,则anti join的输出结果为空。
扩展:如果需要支持多列时anti join的pass nothing优化,可以参考如下进行扩展,即检查该行中参与构建哈希表的所有字段是否均为null。
protected static boolean checkNullRecord(Chunk chunk) {
int blockCount = chunk.getBlockCount();
int position = chunk.getBlock(0).getPositionCount();
boolean hasNullRecord = IntStream.range(0, position).anyMatch(
pos -> IntStream.range(0, blockCount).allMatch(t -> chunk.getBlock(t).isNull(pos)));
return hasNullRecord;
}
对于null safe equal的处理? null safe equal:用<=>表示,用于判断join中null = null是否成立。 null safe equal时将build端的null值放入哈希表中,否则构建哈希表时过滤掉null值,如下:
boolean[] nullPos = getNullPos(keyChunk, ignoreNullBlocks);
for (int offset = 0; offset < keyChunk.getPositionCount(); offset++, position++) {
if (!nullPos[offset]) {
int next = hashTable.put(position, hashes[offset]);
positionLinks[position] = next;
}
}
异步框架下算子状态的切换
大致思路:通常情况下,如果我们能够拿到数据进行处理,则该算子一定不会结束,状态为非阻塞状态;反之,如果拿不到数据,则应进一步判断为block状态还是finish状态(当然,实际上我们会有Producer和Consumer,将来可以写一篇文章来聊聊这其中的几个接口与状态切换,非本文重点,按下不表)
Semi BKA join的设计与实现
好了,现在我们要正式开始一步步的构建semi bka join了。
添加整个框架
我们首先实现semi bka join优化器部分,其次是执行器部分,即将LogicalSemiJoin转换为SemiBkaJoin,涉及的commit如下。
add the simplest optimizer rule for semi bka join
add the simplest executor for semi bka join
support multi column in lookup predicate
enhance optimizer
优化器部分
1.我们需要一个新的RelNode类型,SemiBkaJoin,派生自LogicalSemiJoin。
2.我们需要一个LogicalSemiJointoSemiBkaJoin的规则,以便将下述的RelNode树进行转换。
tips:在一条规则中放一个开关,总不是一件太坏的事情,这样如果该条规则有问题,可以更快的禁掉。
3.在RuleToUse中添加该规则,使得优化器的RBO阶段可以使用该规则。
执行器部分
我们先来对比一下hash join和lookup join的执行流程。
hash join
lookup join
通过对上图的执行过程进行分析,我们可以将bka join的执行过程分为如下几个阶段。原来的代码中这几个阶段不是非常清晰,我在之前提到的代码分支上对这部分代码进行了重构,感兴趣的同学可以参考。
protected enum LookupJoinStatus {
CONSUMING_OUTER,
INIT_INNER_LOOKUP,
CACHE_INNER_RESULT,
BUILD_HASH_TABLE,
PROBE_AND_OUTPUT
}
接下来我们对这几个关键状态进行一些解释
CONSUMING_OUTER:拉取outer端数据并进行缓存
INIT_INNER_LOOKUP:根据outer端数据构建lookup predicate并下发物理SQL,拉取过滤后的数据
CACHE_INNER_RESULT:不停的获取inner端数据并缓存
BUILD_HASH_TABLE:根据缓存的inner端数据构建哈希表
PROBE_AND_OUTPUT:根据缓存的outer端数据,探测并输出
Q:现在我们来想一下如何接入异步框架,也就是算子何时会处于阻塞,非阻塞以及最终的结束状态。
A:首先,只有在PROBE_AND_OUTPUT阶段或者outer端无数据时,该算子才可能是结束状态;其次,只有CONSUMING_OUTER和CACHE_INNER_RESULT可能会出现block状态,block状态主要是为了避免获取DN数据时阻塞,整体流程图如下。
Q:lookup join时下发的物理SQL只有在执行时才真正确定,这与通常情况很不同,因为通常下发的物理SQL在拿到执行计划时就已经确定了,如何解决?
A:第一步,生成物理SQL时,检查是否带有lookup的标,如果有,则在where条件中添加一个'bka_magic' = 'bka_magic'的占位符;第二步,执行时用lookup condition对占位符'bka_magic' = 'bka_magic'进行替换。
一个思考:有兴趣的朋友可以看一下,LogicalView中的isMGetEnabled用于控制执行时生成lookup所需的物理SQL,expandView用于控制lookup执行时对于DN数据的扫描,是否真的有必要使用两个开关?
支持多列in查询
例如,select * from t1 where (c1,c2) in (select (c1,c3) from t2); 可以参考LookupConditionBuilder.buildMultiCondition方法,比较简单,不再赘述。
优化器增强:限制使用Semi BKA join的场景
在这里我们需要考虑的一个核心问题是,哪些场景不能使用Semi BKA join?
比如select * from t1 where c1 in (select md5(c2) from t2),调用相应的接口我们会发现md5(c2)并非是t2中的列,因此这种情况我们不再使用BKA join。
脑洞一下:这里面其实有个可以思考的问题,下发select xx_func(c2) from t2 where xx_func(c2) in (...)这样的SQL,在某些情况下应该也是会有比较好的性能的,比如存储节点上具有函数索引。但这需要对优化器和执行器进行更为精细的设计,同时还需要考虑收益和可维护性的问题,但不管怎么样,我觉得这是个有意思的问题。
优化Semi BKA join
其实至此,我们已经拿到了一个可以用的Semi BKA join了,但它的性能在某些场景下会有一定的优化空间,因此接下来我们对其进行一些优化,涉及如下commit。
support stream lookup join
support dynamic pruning
support single sharding key when table rule is not simple
support lookup predicate with multi-column when pruning
增强执行器:增加分批处理的能力
如上所述,我们需要先拉取outer端的所有数据,然后构造predicate获取inner端数据,获取完所有的inner端数据并使用其构建了哈希表之后,才可以使用缓存的outer端数据进行探测并向上层流式的吐数据。
Q:这会带来一定的问题,比如,这使得相比于hash join,吐数据的时间被延后了;再比如,这样构造出来的predicate中的in值一般会非常多,存储节点会更倾向于全表扫描而非索引扫描。
A:既然我们觉得outer端完全阻塞住了不太好,那就让他流式起来好了,即outer端的数据量一旦超过一个阈值,我们就先拿这部分的数据走一个完整的join流程并对外输出结果。 于是我们涉及到如下这样一个略微复杂一点的流程:
思考一下:相比于把batchSize做成BufferQueue的字段,调用BufferQueue.pop()拉取batchSize行数据,我们还有一种方式,BufferQueue中不包含batchSize字段,使用BufferQueue.pop(batchSize)的方式拉取batchSize行数据。为什么要提这个呢,因为这样可以让BufferQueue更加纯粹,且为执行时batchSize的动态调整留下了空间,虽然这种自适应的调整不一定很靠谱:((这是另一个有意思的问题了)。
支持动态裁剪
上述方案中,执行时predicate中in列表中的值会下发到每一个分片,比如outer端查出来的数据为1,2,拼成的物理SQL为select c1 from t2_physical_table_name where c1 in (1,2),且需要下发到所有分片。
我们进一步假设t2有八个分片,标号为t2_{00-08},c1为分片键,c1值为1的记录只可能出现在t2_01分片,c1值为2的记录只可能出现在t2_02分片,则理想情况下我们只需要下发两条物理SQL。
即一条至t2_01分片的select c1 from t2_01 where c1 in (1),一条至t2_02分片的select c1 from t2_02 where c1 in (2)。也就是我们可以做分片裁剪和物理SQL中in值的裁剪。
篇幅有限,我们决定把对这部分设计的详细介绍放到下次,有兴趣的朋友可以先自行结合以下三个commit和给出的SQL例子进行对照理解。
support dynamic pruning
support single sharding key when table rule is not simple
support lookup predicate with multi-column when pruning
/*+TDDL: SEMI_BKA_JOIN(t1, t2)*/ select * from t1 where c1 in (select c2 from t2);
/*+TDDL: SEMI_BKA_JOIN(t1, t2)*/ select * from t1 where c1 in (select c2 from t3);
/*+TDDL: SEMI_BKA_JOIN(t1, t2)*/ select * from t1 where (c1,c3) in (select c2,c3 from t3);
CREATE TABLE `t1` (
`c1` int(11) DEFAULT NULL,
`c2` int(11) DEFAULT NULL,
`c3` int(11) DEFAULT NULL,
KEY `auto_shard_key_c1` USING BTREE (`c1`),
KEY `auto_shard_key_c2` USING BTREE (`c2`)
) ENGINE = InnoDB dbpartition by hash(`c1`) tbpartition by hash(`c2`) tbpartitions 2;
CREATE TABLE `t2` (
`c1` int(11) DEFAULT NULL,
`c2` int(11) DEFAULT NULL,
`c3` int(11) DEFAULT NULL,
KEY `auto_shard_key_c2` USING BTREE (`c2`)
) ENGINE = InnoDB dbpartition by hash(`c2`) tbpartition by hash(`c2`) tbpartitions 2;
CREATE TABLE `t3` (
`c1` int(11) DEFAULT NULL,
`c2` int(11) DEFAULT NULL,
`c3` int(11) DEFAULT NULL,
KEY `auto_shard_key_c2` USING BTREE (`c2`),
KEY `auto_shard_key_c3` USING BTREE (`c3`)
) ENGINE = InnoDB dbpartition by hash(`c2`) tbpartition by hash(`c3`) tbpartitions 2;
insert into t1 values (1,1,1), (2,2,2), (null, null, null);
insert into t2 values (1,1,1), (1,1,1), (null, null, null);
insert into t3 values (1,1,1), (null, null, null);
如何让CBO选择BKA join?
Q:首先,为什么相比于hash join,BKA join会更快,以及在哪些场景下会快?
A:相比于hash join,bka join在某些情况下可以避免拉取大量的数据,本质上在于hash join无法避免拉取两张表的数据,唯一能决定的是使用小表构建哈希表,大表流式探测;而BKA join可以通过构造predicate的形式,只拉取小表的全量数据,同时只拉取大表中匹配的数据。但是分批之后,bka join会导致网络交互次数增多,同时需要评估下发的物理SQL在存储节点上的执行效率。
Q:其次,semi bka join相比于semi hash join,为什么会快?
A:看起来这个问题和上面是相同的,其实有一些差异。因为semi join时输出的一定是左端,在现有的实现下,semi hash join时一定会使用右端构建哈希表,当右端数据量大时,代价会很大。
思考:Semi hash join如何实现小表构建哈希表,而非永远使用子查询中的表构建哈希表? 在我们现在的执行器模式之下,这是无法实现的,大家有兴趣的可以debug并思考一下原因,以及如何支持这种执行模式。
如何测试
测试还是一个蛮重要的东西,我觉得可能至少应该包括如下场景:
- 执行模式:tp_local, ap_local, mpp, cursor下推
- 左/右表数据:空表,三行普通值,三行null值,三行普通值+三行null值,随机数据(包含随机比例的重复值+null值)
- 列的个数:单列(sharding列),单列(非sharding列),两列(全部为sharding列),两列(一列sharding列+一列非sharding列)
- 子查询的类型,in, exists, some, any, all, not in, not exists, scalar_query
- 子查询中列的对齐情况,对齐,不对齐,主要测试涉及到下推场景时是否正确
- 子查询中列是否严格非null,测试优化器对于一些场景的处理是否符合预期
- 子查询中的条件,某些条件下可为空,只有join key等值,join key等值+join key非等值,join key等值+普通condition
- 子查询中涉及列的类型,全类型测试 9. 保护性case:比如> all转成的anti join绝不允许使用物化semi join等
总结
在本文中,我们主要介绍了hash join的执行流程,并从近乎零开始构建了semi bka join的执行模式。同时,我们在文中提到了很多问题,有兴趣的朋友可以进行思考和交流。当然,如文中所述,我们还遗留了一些内容,关于这部分内容,我们会在下次文章中填坑,同时会结合更多场景优化更多细节。
本文来源:PolarDB-X知乎号