flink 写入Elasticsearch 踩坑小结

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: flink 写入Elasticsearch 踩坑小结

flink 流式写入Elasticsearch 会经常出现各种错误:

1:Flink出现Caused by: java.lang.LinkageError: loader constraint violation错误

这是由于Flink的包加载机制引起的。
-- 解决方案一
原因: 类加载顺序问题,可以在flink-conf.yaml中加入
classloader.resolve-order: parent-first
Flink的默认加载是child-first。
但是用了parent-first配置有可能出现类冲突问题。解决办法只针对个别包出来,不要一律用parent-first, 配置如下:
classloader.parent-first-patterns.additional: javax.script; jdk;
-- 解决方案二
pom.xml 中将Elasticsearch相关依赖放在最开始位置。

classloader.parent-first-patterns.default,不建议修改,固定为以下这些值:

java.;
scala.;
org.apache.flink.;
com.esotericsoftware.kryo;
org.apache.hadoop.;
javax.annotation.;
org.slf4j;
org.apache.log4j;
org.apache.logging;
org.apache.commons.logging;
ch.qos.logback;
org.xml;
javax.xml;
org.apache.xerces;
org.w3c
  • classloader.parent-first-patterns.additional:除了上一个参数指定的类之外,用户如果有其他类以child-first模式会发生冲突,而希望以双亲委派模型来加载的话,可以额外指定(分号分隔)。

2:Flink写入elaticSearch 遇到basic 认证问题

错误信息:

Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://es1:9200], URI [/xxx/_count?ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true], status line [HTTP/1.1 403 Forbidden]
{"error":{"root_cause":[{"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"}],"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"},"status":403}

原因:

看到这个问题,很疑惑,我使用用户名密码认证了啊,明明python,curl都行,为啥java不行,难道是我调用的api不对,有一遍遍的查api。后面实在是没办法,进行HttpProxy代理抓包,发现的确是没有Http的认证,在请求Headers中,没有Basic的Auth信息。最后,没办法,放弃了,就直接在Http中Headers手动根据用户名密码进行Base64转码设置进去,解决这个问题。

public class ESTest2 {
    public static void main(String[] args) throws IOException {
        String user = "user";
        String pwd = "pwd";
        String auth = Base64.encodeBase64String((user+":"+pwd).getBytes());
        System.out.println(auth);
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost("es1", 9200));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic " + auth)});
        RestHighLevelClient client = new RestHighLevelClient(clientBuilder);
        CountRequest countRequest = new CountRequest();
        countRequest.indices(index_name);
        CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
        System.out.println(countResponse.getCount());
        client.close();
    }
}

flink自定义ESsink Demo

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpHost;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Map;
/**
 * @description: 自定义es sink
 * @author: mdz
 * @date: 2021/7/14
 **/
public class MyESSink extends RichSinkFunction<Map<String, String>> {
    private BulkRequest request = null;
    private RestHighLevelClient client = null;
    private CredentialsProvider credentialsProvider = null;
    private String esHost = null;
    private int esPort;
    private String esUsername = null;
    private String esPassword = null;
    private String index = null;
    private String type = null;
    public MyESSink(String esHost, int esPort, String esUsername, String esPassword, String index, String type) {
        this.esHost = esHost;
        this.esPort = esPort;
        this.esUsername = esUsername;
        this.esPassword = esPassword;
        this.index = index;
        this.type = type;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        String user = esUsername;
        String pwd = esPassword;
        String auth = Base64.encodeBase64String((user+":"+pwd).getBytes());
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(esHost, esPort));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic " + auth)});
        client = new RestHighLevelClient(clientBuilder);
        super.open(parameters);
    }
    @Override
    public void invoke(Map<String, String> value, Context context) throws Exception {
        request = new BulkRequest();
        request.add(new IndexRequest(index,type).id(value.get("id")).source(value,XContentType.JSON));
        client.bulk(request, RequestOptions.DEFAULT);
    }
    @Override
    public void close() throws Exception {
        client.close();
        super.close();
    }
}

