为什么需要? int batchSize = 1024; ClientIdentity identity = xxx; CanalServerWithEmbedded server = new CanalServerWithEmbedded(); // here, it needs return all complete transactions, but the data maybe truncated in two invokes server.getWithoutAck(identity, batchSize);
怎么实现 现在在MemoryEventStoreWithBuffer#doGet:
long memsize = 0; if (batchMode.isItemSize()) { end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence; for (; next <= end; next++) { Event event = entries[getIndex(next)]; if (ddlIsolation && isDdl(event.getEventType())) { if (entrys.size() == 0) { entrys.add(event); end = next; } else { end = next - 1; } break; } else { // changes here: should determine whether event in a transaction entrys.add(event); } } }
示例
batchSize < 5, return tx1. get index moved to 4 batchSize >=5 && batchSize < 10, just return tx1. get index moved to 4 batchSize >= 10 && batchSize < 13, just return tx1,tx2. get index moved to 10
原提问者GitHub用户xiaoma20082008
canal设计的时候并没有严格按照transaction来获取events,但通常情况下小事务是满足事务边界获取
目前MemoryEventStoreWithBuffer的设计,在parser后写入eventstore之前是有一个transactionBuffer用来积攒事务数据,一个事务的数据会一次性写入eventStore(这样可以确保在getWithoutACK时可以一次性看见整个事务的index,如果符合batchSize的条件,整个事务是会一起返回)
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。