当配置好HiveCatalog后, SQL-Cli 也可以查到hive库表信息 创建kafka表:
create table test.test_kafka( word VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'format' = 'json', 'json.ignore-parse-errors' = 'true' ); 在 Hive里面可以查到改表 hive > DESCRIBE FORMATTED test_kafka ........... is_generic true .........
但是我在 Flink SQL > select * from test.test_kafka; 报错: org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'kafka'. Unsupported options: is_generic Supported options: connector format json.fail-on-missing-field json.ignore-parse-errors*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,您在Flink SQL中查询由Kafka源创建的表时遇到了ValidationException
,提示不支持的选项is_generic
。这表明Flink SQL在尝试读取表配置时遇到了Kafka连接器不识别的配置项。从Hive元数据中描述表时出现的is_generic true
可能是导致问题的原因,因为Flink的Kafka connector并不认识这个属性。
检查表定义来源: 确认test_kafka
表是否直接通过Flink SQL创建,或者最初是在Hive中定义并同步到Flink的Hive Catalog。由于Hive和Flink的表定义可能包含不同的扩展属性,直接在Hive中定义的表可能包含了Flink不理解的元数据属性,如is_generic
。
修改表定义: 如果该表是先在Hive中定义而后被Flink引用,您需要确保所有用于Flink的表定义仅包含Flink支持的选项。对于Kafka Connector,应仅使用Flink所支持的配置项进行表定义。
重新创建表: 在Flink SQL环境中,按照Flink兼容的语法重新创建此表,避免使用任何非标准或特定于Hive的配置。您的原始创建语句大部分是正确的,但需确保所有配置都与Flink兼容。例如:
CREATE TABLE test.test_kafka (
word VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'xx',
'properties.bootstrap.servers' = 'xx',
'properties.group.id' = 'test',
'format' = 'json',
'json.fail-on-missing-field' = 'false', -- 更正为false以匹配示例中的ignore-parse-errors逻辑
'json.ignore-parse-errors' = 'true'
);
验证表定义: 在执行上述创建语句后,再次尝试在Flink SQL CLI中查询该表,命令如下:
SELECT * FROM test.test_kafka;
注意事项:
通过以上步骤,您应该能够解决因不支持的配置项导致的查询错误,确保Flink SQL能正常读取Kafka表的数据。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。