Flink 1.11: defining a Kafka source with DDL throws an error. What should I do?

Error message:

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
    at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
    at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
    at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)

Code:

public class DDLSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        String create_sql = "create table test\n"
                + "(\n"
                + "name varchar,\n"
                + "city varchar\n"
                + ")with (\n"
                + "'connector.type' = 'kafka', \n"
                + "'connector.version' = 'universal',\n"
                + "'connector.topic' = 'test',\n"
                + "'connector.properties.0.key' = 'group.id',\n"
                + "'connector.properties.0.value' = 'test_gd',\n"
                + "'connector.properties.1.key' = 'bootstrap.servers',\n"
                + "'connector.properties.1.value' = '127.0.0.1:9092',\n"
                + "'connector.property-version' = '1',\n"
                + "'connector.startup-mode' = 'latest-offset',\n"
                + "'format.type' = 'json',\n"
                + "'format.property-version' = '1',\n"
                + "'format.derive-schema' = 'true',\n"
                + "'update-mode' = 'append')";

        tableEnv.executeSql(create_sql);
        Table table = tableEnv.sqlQuery("select name from test ");
        TableSchema schema = table.getSchema();
        System.out.println(schema);
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
        tuple2DataStream.print();
        tableEnv.execute("test");
        //bsEnv.execute("fff");
    }
}

*From the Flink mailing-list archive compiled by volunteers

EXCEED 2021-12-08 10:59:13
1 answer
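  • You are on Flink 1.11, but your CREATE TABLE statement still uses the legacy connector options (the 'connector.type' style keys). Switch the statement to the 1.11 option syntax and try again. Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html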

    *From the Flink mailing-list archive compiled by volunteers

    2021-12-08 11:19:36
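
To make the suggestion above concrete, here is a minimal sketch of the table from the question rewritten with the Flink 1.11 option keys ('connector', 'topic', 'properties.*', 'scan.startup.mode', 'format'). The class name NewStyleKafkaDdl and the helper buildDdl are hypothetical and exist only for illustration. The block only assembles the DDL string with plain Java so it can be checked without a Flink dependency. The legacy 'update-mode', 'format.derive-schema' and '*.property-version' keys are left out on the assumption that the 1.11 connector has no counterpart for them, so please verify against the linked page.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper class, not part of Flink or of the original question.
public class NewStyleKafkaDdl {

    // Builds a CREATE TABLE statement using the Flink 1.11 Kafka option keys.
    // Table name, columns and option values are copied from the question.
    static String buildDdl() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");            // was connector.type + connector.version
        options.put("topic", "test");                 // was connector.topic
        options.put("properties.group.id", "test_gd");               // was connector.properties.0.*
        options.put("properties.bootstrap.servers", "127.0.0.1:9092"); // was connector.properties.1.*
        options.put("scan.startup.mode", "latest-offset");           // was connector.startup-mode
        options.put("format", "json");                // was format.type

        // Render the options as the body of the WITH (...) clause.
        StringJoiner with = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add("  '" + e.getKey() + "' = '" + e.getValue() + "'");
        }

        return "CREATE TABLE test (\n"
                + "  name VARCHAR,\n"
                + "  city VARCHAR\n"
                + ") " + with;
    }

    public static void main(String[] args) {
        // In the question's program this string would be passed to
        // tableEnv.executeSql(...) in place of create_sql.
        System.out.println(buildDdl());
    }
}
```

Separately, the Flink 1.11 documentation states that a Table converted to a DataStream (as toRetractStream does here) runs as a regular DataStream program when StreamExecutionEnvironment.execute() is called. So if the exception persists after changing the DDL, it may be worth re-enabling the commented-out bsEnv.execute(...) call in place of tableEnv.execute("test").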