
Apache Flink (v1.6.0): authenticating the Elasticsearch Sink (v6.4)

I am using Apache Flink v1.6.0 and trying to write to Elasticsearch v6.4.0, which is hosted in Elastic Cloud. I am having trouble authenticating against the Elastic Cloud cluster.

I have been able to get Flink to write to a local, unencrypted Elasticsearch v6.4.0 node using the following code:

/*
 * Elasticsearch configuration
 */
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// use an ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<ObjectNode> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<ObjectNode>() {
        private IndexRequest createIndexRequest(ObjectNode payload) {

            // remove the value node so the fields are at the base of the json payload
            JsonNode jsonOutput = payload.get("value");

            return Requests.indexRequest()
                    .index("raw-payload")
                    .type("payload")
                    .source(jsonOutput.toString(), XContentType.JSON);
        }

        @Override
        public void process(ObjectNode payload, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(payload));
        }
    }

);

// set number of events to be seen before writing to Elasticsearch
esSinkBuilder.setBulkFlushMaxActions(1);

// finally, build and add the sink to the job's pipeline
stream.addSink(esSinkBuilder.build());
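However, the problem appears when I try to add authentication to the code, following the Flink documentation and the corresponding Elasticsearch Java documentation. The addition looks like this: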

// provide a RestClientFactory for custom configuration on the internally created REST client
Header[] defaultHeaders = new Header[]{new BasicHeader("username", "password")};
esSinkBuilder.setRestClientFactory(

    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(defaultHeaders);
    }

);
When I run the job, I get the following error:

14:49:54,700 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: org.elasticsearch.ElasticsearchStatusException: method [HEAD], host [https://XXXXXXXXXXXXXX.europe-west1.gcp.cloud.es.io:9243], URI [/], status line [HTTP/1.1 401 Unauthorized]

at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at com.downuk.AverageStockSalePrice.main(AverageStockSalePrice.java:146)

Caused by: org.elasticsearch.ElasticsearchStatusException: method [HEAD], host [https://XXXXXXXXXXXXXX.europe-west1.gcp.cloud.es.io:9243], URI [/], status line [HTTP/1.1 401 Unauthorized]

at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:625)

Asked by flink小助手, 2018-12-10 13:12:37
1 answer
  • flink小助手:

    override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {

        // TODO Additional rest client args go here - authentication headers for secure connections etc...
      }
    })
    
    2019-07-17 23:19:11
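
The 401 in the question is consistent with the header in the snippet: `new BasicHeader("username", "password")` sends a header literally named `username`, which is not an HTTP authentication header. HTTP Basic authentication instead expects `Authorization: Basic <base64 of user:password>`. The following is a minimal sketch of building that value with the Java standard library only. It assumes the cluster accepts Basic authentication; the class name `BasicAuthHeader` and the credentials `elastic` / `changeme` are placeholders for illustration, not values from the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not part of Flink or Elasticsearch): builds the value
// of an HTTP Basic "Authorization" header from a user name and password.
public class BasicAuthHeader {

    static String basicAuthValue(String user, String password) {
        // Basic auth encodes "user:password" as Base64 and prefixes it with "Basic ".
        String credentials = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials; substitute the real ones for your cluster.
        String value = basicAuthValue("elastic", "changeme");
        System.out.println("Authorization: " + value);
    }
}
```

With such a value, the header in the question could presumably be written as `new BasicHeader("Authorization", value)` and passed to `restClientBuilder.setDefaultHeaders(...)` as before. An alternative described in the Elasticsearch REST client documentation is to register a `CredentialsProvider` through `restClientBuilder.setHttpClientConfigCallback(...)`. Either option is a suggestion to try, not a confirmed fix for this job.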