开发者社区> 问答> 正文

Flink JDBC Driver是否支持创建流数据表

Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: 

Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); 

Statement statement = connection.createStatement(); 

statement.executeUpdate( 

"CREATE TABLE table_kafka (\n" + 

" user_id BIGINT,\n" + 

" item_id BIGINT,\n" + 

" category_id BIGINT,\n" + 

" behavior STRING,\n" + 

" ts TIMESTAMP(3),\n" + 

" proctime as PROCTIME(),\n" + 

" WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + 

") WITH (\n" + 

" 'connector.type' = 'kafka', \n" + 

" 'connector.version' = 'universal', \n" + 

" 'connector.topic' = 'flink_im02', \n" + 

" 'connector.properties.group.id' = 'flink_im02_new',\n" + 

" 'connector.startup-mode' = 'earliest-offset', \n" + 

" 'connector.properties.zookeeper.connect' = '...:2181', \n" + 

" 'connector.properties.bootstrap.servers' = '...:9092', \n" + 

" 'format.type' = 'csv',\n" + 

" 'format.field-delimiter' = '|'\n" + 

")"); 

ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); 

while (rs1.next()) { 

System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); 

}  

statement.close(); 

connection.close(); 

报错: 

Reason: Required context properties mismatch. 

The matching candidates: 

org.apache.flink.table.sources.CsvBatchTableSourceFactory 

Mismatched properties: 

'connector.type' expects 'filesystem', but is 'kafka'*来自志愿者整理的flink邮件归档

展开
收起
玛丽莲梦嘉 2021-12-02 16:44:22 1053 0
1 条回答
写回答
取消 提交回答
  • 参考下这个文档: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector  

    下面的语法应该是不支持的: 

    'format.type' = 'csv',\n" + 

    " 'format.field-delimiter' = '|'\n" 

    下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90} 

    tEnv.sqlUpdate("CREATE TABLE pick_order (\n" 

    • " order_no VARCHAR,\n" 

    • " status INT\n" 

    • ") WITH (\n" 

    • " 'connector.type' = 'kafka',\n" 

    • " 'connector.version' = 'universal',\n" 

    • " 'connector.topic' = 'wanglei_test',\n" 

    • " 'connector.startup-mode' = 'latest-offset',\n" 

    • " 'connector.properties.0.key' = 'zookeeper.connect',\n" 

    • " 'connector.properties.0.value' = 'xxx:2181',\n" 

    • " 'connector.properties.1.key' = 'bootstrap.servers',\n" 

    • " 'connector.properties.1.value' = 'xxx:9092',\n" 

    • " 'update-mode' = 'append',\n" 

    • " 'format.type' = 'json',\n" 

    • " 'format.derive-schema' = 'true'\n" 

    • ")");*来自志愿者整理的FLINK邮件归档

    2021-12-02 17:40:28
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载