大家好,我直接使用ddl定义kafka数据源出现了问题。
kafka里是logstash采上来的json格式数据。
ddl如下:
CREATE TABLE vpn_source (
c_real_ip VARCHAR,
d_real_ip VARCHAR,
c_real_port INT,
d_real_port INT,
logtype INT,
user
VARCHAR,
host_ip VARCHAR
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'vpnlog',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'flink_test',
'format.type' = 'json'
)
报错如下:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.properties.bootstrap.servers=10.208.0.73:9092
connector.properties.group.id=flink_test
connector.properties.zookeeper.connect=10.208.0.73:2181
connector.topic=vpnlog
connector.type=kafka
connector.version=universal
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=c_real_ip
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=d_real_ip
schema.2.data-type=INT
schema.2.name=c_real_port
schema.3.data-type=INT
schema.3.name=d_real_port
schema.4.data-type=INT
schema.4.name=logtype
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=user
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=host_ip
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
flink环境
本地源码编译的flink1.11,直接通过start-cluster.sh启动的本地环境。*来自志愿者整理的flink邮件归档
你的DDL没有问题,问题应该是你没有把kafka的jar包添加进来。你可以到 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html 这里下载kafaka的universal版本的jar包。关于如何把jar包添加到pyflink里面使用,你可以参考文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/faq.html#adding-jar-files*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。