错误:
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
看意思是找到了两个一样的类:DynamicTableSinkFactory
代码如下: package org.apache.flink.examples;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.factories.DynamicTableSinkFactory;
public class CDC2ss2 { public static void main(String[] args) throws Exception { // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv; EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); tEnv = StreamTableEnvironment.create(env, settings); String src_sql = "CREATE TABLE userss (\n" + " user_id INT,\n" + " user_nm STRING\n" + ") WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = '10.12.5.37',\n" + " 'port' = '3306',\n" + " 'username' = 'dps',\n" + " 'password' = 'dps1234',\n" + " 'database-name' = 'rpt',\n" + " 'table-name' = 'users'\n" + " )"; tEnv.executeSql(src_sql); // 创建表 String sink="CREATE TABLE sink (\n" + " user_id INT,\n" + " user_nm STRING,\n" + " primary key(user_id) NOT ENFORCED \n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" + " 'username' = 'dps',\n" + " 'password' = 'dps1234',\n" + " 'table-name' = 'sink'\n" + " )"; String to_print_sql="insert into sink select user_id ,user_nm from userss"; tEnv.executeSql(sink); tEnv.executeSql(to_print_sql); env.execute(); }
}
详细错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
Table options are:
'connector'='jdbc'
'password'='dps1234'
'table-name'='sink'
'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
'username'='dps'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
Ambiguous factory classes are:
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 19 more
Process finished with exit code 1*来自志愿者整理的flink邮件归档
这个错误通常是由于在classpath中存在多个实现了DynamicTableSinkFactory接口的jdbc标识符的工厂类所导致的。这可能是由于您的项目中引入了多个不同版本的依赖项,或者是由于您的项目中存在多个JAR文件,其中包含了相同的类。
你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 JDCB
的 Connector? *来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。