有人遇到过这个问题吗,我用的flink1.7,oracle-cdc 2.4.1 ,oracle版本12c,我创建了一张cdc表,然后直接print输出,在源表里新插入数据时报file is not a valid field name这个错,网上说什么flink字段类型和数据库不匹配,可是我字段类型是匹配的啊
我尝试吧oracle-cdc 2.4改成了3.1.1,运行起来代码直接报错:
java.lang.ClassNotFoundException: org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder
代码demo如下:
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 9091);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建源端逻辑表
String createSourceTableSQL = "CREATE TABLE TEST3 (" +
" ID bigint , "+
" NAME string, "+
" AGE bigint, "+
" PRIMARY KEY (ID) NOT ENFORCED "+
") WITH (" +
" 'connector' = 'oracle-cdc', " +
" 'hostname' = '10.26.73.98', " +
" 'port' = '1521', " +
" 'username' = 'cdc_cdl', " +
" 'password' = 'cdc_cdl', " +
" 'database-name' = 'testdb', " +
" 'schema-name' = 'TEST', " +
" 'table-name' = 'CDL_TEST3', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'debezium.log.mining.strategy' = 'online_catalog', " +
" 'debezium.log.mining.continuous.mine' = 'true'," +
" 'debezium.database.tablename.case.insensitive'='false' " +
");";
tableEnv.executeSql(createSourceTableSQL);
tableEnv.executeSql("select * from TEST3").print();
maven如下:
8
8
1.17.1
2.12
org.apache.flink
flink-table-runtime
${flink.version}
org.apache.flink
flink-runtime-web
${flink.version}
org.apache.flink
flink-connector-jdbc
3.1.0-1.17
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
</dependency>
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。