开发者社区> 问答> 正文

flink 1.11 es未定义pk的sink问题

根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义: 不确定是我配置使用的方式不对,还是确实存在bug。。

CREATE TABLE ES6_SENSORDATA_OUTPUT ( event varchar, user_id varchar, distinct_id varchar, _date varchar, _event_time varchar, recv_time varchar, _browser_version varchar, path_name varchar, _search varchar, event_type varchar, _current_project varchar, message varchar, stack varchar, component_stack varchar, _screen_width varchar, _screen_height varchar ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<ES_YUNTU.SERVERS>', 'index' = 'flink_sensordata_target_event', 'document-type' = 'default', 'document-id.key-delimiter' = '$', 'sink.bulk-flush.interval' = '1000', 'failure-handler' = 'fail', 'format' = 'json' )

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-07 16:05:47 792 0
1 条回答
写回答
取消 提交回答
  • 如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].

    所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。

    [1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102

    [2] https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509*来自志愿者整理的flink邮件归档

    2021-12-07 16:19:44
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载