源码如下:
CREATE TABLE buy_cnt_per_hour (
hour_of_day BIGINT,
buy_cnt BIGINT
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'buy_cnt_per_hour',
'connector.document-type' = 'user_behavior',
'connector.bulk-flush.max-actions' = '1',
'format.type' = 'json',
'update-mode' = 'append'
)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class ESTest {
public static void main(String[] args) throws Exception {
//2、设置运行环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
" 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
" 'connector.document-type' = 'user_behavior',"
" 'connector.bulk-flush.max-actions' = '1',\n" + " 'format.type' = 'json',"
" 'update-mode' = 'append' )";
tableEnv.sqlUpdate(sinkDDL);
Table table = tableEnv.sqlQuery("select * from test_es ");
tableEnv.toRetractStream(table, Row.class).print();
streamEnv.execute("");
}
}
具体errorThe matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json' The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append*来自志愿者整理的flink邮件归档
看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem connector只支持csv format,所以会有这个错误。 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。
org.apache.flink flink-sql-connector-elasticsearch6_2.11 ${flink.version} org.apache.flink flink-json ${flink.version}
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。