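1. pom.xml 依赖 (Maven dependency)
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
```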
2. 写入 Elasticsearch (Elasticsearch sink)
public static ElasticsearchSink createEsProducer(ParameterTool params, String index, String type) { List httpHosts = new ArrayList<>(); String url = params.getProperties().getProperty("es.http.url"); int port = Integer.parseInt(params.getProperties().getProperty("es.http.port")); httpHosts.add(new HttpHost(url, port, "http")); // use a ElasticsearchSink.Builder to create an ElasticsearchSink ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction() { public IndexRequest createIndexRequest(String element) { Map map = JSONObject.parseObject(element, Map.class); System.out.println(map); return Requests.indexRequest() .index(index) .type(type) .source(map); } @Override public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } } ); // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered esSinkBuilder.setBulkFlushMaxActions(1); esSinkBuilder.setRestClientFactory( restClientBuilder -> { restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { @Override public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { requestConfigBuilder.setConnectTimeout(5000); requestConfigBuilder.setSocketTimeout(40000); requestConfigBuilder.setConnectionRequestTimeout(1000); return requestConfigBuilder; } }).setMaxRetryTimeoutMillis(5*60*1000); } ); return esSinkBuilder.build(); }
3. 主程序 (main program)
public static void main(String[] args) throws Exception { /*if (args == null || args.length == 0) { throw new RuntimeException("config file name must be config, config is args[0]"); }*/ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //DataStream userStream = env.addSource(new RandomUserSource()); ParameterTool parameterTool = Configs.loadConfig("config-test.properties"); env.getConfig().setGlobalJobParameters(parameterTool); //String topic = parameterTool.getProperties().getProperty("kafka.user.info.topic"); String topic="test1115"; DataStream userStream = env.addSource(Utils.createConsumers(parameterTool, topic)); DataStream userMapperStream = userStream.flatMap(new UserMapper()); String index = "sys_user_test"; String type = "user"; userMapperStream.addSink(Utils.createEsProducer(parameterTool, index, type)); env.execute("user-to-es"); }