flink 读取kafka sink 到Es

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import java.util.*;
/**
 * @description: pulsar写入到es
 * @author: mdz
 * @date: 2021/5/19
 **/
public class Kafka2Es {
    public static void main(String[] args) throws Exception {
        //创建环境,设置参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
        //定义kafka相关参数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        properties.setProperty("group.id","gp_sdbd-etl-test07");
        properties.setProperty("request.timeout.ms", "600000");
        properties.setProperty("enable.auto.commit","false");
        //从kafka获取数据
        FlinkKafkaConsumer<Object> smartnewsSource =new FlinkKafkaConsumer("my_topic1",
                new JSONKeyValueDeserializationSchema(true),
                properties
        );
        smartnewsSource.setStartFromEarliest();   // 从最开始处消费
        DataStreamSource<Object> smartnewsDS = env.addSource(smartnewsSource);
        SingleOutputStreamOperator<Map<String, String>> mapDS = smartnewsDS.map(new MapFunction<Object, Map<String, String>>() {
            @Override
            public HashMap<String, String> map(Object value) throws Exception {
                HashMap<String, String> map1 = new HashMap<>();
                map1.put("data",value.toString());
                return map1;
            }
        });
//        //写入es
          mapDS.addSink(new MyESSink("esIp",9200,"user","pass","index","type"));
        env.execute();
    }
}
    相关文章
    |
    5月前
    |
    Oracle 关系型数据库 API
    实时计算 Flink版产品使用合集之当sink到elasticsearch时,可以指定es的指定字段吗
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    实时计算 Flink版产品使用合集之当sink到elasticsearch时,可以指定es的指定字段吗
    |
    5月前
    |
    关系型数据库 MySQL Java
    实时计算 Flink版操作报错之遇到java.lang.IllegalStateException: The elasticsearch emitter must be serializable.的错误,如何处理
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    |
    5月前
    |
    Prometheus 监控 Cloud Native
    实时计算 Flink版操作报错之在使用ES时遇到“java.lang.IllegalStateException: The elasticsearch emitter must be serializable”,是什么原因
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    |
    5月前
    |
    Kubernetes 关系型数据库 MySQL
    实时计算 Flink版产品使用合集之在Kubernetes(k8s)中同步MySQL变更到Elasticsearch该怎么操作
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    5月前
    |
    SQL Java 关系型数据库
    实时计算 Flink版产品使用合集之怎么连接 Elasticsearch
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    5月前
    |
    canal NoSQL 关系型数据库
    实时计算 Flink版产品使用合集之如何在ElasticSearch中查看同步的数据
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    5月前
    |
    SQL Oracle 关系型数据库
    实时计算 Flink版产品使用合集之源MySQL表新增字段后,要同步这个改变到Elasticsearch的步骤是什么
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    1月前
    |
    NoSQL 关系型数据库 Redis
    mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
    mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
    mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
    |
    2月前
    |
    数据可视化 Docker 容器
    一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
    这篇文章提供了通过Docker安装Elasticsearch和Kibana的详细过程和图解,包括下载镜像、创建和启动容器、处理可能遇到的启动失败情况(如权限不足和配置文件错误)、测试Elasticsearch和Kibana的连接,以及解决空间不足的问题。文章还特别指出了配置文件中空格的重要性以及环境变量中字母大小写的问题。
    一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
    |
    2月前
    |
    JSON 自然语言处理 数据库
    Elasticsearch从入门到项目部署 安装 分词器 索引库操作
    这篇文章详细介绍了Elasticsearch的基本概念、倒排索引原理、安装部署、IK分词器的使用,以及如何在Elasticsearch中进行索引库的CRUD操作,旨在帮助读者从入门到项目部署全面掌握Elasticsearch的使用。