错误信息: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)
代码: public class DDLSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String create_sql= "create table test\n" + "(\n" + "name varchar,\n" + "city varchar\n" + ")with (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'test',\n" + "'connector.properties.0.key' = 'group.id',\n" + "'connector.properties.0.value' = 'test_gd',\n" + "'connector.properties.1.key' = 'bootstrap.servers',\n" + "'connector.properties.1.value' = '127.0.0.1:9092',\n" + "'connector.property-version' = '1',\n" + "'connector.startup-mode' = 'latest-offset',\n" + "'format.type' = 'json',\n" + "'format.property-version' = '1',\n" + "'format.derive-schema' = 'true',\n" + "'update-mode' = 'append')";
tableEnv.executeSql(create_sql); Table table = tableEnv.sqlQuery("select name from test "); TableSchema schema = table.getSchema(); System.out.println(schema); DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class); tuple2DataStream.print(); tableEnv.execute("test"); //bsEnv.execute("fff"); } }
*来自志愿者整理的flink邮件归档
你使用的是Flink 1.11版本,但是你的建表语句还是用的老版本,建议更换新版本的建表语句后再试一下 参考如下: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。