flink 写入Elasticsearch 踩坑小结

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: flink 写入Elasticsearch 踩坑小结

flink 流式写入Elasticsearch 会经常出现各种错误:

1:Flink出现Caused by: java.lang.LinkageError: loader constraint violation错误

这是由于Flink的包加载机制引起的。
-- 解决方案一
原因: 类加载顺序问题,可以在flink-conf.yaml中加入
classloader.resolve-order: parent-first
Flink的默认加载是child-first。
但是用了parent-first配置有可能出现类冲突问题。解决办法只针对个别包出来,不要一律用parent-first, 配置如下:
classloader.parent-first-patterns.additional: javax.script; jdk;
-- 解决方案二
pom.xml 中将Elasticsearch相关依赖放在最开始位置。

classloader.parent-first-patterns.default,不建议修改,固定为以下这些值:

java.;
scala.;
org.apache.flink.;
com.esotericsoftware.kryo;
org.apache.hadoop.;
javax.annotation.;
org.slf4j;
org.apache.log4j;
org.apache.logging;
org.apache.commons.logging;
ch.qos.logback;
org.xml;
javax.xml;
org.apache.xerces;
org.w3c
  • classloader.parent-first-patterns.additional:除了上一个参数指定的类之外,用户如果有其他类以child-first模式会发生冲突,而希望以双亲委派模型来加载的话,可以额外指定(分号分隔)。

2:Flink写入elaticSearch 遇到basic 认证问题

错误信息:

Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://es1:9200], URI [/xxx/_count?ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true], status line [HTTP/1.1 403 Forbidden]
{"error":{"root_cause":[{"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"}],"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"},"status":403}

原因:

看到这个问题,很疑惑,我使用用户名密码认证了啊,明明python,curl都行,为啥java不行,难道是我调用的api不对,有一遍遍的查api。后面实在是没办法,进行HttpProxy代理抓包,发现的确是没有Http的认证,在请求Headers中,没有Basic的Auth信息。最后,没办法,放弃了,就直接在Http中Headers手动根据用户名密码进行Base64转码设置进去,解决这个问题。

public class ESTest2 {
    public static void main(String[] args) throws IOException {
        String user = "user";
        String pwd = "pwd";
        String auth = Base64.encodeBase64String((user+":"+pwd).getBytes());
        System.out.println(auth);
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost("es1", 9200));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic " + auth)});
        RestHighLevelClient client = new RestHighLevelClient(clientBuilder);
        CountRequest countRequest = new CountRequest();
        countRequest.indices(index_name);
        CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
        System.out.println(countResponse.getCount());
        client.close();
    }
}

flink自定义ESsink Demo

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpHost;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Map;
/**
 * @description: 自定义es sink
 * @author: mdz
 * @date: 2021/7/14
 **/
public class MyESSink extends RichSinkFunction<Map<String, String>> {
    private BulkRequest request = null;
    private RestHighLevelClient client = null;
    private CredentialsProvider credentialsProvider = null;
    private String esHost = null;
    private int esPort;
    private String esUsername = null;
    private String esPassword = null;
    private String index = null;
    private String type = null;
    public MyESSink(String esHost, int esPort, String esUsername, String esPassword, String index, String type) {
        this.esHost = esHost;
        this.esPort = esPort;
        this.esUsername = esUsername;
        this.esPassword = esPassword;
        this.index = index;
        this.type = type;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        String user = esUsername;
        String pwd = esPassword;
        String auth = Base64.encodeBase64String((user+":"+pwd).getBytes());
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(esHost, esPort));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic " + auth)});
        client = new RestHighLevelClient(clientBuilder);
        super.open(parameters);
    }
    @Override
    public void invoke(Map<String, String> value, Context context) throws Exception {
        request = new BulkRequest();
        request.add(new IndexRequest(index,type).id(value.get("id")).source(value,XContentType.JSON));
        client.bulk(request, RequestOptions.DEFAULT);
    }
    @Override
    public void close() throws Exception {
        client.close();
        super.close();
    }
}

flink 读取kafka sink 到Es

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import java.util.*;
/**
 * @description: pulsar写入到es
 * @author: mdz
 * @date: 2021/5/19
 **/
public class Kafka2Es {
    public static void main(String[] args) throws Exception {
        //创建环境,设置参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
        //定义kafka相关参数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        properties.setProperty("group.id","gp_sdbd-etl-test07");
        properties.setProperty("request.timeout.ms", "600000");
        properties.setProperty("enable.auto.commit","false");
        //从kafka获取数据
        FlinkKafkaConsumer<Object> smartnewsSource =new FlinkKafkaConsumer("my_topic1",
                new JSONKeyValueDeserializationSchema(true),
                properties
        );
        smartnewsSource.setStartFromEarliest();   // 从最开始处消费
        DataStreamSource<Object> smartnewsDS = env.addSource(smartnewsSource);
        SingleOutputStreamOperator<Map<String, String>> mapDS = smartnewsDS.map(new MapFunction<Object, Map<String, String>>() {
            @Override
            public HashMap<String, String> map(Object value) throws Exception {
                HashMap<String, String> map1 = new HashMap<>();
                map1.put("data",value.toString());
                return map1;
            }
        });
//        //写入es
          mapDS.addSink(new MyESSink("esIp",9200,"user","pass","index","type"));
        env.execute();
    }
}
    相关文章
    |
    9月前
    |
    Oracle 关系型数据库 API
    实时计算 Flink版产品使用合集之当sink到elasticsearch时,可以指定es的指定字段吗
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    实时计算 Flink版产品使用合集之当sink到elasticsearch时,可以指定es的指定字段吗
    |
    9月前
    |
    canal NoSQL 关系型数据库
    实时计算 Flink版产品使用合集之如何在ElasticSearch中查看同步的数据
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    9月前
    |
    Kubernetes 关系型数据库 MySQL
    实时计算 Flink版产品使用合集之在Kubernetes(k8s)中同步MySQL变更到Elasticsearch该怎么操作
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    9月前
    |
    Prometheus 监控 Cloud Native
    实时计算 Flink版操作报错之在使用ES时遇到“java.lang.IllegalStateException: The elasticsearch emitter must be serializable”,是什么原因
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    |
    9月前
    |
    关系型数据库 MySQL Java
    实时计算 Flink版操作报错之遇到java.lang.IllegalStateException: The elasticsearch emitter must be serializable.的错误,如何处理
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    |
    9月前
    |
    SQL Oracle 关系型数据库
    实时计算 Flink版产品使用合集之源MySQL表新增字段后,要同步这个改变到Elasticsearch的步骤是什么
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    9月前
    |
    SQL Java 关系型数据库
    实时计算 Flink版产品使用合集之怎么连接 Elasticsearch
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    3月前
    |
    存储 安全 数据管理
    如何在 Rocky Linux 8 上安装和配置 Elasticsearch
    本文详细介绍了在 Rocky Linux 8 上安装和配置 Elasticsearch 的步骤,包括添加仓库、安装 Elasticsearch、配置文件修改、设置内存和文件描述符、启动和验证 Elasticsearch,以及常见问题的解决方法。通过这些步骤,你可以快速搭建起这个强大的分布式搜索和分析引擎。
    107 5
    |
    4月前
    |
    存储 JSON Java
    elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
    这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
    414 0
    elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
    |
    5月前
    |
    NoSQL 关系型数据库 Redis
    mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
    mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
    mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo