Hi, everyone: if I need to sink data to an Elasticsearch host that requires authentication, how do I configure the username and password for the Elasticsearch connector? I wrote my DDL following the example in the official docs, but there is no option for a username or password, and the job fails with an Unauthorized error.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html

```sql
CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);
```

Connector Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify which connector to use. Valid values: `elasticsearch-6` (connect to an Elasticsearch 6.x cluster), `elasticsearch-7` (connect to an Elasticsearch 7.x and later cluster). |
| hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
| index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the Dynamic Index section for more details. |
| document-type | required in 6.x | (none) | String | Elasticsearch document type. No longer necessary in elasticsearch-7. |
| document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs like "KEY1$KEY2$KEY3". |
| failure-handler | optional | fail | String | Failure handling strategy when a request to Elasticsearch fails. Valid strategies: fail (throw an exception and fail the job), ignore (ignore failures and drop the request), retry_rejected (re-add requests that failed due to queue capacity saturation), or a custom class name (failure handling via an ActionRequestFailureHandler subclass). |
| sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, the sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints, and thus does NOT provide strong at-least-once guarantees for action requests. |
| sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
| sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
| sink.bulk-flush.interval | optional | 1s | Duration | The interval at which to flush buffered actions. Can be set to '0' to disable it. Note that both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' while keeping the flush interval, allowing fully asynchronous processing of buffered actions. |
| sink.bulk-flush.backoff.strategy | optional | DISABLED | String | How to retry flush actions that failed due to a temporary request error. Valid strategies: DISABLED (no retry; fail after the first request error), CONSTANT (wait a fixed backoff delay between retries), EXPONENTIAL (start from the backoff delay and increase it exponentially between retries). |
| sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
| sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between retries. For EXPONENTIAL backoff, this is the initial base delay. |
| connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
| connection.path-prefix | optional | (none) | String | Prefix string added to every REST communication, e.g., '/v1'. |
| format | optional | json | String | The format used to serialize documents; it must produce a valid JSON document. Uses the built-in 'json' format by default. Please refer to the JSON Format page for more details. |

*From the volunteer-compiled Flink Q&A*
Flink 1.11 does not yet support setting a username or password; these two options were added in the new Elasticsearch connector in 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361
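After upgrading to Flink 1.12 or later, the Elasticsearch connector accepts 'username' and 'password' options directly in the WITH clause. A minimal sketch, assuming Flink 1.12+ and placeholder host, index, and credentials:

```sql
CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  -- available since Flink 1.12 (FLINK-18361); replace with your real credentials
  'username' = 'elastic',
  'password' = 'your_password'
);
```

With these options set, the connector authenticates its REST requests against the secured cluster instead of failing with Unauthorized.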