
Error when executing Flink SQL from code

Error:

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath

It seems that two identical classes were found for DynamicTableSinkFactory.

The code is as follows:

package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class CDC2ss2 {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        String src_sql = "CREATE TABLE userss (\n" +
                "     user_id INT,\n" +
                "     user_nm STRING\n" +
                ") WITH (\n" +
                "      'connector' = 'mysql-cdc',\n" +
                "      'hostname' = '10.12.5.37',\n" +
                "      'port' = '3306',\n" +
                "      'username' = 'dps',\n" +
                "      'password' = 'dps1234',\n" +
                "      'database-name' = 'rpt',\n" +
                "      'table-name' = 'users'\n" +
                "      )";
        tEnv.executeSql(src_sql); // create the source table
        String sink = "CREATE TABLE sink (\n" +
                "     user_id INT,\n" +
                "     user_nm STRING,\n" +
                "     PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "      'connector' = 'jdbc',\n" +
                "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
                "      'username' = 'dps',\n" +
                "      'password' = 'dps1234',\n" +
                "      'table-name' = 'sink'\n" +
                "      )";
        String to_print_sql = "insert into sink select user_id, user_nm from userss";
        tEnv.executeSql(sink);
        tEnv.executeSql(to_print_sql);
        env.execute();
    }
}

Full error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.

Table options are:

'connector'='jdbc'

'password'='dps1234'

'table-name'='sink'

'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'

'username'='dps'

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)

at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)

at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)

at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)

... 18 more

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.

Ambiguous factory classes are:

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)

... 19 more

Process finished with exit code 1

*From the volunteer-compiled Flink mailing list archive

小阿怪 2021-12-06 12:32:50
2 answers
  • This error usually means that more than one factory class on the classpath implements the org.apache.flink.table.factories.DynamicTableSinkFactory interface for the identifier 'jdbc'. It is typically caused by pulling in multiple versions of the same dependency, or by several JARs on the classpath that bundle the same classes. A quick way to see which JARs contribute the duplicate factories is sketched below.
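
    A minimal diagnostic sketch, assuming Flink 1.11+ where table factories are registered through the org.apache.flink.table.factories.Factory SPI; the class name JdbcFactoryProbe is purely illustrative:

        import org.apache.flink.table.factories.DynamicTableSinkFactory;
        import org.apache.flink.table.factories.Factory;

        import java.security.CodeSource;
        import java.util.ServiceLoader;

        // Lists every SPI-registered table factory that claims the 'jdbc' identifier,
        // together with the JAR it was loaded from, so duplicate dependencies show up.
        public class JdbcFactoryProbe {
            public static void main(String[] args) {
                for (Factory factory : ServiceLoader.load(Factory.class)) {
                    if (factory instanceof DynamicTableSinkFactory
                            && "jdbc".equals(factory.factoryIdentifier())) {
                        CodeSource source =
                                factory.getClass().getProtectionDomain().getCodeSource();
                        System.out.println(factory.getClass().getName()
                                + " loaded from "
                                + (source == null ? "<unknown location>" : source.getLocation()));
                    }
                }
            }
        }

    Run on the same classpath as the job, this should print the duplicate factory entries and the JARs they come from; removing or deduplicating one of those JARs resolves the ambiguity.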

    2023-05-14 16:10:07
  • Which Flink version are you using? Have you implemented your own DynamicTableSinkFactory whose factoryIdentifier() method returns the JDBC connector identifier? (A sketch of what such a clashing factory would look like follows below.) *From the volunteer-compiled Flink mailing list archive
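
    For reference, a minimal sketch of what such a clashing user-defined factory would look like (the class MyJdbcSinkFactory is hypothetical); it would also have to be registered in META-INF/services/org.apache.flink.table.factories.Factory to be discovered:

        import org.apache.flink.configuration.ConfigOption;
        import org.apache.flink.table.connector.sink.DynamicTableSink;
        import org.apache.flink.table.factories.DynamicTableSinkFactory;

        import java.util.Collections;
        import java.util.Set;

        // Hypothetical user-defined factory: because factoryIdentifier() also returns
        // "jdbc", Flink's factory lookup finds two candidates for the same identifier
        // and fails with the "Multiple factories for identifier 'jdbc'" exception.
        public class MyJdbcSinkFactory implements DynamicTableSinkFactory {

            @Override
            public String factoryIdentifier() {
                return "jdbc"; // clashes with the built-in flink-connector-jdbc identifier
            }

            @Override
            public DynamicTableSink createDynamicTableSink(Context context) {
                throw new UnsupportedOperationException("illustration only");
            }

            @Override
            public Set<ConfigOption<?>> requiredOptions() {
                return Collections.emptySet();
            }

            @Override
            public Set<ConfigOption<?>> optionalOptions() {
                return Collections.emptySet();
            }
        }

    Returning a different identifier from factoryIdentifier() (or removing the duplicate SPI registration) avoids the ambiguous lookup.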

    2021-12-06 14:40:17