开发者社区> 问答> 正文

cdc代码报错

我运行ververica/flink-cdc-connectors git上的demo代码,报错:

2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.

java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V

at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)

at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)

at io.debezium.embedded.EmbeddedEngine. (EmbeddedEngine.java:583)

at io.debezium.embedded.EmbeddedEngine. (EmbeddedEngine.java:80)

at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)

at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)

at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)

at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

源码:

public class CdcTest {

public static void main(String[] args) throws Exception {

SourceFunction sourceFunction = MySQLSource. builder()

.hostname("localhost")

.port(3306)

.databaseList("sohay") // monitor all tables under inventory database

.username("root")

.password("123456")

.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String

.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute();

}

}*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 11:19:48 1411 0
1 条回答
写回答
取消 提交回答
  • 环境中估计有另一个版本的 kafka-connect jar 包,导致依赖冲突了。*来自志愿者整理的flink邮件归档

    2021-12-07 11:22:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载