
What should I do about a DDL error with the Elasticsearch connector?

The source code is as follows:

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'format.type' = 'json',
    'update-mode' = 'append'
)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ESTest {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        String sinkDDL = "CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT"
                + " ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
                + " 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
                + " 'connector.document-type' = 'user_behavior',"
                + " 'connector.bulk-flush.max-actions' = '1',"
                + " 'format.type' = 'json',"
                + " 'update-mode' = 'append' )";
        tableEnv.sqlUpdate(sinkDDL);

        Table table = tableEnv.sqlQuery("select * from test_es");
        tableEnv.toRetractStream(table, Row.class).print();
        streamEnv.execute("");
    }
}

The specific error:

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json'
The following properties are requested:
connector.bulk-flush.max-actions=1
connector.document-type=user_behavior
connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour
connector.type=elasticsearch
connector.version=6
format.type=json
schema.0.data-type=BIGINT
schema.0.name=hour_of_day
schema.1.data-type=BIGINT
schema.1.name=buy_cnt
update-mode=append

*From the volunteer-curated Flink mailing-list archive

玛丽莲梦嘉 2021-12-02 16:42:33
1 answer
  • It looks like you are missing the dependency for the Elasticsearch connector [1], so only the built-in filesystem connector can be found, and the built-in filesystem connector currently supports only the csv format, which is why you get this error. Adding the missing dependencies to your project should fix it; if you use the SQL CLI, you also need to put the dependency jars into Flink's lib directory.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
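    For the SQL CLI case mentioned above, copying the jars into Flink's lib directory might look like the following sketch. The `FLINK_HOME` path and the `1.10.0` jar versions are assumptions for illustration, not taken from the thread; adjust them to your installation.

```shell
# Sketch only: FLINK_HOME and the jar versions are assumptions; adjust to your setup.
FLINK_HOME=/opt/flink

# Copy the Elasticsearch connector and JSON format jars into Flink's lib directory
cp flink-sql-connector-elasticsearch6_2.11-1.10.0.jar "$FLINK_HOME/lib/"
cp flink-json-1.10.0.jar "$FLINK_HOME/lib/"

# Restart the SQL CLI so the new jars are picked up
"$FLINK_HOME/bin/sql-client.sh" embedded
```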

    2021-12-02 17:38:12