开发者社区> 问答> 正文

MemoryEventStoreWithBuffer#get应尽可能返回完整的事务,并且不要截断事务

为什么需要? int batchSize = 1024; ClientIdentity identity = xxx; CanalServerWithEmbedded server = new CanalServerWithEmbedded(); // here, it needs return all complete transactions, but the data maybe truncated in two invokes server.getWithoutAck(identity, batchSize);

怎么实现 现在在MemoryEventStoreWithBuffer#doGet:

long memsize = 0; if (batchMode.isItemSize()) { end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence; for (; next <= end; next++) { Event event = entries[getIndex(next)]; if (ddlIsolation && isDdl(event.getEventType())) { if (entrys.size() == 0) { entrys.add(event); end = next; } else { end = next - 1; } break; } else { // changes here: should determine whether event in a transaction entrys.add(event); } } }

示例

提问74.png

提问75.png

batchSize < 5, return tx1. get index moved to 4 batchSize >=5 && batchSize < 10, just return tx1. get index moved to 4 batchSize >= 10 && batchSize < 13, just return tx1,tx2. get index moved to 10

原提问者GitHub用户xiaoma20082008

展开
收起
后端老大 2023-04-26 16:55:30 155 0
1 条回答
写回答
取消 提交回答
  • canal设计的时候并没有严格按照transaction来获取events,但通常情况下小事务是满足事务边界获取

    目前MemoryEventStoreWithBuffer的设计,在parser后写入eventstore之前是有一个transactionBuffer用来积攒事务数据,一个事务的数据会一次性写入eventStore(这样可以确保在getWithoutACK时可以一次性看见整个事务的index,如果符合batchSize的条件,整个事务是会一起返回)

    原回答者GitHub用户agapple

    2023-04-26 19:34:03
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载