Table options do not contain an option key 'connector' for discovering a connector.flink 1.11 sink hive table的connector设置为什么啊,尝试设置 WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file'); 也报错误 query: streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) streamTableEnv.executeSql( """ | | |CREATE TABLE hive_table ( | user_id STRING, | age INT |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( | 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', | 'sink.partition-commit.trigger'='partition-time', | 'sink.partition-commit.delay'='1 h', | 'sink.partition-commit.policy.kind'='metastore,success-file' |) | |""".stripMargin)
streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT) streamTableEnv.executeSql( """ | |CREATE TABLE kafka_table ( | uid VARCHAR, | -- uid BIGINT, | sex VARCHAR, | age INT, | created_time TIMESTAMP(3), | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND |) WITH ( | 'connector.type' = 'kafka', | 'connector.version' = 'universal', | 'connector.topic' = 'user', | -- 'connector.topic' = 'user_long', | 'connector.startup-mode' = 'latest-offset', | 'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092', | 'connector.properties.group.id' = 'user_flink', | 'format.type' = 'json', | 'format.derive-schema' = 'true' |) |""".stripMargin)
streamTableEnv.executeSql( """ | |INSERT INTO hive_table |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH') |FROM kafka_table | |""".stripMargin)
streamTableEnv.executeSql( """ | |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13' | |""".stripMargin) .print() 错误栈: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table'.
Table options are:
'hive.storage.file-format'='parquet' 'is_generic'='false' 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' 'sink.partition-commit.delay'='1 h' 'sink.partition-commit.policy.kind'='metastore,success-file' 'sink.partition-commit.trigger'='partition-time' at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65) at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala) Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector. at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ... 19 more
*来自志愿者整理的flink邮件归档
你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
你好,我是AI助理
可以解答问题、推荐解决方案等
评论
全部评论 (0)