开发者社区> 问答> 正文

canal集成kafka,配置多个destinations,造成kafka事务异常

环境信息

canal version 1.1.3 mysql version 5.7.22 kafka version 1.1.1

问题描述

canal集成kafka,配置多个destinations,造成kafka事务异常。 每一个destination起一个线程订阅canal,当线程1执行完beginTransaction()时,线程2执行beginTransaction()就会造成kafka事务异常,如下:

exception trace:

ERROR com.alibaba.otter.canal.server.CanalMQStarter - TransactionalId canal-transactional-id: Invalid transition attempted from state IN_TRANSACT ION to state IN_TRANSACTION org.apache.kafka.common.KafkaException: TransactionalId canal-transactional-id: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:587) ~[kafka-clients-1.1.1.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:106) ~[canal.server-1.1.3.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:182) [canal.server-1.1.3.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:22) [canal.server-1.1.3.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:224) [canal.server-1.1.3.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

原提问者GitHub用户nieyuan1991

展开
收起
云上静思 2023-05-04 13:20:48 143 0
2 条回答
写回答
取消 提交回答
  • 之前设计的时候考虑写出MQ主要是单例模式,对于多个队列同时的事务发送目前是有问题.

    规避的办法:

    1、一个canal server只配置一个发送队列 2、关闭kafka的事务消息模式 3、针对每个队列,单独使用一个producer

    后面我会调整一下kafka的并发模型,去掉transaction模型.

    大致的思路:

    1、提前计算一批数据的mq partition 2、多个partition队列的数据采用并行提交,单个partition采用kafka的batch提交模式,采用异步模式+callback,确保partition分区的最后一条数据的callbak后才认为提交完成

    原回答者GitHub用户agapple

    2023-05-05 10:46:22
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    这个问题可能是由于多个线程同时访问Kafka事务管理器导致的。您可以尝试以下几个方法:

    1、将多个destination的线程合并为一个线程,这样可以避免多个线程同时访问Kafka事务管理器。

    2、如果您需要使用多个线程,可以尝试使用Kafka的多个TransactionalId来管理事务,每个线程使用不同的TransactionalId来提交事务。

    3、您还可以尝试使用Kafka的分区来管理事务,将不同的destination分配到不同的分区中,这样可以避免多个线程同时访问同一个分区。

    希望能够解决你的问题

    2023-05-04 13:53:12
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载
云效助力企业集成安全到DevOps中 立即下载