环境信息
canal version 1.1.3 mysql version 5.7.22 kafka version 1.1.1
问题描述
canal集成kafka,配置多个destinations,造成kafka事务异常。 每一个destination起一个线程订阅canal,当线程1执行完beginTransaction()时,线程2执行beginTransaction()就会造成kafka事务异常,如下:
exception trace:
ERROR com.alibaba.otter.canal.server.CanalMQStarter - TransactionalId canal-transactional-id: Invalid transition attempted from state IN_TRANSACT ION to state IN_TRANSACTION org.apache.kafka.common.KafkaException: TransactionalId canal-transactional-id: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:587) ~[kafka-clients-1.1.1.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:106) ~[canal.server-1.1.3.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:182) [canal.server-1.1.3.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:22) [canal.server-1.1.3.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:224) [canal.server-1.1.3.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
原提问者GitHub用户nieyuan1991
之前设计的时候考虑写出MQ主要是单例模式,对于多个队列同时的事务发送目前是有问题.
规避的办法:
1、一个canal server只配置一个发送队列 2、关闭kafka的事务消息模式 3、针对每个队列,单独使用一个producer
后面我会调整一下kafka的并发模型,去掉transaction模型.
大致的思路:
1、提前计算一批数据的mq partition 2、多个partition队列的数据采用并行提交,单个partition采用kafka的batch提交模式,采用异步模式+callback,确保partition分区的最后一条数据的callbak后才认为提交完成
原回答者GitHub用户agapple
这个问题可能是由于多个线程同时访问Kafka事务管理器导致的。您可以尝试以下几个方法:
1、将多个destination的线程合并为一个线程,这样可以避免多个线程同时访问Kafka事务管理器。
2、如果您需要使用多个线程,可以尝试使用Kafka的多个TransactionalId来管理事务,每个线程使用不同的TransactionalId来提交事务。
3、您还可以尝试使用Kafka的分区来管理事务,将不同的destination分配到不同的分区中,这样可以避免多个线程同时访问同一个分区。
希望能够解决你的问题
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。