开发者社区> 问答> 正文

使用1.1.4版本canal同步数据到Kafka,创建3个实例。重启后其中一个实例会报错 Faile

使用1.1.4版本canal,创建3个实例。重启后其中一个实例会报错 canal instance报错:ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms; server段报

WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Error while fetching metadata with correlation id 2681 : {baidd_t1=INVALID_TOPIC_EXCEPTION} 但是 Kafka是正常的;

原提问者GitHub用户yutaoChina

展开
收起
后端老大 2023-04-26 16:54:42 270 0
1 条回答
写回答
取消 提交回答
  • 我是这样降Kafka版本的

    org.apache.kafka kafka_2.11 0.9.0.1

    org.apache.kafka kafka-clients 0.9.0.1

    然后仿照原生包的CanalKafkaProducer,写了个CustomerCanalKafkaProducer,在配置com.alibaba.otter.canal.connector.core.spi.CanalMQProducer中使用了此类。

    现在奇怪的是,之前只有本地单机时,运行完全没有问题。但是当我在另外网段的机器上建了两个集群的四个Server时,在本地单机实例上述的异常就发生了,但是另外机器的四个实例都是正常的。

    我现在只能在另外两台机器上测试,本地测试不了。

    这个议题如果按照下面的办法尝试解决的话可以关闭了。我找到了原因:服务器端Kafka版本为0.9.0.1。客户端因为使用了Springboot,导致其版本为高版本的Kafka客户端,就会产生上述的错误。

    可以参考下面改法:

    <properties> <canal.version>1.1.5</canal.version> <kafka.version>0.9.0.1</kafka.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal</artifactId> <version>${canal.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> </dependencies>
    

    然后配置自定义的CustomerCanalKafkaProducer。这个类可以仿照原生的com.alibaba.otter.canal.connector.kafka.producer.CanalKafkaProducer进行微调降版本。

    主要改动点注释掉producer.flush();还有其它报文格式的自定义。

    原回答者GitHub用户vollen725

    2023-04-26 19:32:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载