有一种场景,业务上先insert一条数据,再update这条数据。otter进行同步时,update的这个操作丢失了,日志中也没有报错信息。是node在消费时因为多线程原因导致先操作了update失败,然后insert了数据吗?
在同一批EventData中,insert和update的合并有个bug,RowKey这个对象在判断相等性和计算hashcode时,是直接调用EventColumn的相关方法,这样会把EventColumn的isUpdate属性计算进去,造成insert和update合并不成一个insert。
代码没有全部看完,发现一个问题, MessageParser中,在解析update时,会在eventdata中放入keys以及oldkeys(oldkeys isupdate 为true,并且只在eventtype为update,!oldKeys.equals(keys)才会设置oldkeys):
} else if (eventType.isUpdate()) { // 获取变更前的主键. for (Column column : beforeColumns) { if (isKey(tableHolder, tableName, column)) { oldKeyColumns.put(column.getName(), copyEventColumn(column, true, tableHolder)); // 同时记录一下new // key,因为mysql5.6之后出现了minimal模式,after里会没有主键信息,需要在before记录中找 keyColumns.put(column.getName(), copyEventColumn(column, true, tableHolder)); } else { if (needAllColumns && entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE) { // 针对行记录同步时,针对oracle记录一下非主键的字段,因为update时针对未变更的字段在aftercolume里没有 notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode, tableHolder)); } } } if (!keyColumns.isEmpty()) { eventData.setKeys(keys); if (eventData.getEventType().isUpdate() && !oldKeys.equals(keys)) { // update类型,如果存在主键不同,则记录下old // keys为变更前的主键 eventData.setOldKeys(oldKeys); } 但是在DbLoadMerger中打印日志发现 keys和oldkeys是一样的(isupdate 为 false),这不和逻辑 在transform过程中,·RowDataTransformer中,对于oldkeys的处理:
List<EventColumn> tnewPks = new ArrayList<EventColumn>();
List<EventColumn> toldPks = new ArrayList<EventColumn>();
for (int i = 0; i < pks.size(); i++) {
EventColumn newPk = pks.get(i);
EventColumn oldPk = oldPks.get(i);
// 转化new pk
EventColumn tnewPk = translateColumn(data, newPk, tableHolder, dataMediaPair, translateColumnNames);
if (tnewPk != null) {
tnewPks.add(tnewPk);
// 转化old pk,这里不能再用translateColumnNames了,因为转化new
// pk已经remove过一次view name了
toldPks.add(translateColumn(tnewPk, oldPk.getColumnValue(), dataMediaPair));
}
}
再次将oldkeys的isUpdate字段更新为keys的isUpdate的value:false,正确是应该仍然为oldkeys的值吧
isUpdate 正确的值应该 当前值isUpdate=false old值 isUpdate=true 所有才会导致EventColumn的hashcode 不一致 EventColumn public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((columnName == null) ? 0 : columnName.hashCode()); result = prime * result + columnType; result = prime * result + ((columnValue == null) ? 0 : columnValue.hashCode()); result = prime * result + index; result = prime * result + (isKey ? 1231 : 1237); result = prime * result + (isNull ? 1231 : 1237); result = prime * result + (isUpdate ? 1231 : 1237); return result; } DbLoadMerger.RowKey public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((keys == null) ? 0 : keys.hashCode()); --直接调用EventColumn.hashcode result = prime * result + ((schemaName == null) ? 0 : schemaName.hashCode()); result = prime * result + ((tableId == null) ? 0 : tableId.hashCode()); result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); return result; } 最终的结果导致insert +update 没有合并 成一条insert 语句去执行 再多线程的情况下导致就有可能导致update语句不执行 特别是数据库insert 后马上做update 具体可以看DbLoadAction.doTwoPhase
问题的根源在于RowDataTransformer // 转化new pk EventColumn tnewPk = translateColumn(data, newPk, tableHolder, dataMediaPair, translateColumnNames); if (tnewPk != null) { tnewPks.add(tnewPk); // 转化old pk,这里不能再用translateColumnNames了,因为转化new // pk已经remove过一次view name了 toldPks.add(translateColumn(tnewPk, oldPk.getColumnValue(), dataMediaPair)); }
修改后 //modify by yuyiding 20180725 主键的isupdate还是oldpk的isupdate EventColumn transEventColumn = translateColumn(tnewPk, oldPk.getColumnValue(), dataMediaPair); transEventColumn.setUpdate(oldPk.isUpdate()); toldPks.add(transEventColumn);
已经测试过insert +update merge成了一条insert (这个改动目前看不会影响别的) 请帮忙确认下这个改动的影响 🙏
原提问者GitHub用户 victorqian
修改的思路:将pk的isUpdate设置为true,因为pk的update有内置的逻辑来处理. 当时调整应该是为DRDS做的一个变更导致 新版本已修复
原回答者GitHub用户
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。