我现在的理解 EventTransactionBuffer sink将event分发给EventTransactionBuffer,如果收集到一个完整的transaction,或者达到了buffer的size上限就会触发flush,将缓存数据通过callback经过filter等操作以后发送到MemoryEventStoreWithBuffer.
MemoryEventStoreWithBuffer 接收到一个List(应该就是刚刚发过来的list事务),检查如果空位空间足够,就遍历list中的数据,逐条加入entries中。
每一次get数据的时候,是根据传入的batchSize来拿的,拿完以后会计算消息的PositionRange(这次get的头尾LogPosition),会set可被ack的点为这段消息中的最后一个TRANSACTIONBEGIN或者TRANSACTIONEND。这个ack会在元数据管理updateCursor时传进去,为了让cursor中总是记录的事务的开始或者结束。
我的疑问 现在我们在开发的东西,是希望以Transaction来作为单位来将消息发给下游的(如果过大的transaction会分割成多个transaction包,里面记录一个布尔值表示是否是事务结束包),现在主要矛盾是MemoryEventStoreWithBuffer里面存储的粒度是entry,所以根据batchSIze来拿的话拿出去可能同个事务中数据是会被分2次拿到的。
如果是从MemoryEventStoreWithBuffer拿出数据之后,再嵌套一个类似EventTransactionBuffer的缓存, ack的时候也还要保留原来的batchId来ack。
目前觉得可以去实现一个自己的MemoryTransactionStoreWithBuffer,里面的缓存的粒度用transaction包,这样每次get出来都是以transaction包为单位的形式,之后可以继承CanalInstanceWithManager覆盖里面的initEventStore方法,将MemoryEventStoreWithBuffer替换为自己实现的MemoryTransactionStoreWithBuffer。
想请教一下当时canal设计的考虑,以后有没有考虑提供按transaction包为单位的put,get,ack。然后我目前的想法是不是合理,不知道我上面的表述是不是够清楚,望回复。
原提问者GitHub用户zwangbo
要考虑大事务的情况,一般我们内部不建议这么玩。很多工作你可以在client层面来支持,比如client层面本身支持异步ack之后,你可以自己搞一个buffer,凑足一个事务后回调业务
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。