开发者社区> 问答> 正文

ddl es 报错

源码如下:

CREATE TABLE buy_cnt_per_hour (

  hour_of_day BIGINT,

  buy_cnt BIGINT

) WITH (

  'connector.type' = 'elasticsearch',

  'connector.version' = '6',

  'connector.hosts' = 'http://localhost:9200',

  'connector.index' = 'buy_cnt_per_hour',

  'connector.document-type' = 'user_behavior',

  'connector.bulk-flush.max-actions' = '1',

  'format.type' = 'json',

  'update-mode' = 'append'

)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2、设置运行环境

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

streamEnv.setParallelism(1);

String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "

  • ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"

  • "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"

  • "'connector.document-type' = 'user_behavior',"

  • "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"

  • "'update-mode' = 'append' )";

tableEnv.sqlUpdate(sinkDDL);

Table table = tableEnv.sqlQuery("select * from test_es ");

tableEnv.toRetractStream(table, Row.class).print();

streamEnv.execute("");

}

}

具体error

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json'The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-07 15:36:06 2296 0
1 条回答
写回答
取消 提交回答
  • 这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作

    真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。

    而tableEnv.toRetractStream(table, Row.class).print(); 

    这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。*来自志愿者整理的flink邮件归档

    2021-12-07 16:41:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载