开发者社区> 问答> 正文

关于EventTransactionBuffer与MemoryEventStoreWithBuffe

我现在的理解 EventTransactionBuffer sink将event分发给EventTransactionBuffer,如果收集到一个完整的transaction,或者达到了buffer的size上限就会触发flush,将缓存数据通过callback经过filter等操作以后发送到MemoryEventStoreWithBuffer.

MemoryEventStoreWithBuffer 接收到一个List(应该就是刚刚发过来的list事务),检查如果空位空间足够,就遍历list中的数据,逐条加入entries中。

每一次get数据的时候,是根据传入的batchSize来拿的,拿完以后会计算消息的PositionRange(这次get的头尾LogPosition),会set可被ack的点为这段消息中的最后一个TRANSACTIONBEGIN或者TRANSACTIONEND。这个ack会在元数据管理updateCursor时传进去,为了让cursor中总是记录的事务的开始或者结束。

我的疑问 现在我们在开发的东西,是希望以Transaction来作为单位来将消息发给下游的(如果过大的transaction会分割成多个transaction包,里面记录一个布尔值表示是否是事务结束包),现在主要矛盾是MemoryEventStoreWithBuffer里面存储的粒度是entry,所以根据batchSIze来拿的话拿出去可能同个事务中数据是会被分2次拿到的。

如果是从MemoryEventStoreWithBuffer拿出数据之后,再嵌套一个类似EventTransactionBuffer的缓存, ack的时候也还要保留原来的batchId来ack。

目前觉得可以去实现一个自己的MemoryTransactionStoreWithBuffer,里面的缓存的粒度用transaction包,这样每次get出来都是以transaction包为单位的形式,之后可以继承CanalInstanceWithManager覆盖里面的initEventStore方法,将MemoryEventStoreWithBuffer替换为自己实现的MemoryTransactionStoreWithBuffer。

想请教一下当时canal设计的考虑,以后有没有考虑提供按transaction包为单位的put,get,ack。然后我目前的想法是不是合理,不知道我上面的表述是不是够清楚,望回复。

原提问者GitHub用户zwangbo

展开
收起
绿子直子 2023-05-09 10:36:42 83 0
1 条回答
写回答
取消 提交回答
  • 要考虑大事务的情况,一般我们内部不建议这么玩。很多工作你可以在client层面来支持,比如client层面本身支持异步ack之后,你可以自己搞一个buffer,凑足一个事务后回调业务

    原回答者GitHub用户agapple

    2023-05-10 10:01:04
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载