作者:潜璟
在上一篇源码阅读中,我们介绍了INSERT的执行流程。而INSERT IGNORE与INSERT不同,需要对插入值判断是否有Unique Key的冲突,并忽略有冲突的插入值。因此本文将进一步介绍PolarDB-X中INSERT IGNORE的执行流程,其根据插入的表是否有GSI也有所变化。
一、下推执行
如果插入的表只有一张主表,没有GSI,那么只需要将INSERT IGNORE直接发送到对应的物理表上,由DN自行忽略存在冲突的值。在这种情况下,INSERT IGNORE的执行过程和INSERT基本上相同,读者可以参考之前的源码阅读文章。
二、逻辑执行
而在有GSI的情况下,就不能简单地将INSERT IGNORE分别下发到主表和GSI对应的物理分表上,否则有可能出现主表和GSI数据不一致的情况。举个例子:
create table t1 (a int primary key, b int, global index g1(b) dbpartition by hash(b)) dbpartition by hash(a); insert ignore into t1 values (1,1),(1,2);
对于插入的两条记录,它们在主表上位于同一个物理表(a相同),但是在GSI上位于不同的物理表(b不相同),如果直接下发INSERT IGNORE的话,主表上只有(1,1)能够成功插入(主键冲突),而在GSI上(1,1)和(1,2)都能成功插入,于是GSI比主表多了一条数据。
针对这种情况,一种解决方案是根据插入值中的Unique Key,先到数据库中SELECT出有可能冲突的数据到CN,然后在CN判断冲突的值并删除。
进行SELECT的时候,最简单的方式就是将所有的SELECT直接发送到主表上,但是主表上可能没有对应的Unique Key,这就导致SELECT的时候会进行全表扫描,影响性能。所以在优化器阶段,我们会根据Unique Key是在主表还是GSI上定义的来确定相应的SELECT需要发送到主表还是GSI,具体代码位置:
com.alibaba.polardbx.optimizer.core.planner.rule.OptimizeLogicalInsertRule#groupUkByTable
protected Map>> groupUkByTable(LogicalInsertIgnore insertIgnore, ExecutionContext executionContext) { // 找到每个 Unique Key 在主表和哪些 GSI 中存在 Map> ukAllTableMap = new HashMap<>(); for (int i = 0; i < uniqueKeys.size(); i++) { List uniqueKey = uniqueKeys.get(i); for (Map.Entry>> e : writableTableUkMap.entrySet()) { String currentTableName = e.getKey().toUpperCase(); Map> currentUniqueKeys = e.getValue(); boolean found = false; for (Set currentUniqueKey : currentUniqueKeys.values()) { if (currentUniqueKey.size() != uniqueKey.size()) { continue; } boolean match = currentUniqueKey.containsAll(uniqueKey); if (match) { found = true; break; } } if (found) { ukAllTableMap.computeIfAbsent(i, k -> new ArrayList<>()).add(currentTableName); } } } // 确定是在哪一个表上进行 SELECT for (Map.Entry> e : ukAllTableMap.entrySet()) { List tableNames = e.getValue(); if (tableNames.contains(primaryTableName.toUpperCase())) { tableUkMap.computeIfAbsent(primaryTableName.toUpperCase(), k -> new ArrayList<>()) .add(uniqueKeys.get(e.getKey())); } else { final boolean onlyNonPublicGsi = tableNames.stream().noneMatch(tn -> GlobalIndexMeta.isPublished(executionContext, sm.getTable(tn))); boolean found = false; for (String tableName : tableNames) { if (!onlyNonPublicGsi && GlobalIndexMeta.isPublished(executionContext, sm.getTable(tableName))) { tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey())); found = true; break; } else if (onlyNonPublicGsi && GlobalIndexMeta.canWrite(executionContext, sm.getTable(tableName))) { tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey())); found = true; break; } } } } return tableUkMap; }
而到了执行阶段,我们在LogicalInsertIgnoreHandler中处理INSERT IGNORE。我们首先会进入getDuplicatedValues函数,其通过下发SELECT的方式查找表中已有的冲突的Unique Key的记录。我们将下发的SELECT语句中选择的列设置为(value_index, uk_index, pk)。其中value_index和uk_index均为的常量。
举个例子,假设有表:
CREATE TABLE `t` ( `id` int(11) NOT NULL, `a` int(11) NOT NULL, `b` int(11) NOT NULL, PRIMARY KEY (`id`), UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`) ) DBPARTITION BY HASH(`id`)
以及一条INSERT IGNORE语句:
INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);
假设在PolarDB-X中执行时,其会将Unique Key编号为:
0: id 1: g_i_a
INSERT IGNORE语句中插入的每个值分别编号为:
0: (1,2,3) 1: (2,3,4) 2: (3,4,5)
那么对于(2,3,4)的UNIQUE KEY构造的GSI上的SELECT即为:
# 查询 GSI SELECT 1 as `value_index`, 1 as `uk_index`, `id` FROM `g_i_a_xxxx` WHERE `a` in 3;
假设表中已经存在(5,3,6),那么这条SELECT的返回结果即为(1,1,5)。此外,由于不同的Unique Key的SELECT返回格式是相同的,所以我们会将同一个物理库上不同的SELECT查询UNION起来发送,以一次性得到多个结果,减少CN和DN之间的交互次数。只要某个Unique Key有重复值,我们就能根据value_index和uk_index确定是插入值的哪一行的哪个Unique Key是重复的。
当得到所有的返回结果之后,我们对数据进行去重。我们将上一步得到的冲突的的值放入一个SET中,然后顺序扫描所有的每一行插入值,如果发现有重复的就跳过该行,否则就将该行也加入到SET中(因为插入值之间也有可能存在相互冲突)。去重完毕之后,我们就得到了所有不存在冲突的值,将这些值插入到表中之后就完成了一条INSERT IGNORE的执行。
逻辑执行的执行流程:
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute protected int doExecute(LogicalInsert insert, ExecutionContext executionContext, LogicalInsert.HandlerParams handlerParams) { // ... try { Map>> ukGroupByTable = insertIgnore.getUkGroupByTable(); List> deduplicated; List> duplicateValues; // 获取表中已有的 Unique Key 冲突值 duplicateValues = getDuplicatedValues(insertIgnore, LockMode.SHARED_LOCK, executionContext, ukGroupByTable, (rowCount) -> memoryAllocator.allocateReservedMemory( MemoryEstimator.calcSelectValuesMemCost(rowCount, selectRowType)), selectRowType, true, handlerParams); final List> batchParameters executionContext.getParams().getBatchParameters(); // 根据上一步得到的结果,去掉 INSERT IGNORE 中的冲突值 deduplicated = getDeduplicatedParams(insertIgnore.getUkColumnMetas(), insertIgnore.getBeforeUkMapping(), insertIgnore.getAfterUkMapping(), RelUtils.getRelInput(insertIgnore), duplicateValues, batchParameters, executionContext); if (!deduplicated.isEmpty()) { insertEc.setParams(new Parameters(deduplicated)); rams(new Parameters(deduplicated)); } else { // All duplicated return affectRows; } // 执行 INSERT try { if (gsiConcurrentWrite) { affectRows = concurrentExecute(insertIgnore, insertEc); } else { affectRows = sequentialExecute(insertIgnore, insertEc); } else { affectRows = sequentialExecute(insertIgnore, insertEc); } } catch (Throwable e) { handleException(executionContext, e, GeneralUtil.isNotEmpty(insertIgnore.getGsiInsertWriters())); } } finally { selectValuesPool.destroy(); } return affectRows; }
三、RETURNING优化
上一节提到的INSERT IGNORE的逻辑执行方式,虽然保证了数据的正确性,但是也使得一条INSERT IGNORE语句至少需要CN和DN的两次交互才能完成(第一次SELECT,第二次INSERT),影响了INSERT IGNORE的执行性能。
目前的DN已经支持了AliSQL的RETURNING优化,其可以在DN的INSERT IGNORE执行完毕之后返回成功插入的值。利用这一功能,PolarDB-X对INSERT IGNORE进行了进一步的优化:直接将INSERT IGNORE下发,如果在主表和GSI上全部成功返回,那么就说明插入值中没有冲突,于是就成功完成该条INSERT IGNORE的执行;否则就将多插入的值删除。
执行时,CN首先会根据上文中的语法下发带有RETURNING的物理INSERT IGNORE语句到DN,比如:
call dbms_trans.returning("a", "insert into t1_xxxx values(1,1)");
其中返回列是主键,用来标识插入的一批数据中哪些被成功插入了;t1_xxxx是逻辑表t1的一个物理分表。当主表和GSI上的所有INSERT IGNORE执行完毕之后,我们计算主表和GSI中成功插入值的交集作为最后的结果,然后删除多插入的值。这部分代码在
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#getRowsToBeRemoved
private Map>> getRowsToBeRemoved(String tableName, Map List>> tableInsertedValues, List beforePkMapping, List pkColumnMetas) { final Map> tableInsertedPks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); final Map>>> tablePkRows = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); tableInsertedValues.forEach((tn, insertedValues) -> { final Set insertedPks = new TreeSet<>(); final List>> pkRows = new ArrayList<>(); for (List inserted : insertedValues) { final Object[] groupKeys = beforePkMapping.stream().map(inserted::get).toArray(); final GroupKey pk = new GroupKey(groupKeys, pkColumnMetas); insertedPks.add(pk); pkRows.add(Pair.of(pk, inserted)); } tableInsertedPks.put(tn, insertedPks); tablePkRows.put(tn, pkRows); }); // Get intersect of inserted values final Set distinctPks = new TreeSet<>(); for (GroupKey pk : tableInsertedPks.get(tableName)) { if (tableInsertedPks.values().stream().allMatch(pks -> pks.contains(pk))) { distinctPks.add(pk); } } // Remove values which not exists in at least one insert results final Map>> tableDeletePks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); tablePkRows.forEach((tn, pkRows) -> { final List> deletePks = new ArrayList<>(); pkRows.forEach(pkRow -> { if (!distinctPks.contains(pkRow.getKey())) { deletePks.add(pkRow.getValue()); } }); if (!deletePks.isEmpty()) { tableDeletePks.put(tn, deletePks); } }); return tableDeletePks; }
与上一节的逻辑执行的“悲观执行”相比,使用RETURNING优化的INSERT IGNORE相当于“乐观执行”,如果插入的值本身没有冲突,那么一条INSERT IGNORE语句CN和DN间只需要一次交互即可;而在有冲突的情况下,我们需要下发DELETE语句将主表或GSI中多插入的值删除,于是CN和DN间需要两次交互。可以看出,即便是有冲突的情况,CN和DN间的交互次数也不会超过上一节的逻辑执行。因此在无法直接下推的情况下,INSERT IGNORE的执行策略是默认使用RETURNING优化执行。
当然RETURNING优化的使用也有一些限制,比如插入的Value有重复主键时就不能使用,因为这种情况下无法判断具体是哪一行被成功插入,哪一行需要删除;具体可以阅读代码中的条件判断。当不能使用RETURNING优化时,系统会自动选择上一节中的逻辑执行方式执行该条INSERT IGNORE语句以保证数据的正确性。
使用RETURNING优化的执行流程:
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute
protected int doExecute(LogicalInsert insert, ExecutionContext executionContext, LogicalInsert.HandlerParams handlerParams) { // ... // 判断能否使用 RETURNING 优化 boolean canUseReturning = executorContext.getStorageInfoManager().supportsReturning() && executionContext.getParamManager() .getBoolean(ConnectionParams.DML_USE_RETURNING) && allDnUseXDataSource && gsiCanUseReturning && !isBroadcast && !ComplexTaskPlanUtils.canWrite(tableMeta); if (canUseReturning) { canUseReturning = noDuplicateValues(insertIgnore, insertEc); } if (canUseReturning) { // 执行 INSERT IGNORE 并获得返回结果 final List allPhyPlan = new ArrayList<>(replaceSeqAndBuildPhyPlan(insertIgnore, insertEc, handlerParams)); getPhysicalPlanForGsi(insertIgnore.getGsiInsertIgnoreWriters(), insertEc, allPhyPlan); final Map>> tableInsertedValues = executeAndGetReturning(executionContext, allPhyPlan, insertIgnore, insertEc, memoryAllocator, selectRowType); // ... // 生成 DELETE final boolean removeAllInserted = targetTableNames.stream().anyMatch(tn -> !tableInsertedValues.containsKey(tn)); if (removeAllInserted) { affectedRows -= removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableInsertedValues); if (returnIgnored) { ignoredRows = totalRows; } } else { final List beforePkMapping = insertIgnore.getBeforePkMapping(); final List pkColumnMetas = insertIgnore.getPkColumnMetas(); // 计算所有插入值的交集 final Map>> tableDeletePks = getRowsToBeRemoved(tableName, tableInsertedValues, beforePkMapping, pkColumnMetas); affectedRows -= removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableDeletePks); if (returnIgnored) { ignoredRows += Optional.ofNullable(tableDeletePks.get(insertIgnore.getLogicalTable Name())).map(List::size) .orElse(0); } } handlerParams.optimizedWithReturning = true; if (returnIgnored) { return ignoredRows; } else { return affectedRows; } } else { handlerParams.optimizedWithReturning = false; } // ... }
最后以一个例子来展现RETURNING优化的执行流程与逻辑执行的不同。通过/*+TDDL:CMD_EXTRA(DML_USE_RETURNING=TRUE)*/这条HINT,用户可以手动控制是否使RETURNING优化。
首先建表并插入一条数据:
CREATE TABLE `t` ( `id` int(11) NOT NULL, `a` int(11) NOT NULL, `b` int(11) NOT NULL, PRIMARY KEY (`id`), UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`) ) DBPARTITION BY HASH(`id`); INSERT INTO t VALUES (1,3,3);
再执行一条INSERT IGNORE:
INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);
其中(1,2,3)与(1,3,3)主键冲突,(2,3,4)与(1,3,3)对于Unique Key g_i_a冲突。如果是RETURNING优化:
截屏2022-08-08 10.16.25
可以看到PolarDB-X先进行了INSERT IGNORE,再将多插入的数据删除:(1,2,3)在主表上冲突在UGSI上成功插入,(2,3,4)在UGSI上冲突在主表上成功插入,因此分别下发对应的DELETE到UGSI和主表上。
如果关闭RETURNING优化,逻辑执行:
截屏2022-08-08 10.18.07
可以看到PolarDB-X先进行了SELECT,再将没有冲突的数据(3,4,5)插入。
四、小结
本文介绍了PolarDB-X中INSERT IGNORE的执行流程。除了INSERT IGNORE之外,还有一些DML语句在执行时也需要进行重复值的判断,比如REPLACE、INSERT ON DUPLICATE KEY UPDATE等,这些语句在有GSI的情况下均采用了逻辑执行的方式,即先进行SELECT再进行判重、更新等操作,感兴趣的读者可以自行阅读相关代码。