你好, Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create语句的时候报错了 编译的jar包是jar-with-dependencies的
代码截图: public String ddlSql = String.format("CREATE TABLE %s (\n" + " 数字 BIGINT,\n" + " 味精字符串,\n" + " 用户名 STRING,\n" + " 更新时间时间戳(3)\n" + ") WITH (\n" + " '连接器' = '卡夫卡',\n" + " '主题' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " '格式' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")\n", 表名, 主题, 服务器, 组);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(ddlSql);
报错信息: 引起:org.apache.flink.table.api.ValidationException:在类路径中找不到实现“org.apache.flink.table.factories.DynamicTableSourceFactory”的标识符“kafka”的任何工厂。 可用的工厂标识符有: 数据源 在 org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) 在 org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ……还有 33 个
参考了这个http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12,还是会报一样的错
附上pom依赖: <依赖> <依赖> org.apache.flink flink-java ${flink.version} </依赖> <依赖> org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-table-api-java ${flink.version} </依赖> <依赖> org.apache.flink flink-connector-kafka_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-sql-connector-kafka_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-json ${flink.version} </依赖> </依赖>*来自志愿者整理的flink邮件归档
可能你的打包方式有关系。你要是直接在这个程序里面运行是可以运行的么?
如果可以在idea运行,但是打出来的jar包不能运行的话,很有可能和SPI文件有关系。 如果你用的是shade plugin,需要看下这个transformer[1]
[1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。