Streaming writes from Flink to Elasticsearch frequently fail with a variety of errors:
1. Flink fails with Caused by: java.lang.LinkageError: loader constraint violation
This is caused by Flink's class loading mechanism.

Solution 1. Cause: class loading order. Add the following to flink-conf.yaml:

    classloader.resolve-order: parent-first

Flink's default resolve order is child-first. However, switching globally to parent-first can itself introduce class conflicts. The safer fix is to apply parent-first loading only to the specific packages that need it, rather than across the board, for example:

    classloader.parent-first-patterns.additional: javax.script; jdk;

Solution 2. In pom.xml, move the Elasticsearch-related dependencies to the very top of the dependency list.
classloader.parent-first-patterns.default: not recommended to modify. It is fixed to the following values:

    java.; scala.; org.apache.flink.; com.esotericsoftware.kryo; org.apache.hadoop.; javax.annotation.; org.slf4j; org.apache.log4j; org.apache.logging; org.apache.commons.logging; ch.qos.logback; org.xml; javax.xml; org.apache.xerces; org.w3c
classloader.parent-first-patterns.additional: if, beyond the classes covered by the previous parameter, other user classes conflict under child-first loading and you would rather have them loaded via the parent-delegation model, you can list them here (semicolon-separated), as in the consolidated sketch below.
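Putting solution 1 together, a minimal flink-conf.yaml sketch might look like the following; javax.script and jdk are the example patterns from above, and you would substitute whichever packages actually conflict in your job:

    # flink-conf.yaml
    # Blunt option: flip the global resolve order (may cause new conflicts):
    # classloader.resolve-order: parent-first

    # Targeted option: keep the child-first default, but force parent-first
    # loading for the conflicting packages only (semicolon-separated):
    classloader.parent-first-patterns.additional: javax.script; jdk;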
2. Flink writes to Elasticsearch fail with a Basic authentication problem
Error message:
Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://es1:9200], URI [/xxx/_count?ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true], status line [HTTP/1.1 403 Forbidden] {"error":{"root_cause":[{"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"}],"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"},"status":403}
Cause:
This error was puzzling at first: the client was configured with a username and password, and the same credentials worked fine from python and curl, so why not from Java? Suspecting a wrong API call, I went through the API docs again and again. Eventually I captured the traffic through an HTTP proxy and found that the requests really carried no HTTP authentication at all: there was no Basic Auth information in the request headers. In the end I gave up on the built-in mechanism, Base64-encoded the username and password myself, and set the Authorization header manually, which solved the problem.
import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;

public class ESTest2 {
    public static void main(String[] args) throws IOException {
        String user = "user";
        String pwd = "pwd";
        String index_name = "my_index"; // placeholder index name

        // Base64-encode "user:pwd" and attach it as a Basic Auth header manually.
        String auth = Base64.encodeBase64String((user + ":" + pwd).getBytes());
        System.out.println(auth);

        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost("es1", 9200));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{
                new BasicHeader("Authorization", "Basic " + auth)});
        RestHighLevelClient client = new RestHighLevelClient(clientBuilder);

        // Smoke test: count the documents in the index.
        CountRequest countRequest = new CountRequest();
        countRequest.indices(index_name);
        CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
        System.out.println(countResponse.getCount());

        client.close();
    }
}
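For comparison, the documented way to pass credentials to the high-level REST client is a CredentialsProvider set on the HTTP client config callback; this is the path that, in the scenario above, never got the header onto the wire. A minimal sketch, with host, port, and credentials as placeholders:

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

public class ESCredentialsProviderTest {
    public static void main(String[] args) {
        // Placeholder credentials; substitute your own.
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials("user", "pwd"));

        // Register the provider on the underlying Apache HTTP client.
        RestClientBuilder builder = RestClient.builder(new HttpHost("es1", 9200))
                .setHttpClientConfigCallback(httpClientBuilder ->
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        RestHighLevelClient client = new RestHighLevelClient(builder);
        // ... use the client, then close it.
    }
}

A likely explanation for the behavior above: the CredentialsProvider authenticates reactively, sending credentials only after the server responds 401 Unauthorized. A cluster that allows anonymous access (note the anonymous_admin user in the error) answers 403 Forbidden directly, so no challenge is ever issued and the credentials are never sent; a default Authorization header, by contrast, is attached to every request unconditionally.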
A custom Flink ES sink demo:
import java.util.Map;

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * @description: custom ES sink
 * @author: mdz
 * @date: 2021/7/14
 **/
public class MyESSink extends RichSinkFunction<Map<String, String>> {

    private transient RestHighLevelClient client;

    private final String esHost;
    private final int esPort;
    private final String esUsername;
    private final String esPassword;
    private final String index;
    private final String type;

    public MyESSink(String esHost, int esPort, String esUsername, String esPassword,
                    String index, String type) {
        this.esHost = esHost;
        this.esPort = esPort;
        this.esUsername = esUsername;
        this.esPassword = esPassword;
        this.index = index;
        this.type = type;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Build the Basic Auth header manually, exactly as in the standalone test above.
        String auth = Base64.encodeBase64String((esUsername + ":" + esPassword).getBytes());
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(esHost, esPort));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{
                new BasicHeader("Authorization", "Basic " + auth)});
        client = new RestHighLevelClient(clientBuilder);
        super.open(parameters);
    }

    @Override
    public void invoke(Map<String, String> value, Context context) throws Exception {
        // Each record is sent as a single-document bulk request.
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest(index, type)
                .id(value.get("id"))
                .source(value, XContentType.JSON));
        client.bulk(request, RequestOptions.DEFAULT);
    }

    @Override
    public void close() throws Exception {
        client.close();
        super.close();
    }
}
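Note that invoke() builds and sends a fresh single-document BulkRequest for every record, so there is no real batching and each element costs one synchronous round trip to Elasticsearch. For production throughput you would typically buffer records and flush the BulkRequest on a size or time threshold (and on checkpoints, to preserve the delivery guarantees), which is essentially what the bulk-flush settings of Flink's official Elasticsearch connector do.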
Flink reading from Kafka and sinking to ES:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import java.util.*;

/**
 * @description: Kafka to ES
 * @author: mdz
 * @date: 2021/5/19
 **/
public class Kafka2Es {
    public static void main(String[] args) throws Exception {
        // Create the environment and set its parameters.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        // Checkpoint every 5 minutes with exactly-once semantics.
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);

        // Kafka consumer configuration.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        properties.setProperty("group.id", "gp_sdbd-etl-test07");
        properties.setProperty("request.timeout.ms", "600000");
        properties.setProperty("enable.auto.commit", "false");

        // Consume from Kafka.
        FlinkKafkaConsumer<Object> smartnewsSource = new FlinkKafkaConsumer(
                "my_topic1",
                new JSONKeyValueDeserializationSchema(true),
                properties);
        smartnewsSource.setStartFromEarliest(); // consume from the earliest offset
        DataStreamSource<Object> smartnewsDS = env.addSource(smartnewsSource);

        SingleOutputStreamOperator<Map<String, String>> mapDS =
                smartnewsDS.map(new MapFunction<Object, Map<String, String>>() {
                    @Override
                    public HashMap<String, String> map(Object value) throws Exception {
                        // Only a "data" key is set here, so MyESSink's value.get("id")
                        // returns null and Elasticsearch auto-generates the document id.
                        HashMap<String, String> map1 = new HashMap<>();
                        map1.put("data", value.toString());
                        return map1;
                    }
                });

        // Write to ES.
        mapDS.addSink(new MyESSink("esIp", 9200, "user", "pass", "index", "type"));

        env.execute();
    }
}