Bulk Writing to ES7 from Spark via RestHighLevelClient

Summary: bulk-writing to Elasticsearch 7 from Spark, in Java

The ES utility class

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class ESUtils {
    public static BulkProcessor.Listener getBulkListener(){
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // no-op hook invoked before each bulk request is executed
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // no-op hook invoked after a bulk request completes; inspect the response for per-item failures here
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // no-op hook invoked when a whole bulk request fails; log the failure here
            }
        };
        return listener;
    }

    public static ActionListener<BulkResponse> getActionListener() {
        ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkResponse) {
                // no-op: called when the asynchronous bulk request completes
            }

            @Override
            public void onFailure(Exception e) {
                // no-op: called when the asynchronous bulk request fails
            }
        };
        return listener;
    }


    /**
     * Build a BulkProcessor for the given client.
     * @param client      high-level REST client used to send the bulk requests
     * @param listener    callbacks invoked before and after each bulk execution
     * @param bulkActions number of buffered actions that triggers a flush
     * @return the configured BulkProcessor
     */
    public static BulkProcessor getBulkProcessor(RestHighLevelClient client, BulkProcessor.Listener listener, int bulkActions) throws InterruptedException {
        BulkProcessor.Builder builder = BulkProcessor.builder(
                (request, bulkListener) ->
                        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
        // flush when either the action count or the 1 MB size limit is reached
        builder.setBulkActions(bulkActions);
        builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
        // 0 means bulk requests are executed one at a time, with no concurrent in-flight bulks
        builder.setConcurrentRequests(0);
        builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
        // retry a rejected bulk up to 3 times with a constant 1s backoff
        builder.setBackoffPolicy(BackoffPolicy
                .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        return builder.build();
    }
}
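
A minimal usage sketch of the utility class (not part of the original article): the unauthenticated cluster at localhost:9200, the class name ESUtilsDemo, the index name demo_index and the sample document are placeholder assumptions.

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.concurrent.TimeUnit;

public class ESUtilsDemo {
    public static void main(String[] args) throws Exception {
        // placeholder connection settings: local cluster without authentication
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        // flush every 100 actions (plus the size and interval limits configured in ESUtils)
        BulkProcessor bulkProcessor = ESUtils.getBulkProcessor(client, ESUtils.getBulkListener(), 100);

        // buffer a document; the processor decides when the actual bulk request is sent
        bulkProcessor.add(new IndexRequest("demo_index").id("1")
                .source("{\"field\":\"value\"}", XContentType.JSON));

        // drain any outstanding requests before shutting down
        bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
        client.close();
    }
}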

The BulkProcessor approach

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * Noticeably faster than RestHighLevelClient.bulk or client.bulkAsync
 */
public class OneIdToEs {
    private static final Logger LOG = LoggerFactory.getLogger(OneIdToEs.class);
    public static void main(String[] args) {
        if (args.length < 9) {
            System.out.println("请输入 hive的one_id表名称 对应hive表唯一id字段 对应hive表的日期分区字段 对应hive表日期分区的值 写入es索引名称 es的Ip地址 es的端口号 es用户名 es密码 如:dwd.dwd_one_id_data_source_info_da exist_mark dt 2020-11-10 dwd_one_id_data_source_info_da 192.168.250.116 9200 zs zs123");
            System.exit(1);
        }

        // argument list
        // database.table name
        String hiveTableName = args[0];
        String hiveTableUserIdName = args[1];
        String hiveTablePartitionName = args[2];
        String hiveTablePartitionValue = args[3];
        String esTableName = args[4];

        // ES connection settings
        String esIp = args[5];
        String esPort = args[6];
        String esUser = args[7];
        String esPass = args[8];

        System.out.println("---------------------------------------------------spark start--------------------------------------------------------");
        SparkSession spark = SparkSession
                .builder()
                .appName("ES_WGP_DATA")
                .enableHiveSupport()
                .getOrCreate();

        String sql = "select * from " + hiveTableName +" where "+ hiveTablePartitionName +"=" +"'" +hiveTablePartitionValue+"'";
        final Dataset<Row> inputData = spark.sql(sql);
        LOG.warn("执行的sql语句 ---> " + sql);
        LOG.warn("执行的sql语句的数据量是 ---> " + inputData.count());

        inputData.show();

        final String[] columns = inputData.columns();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        final Broadcast<String[]> broadcast = jsc.broadcast(columns);

        inputData.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
            @Override
            public void call(Iterator<Row> iterator) throws Exception {
                // ES write
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(esUser, esPass));  // ES credentials (the default username is elastic)
                RestHighLevelClient client =new RestHighLevelClient(
                        RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                httpClientBuilder.disableAuthCaching();
                                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                        })
                );

                if (null == client) {
                    LOG.error("es --> client initialization failed!");
                }

                final BulkProcessor.Listener listener = ESUtils.getBulkListener();
                final BulkProcessor bulkProcessor = ESUtils.getBulkProcessor(client, listener, 100);

                final String[] columns = broadcast.value();
                int count = 0;
                while (iterator.hasNext()) {
                    final Row row = iterator.next();
                    final String id = row.getAs(hiveTableUserIdName);
                    final XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject()
                            .field(hiveTableUserIdName, id);
                    for (String colName: columns) {
                        if (!colName.contentEquals(hiveTableUserIdName) ) {
                            Object obj = row.getAs(colName);
                            if (null == obj) {
                                obj = "-1";
                            }
                            xContentBuilder.field(colName, obj.toString());
                        }
                    }
                    xContentBuilder.endObject();

                    // NOTE: upsert() without arguments does not supply upsert content; see note 2 at the end
                    final UpdateRequest update = new UpdateRequest(esTableName, id).doc(xContentBuilder).upsert();
                    bulkProcessor.add(update);
//                    final IndexRequest indexRequest = new IndexRequest(esTableName).id(id).source(xContentBuilder).opType(DocWriteRequest.OpType.CREATE);
//                    bulkProcessor.add(indexRequest);
                    count ++;
                    if (count >= 100) {
                        bulkProcessor.flush();
                        count = 0;
                    }
                }
                bulkProcessor.flush();
                bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
                bulkProcessor.close();
                client.close();
            }
        });
    }
}

The bulkAsync approach

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;

public class OneIdToEsUserPass {
    private static final Logger LOG = LoggerFactory.getLogger(OneIdToEsUserPass.class);
    public static void main(String[] args) {
        if (args.length < 9) {
            System.out.println("请输入 hive的one_id表名称 对应hive表唯一id字段 对应hive表的日期分区字段 对应hive表日期分区的值 写入es索引名称 es的Ip地址 es的端口号 es用户名 es密码 如:dwd.dwd_one_id_data_source_info_da exist_mark dt 2020-11-10 dwd_one_id_data_source_info_da 192.168.250.116 9200 zs zs123");
            System.exit(1);
        }

        // argument list
        // database.table name
        String hiveTableName = args[0];
        String hiveTableUserIdName = args[1];
        String hiveTablePartitionName = args[2];
        String hiveTablePartitionValue = args[3];
        String esTableName = args[4];

        // ES connection settings
        String esIp = args[5];
        String esPort = args[6];
        String esUser = args[7];
        String esPass = args[8];

//        try {
//            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//            credentialsProvider.setCredentials(AuthScope.ANY,
//                    new UsernamePasswordCredentials(esUser, esPass));  // ES credentials (the default username is elastic)
//            RestHighLevelClient client =new RestHighLevelClient(
//                    RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
//                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
//                            httpClientBuilder.disableAuthCaching();
//                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//                        }
//                    })
//            );
//            GetIndexRequest exist=new GetIndexRequest(esTableName);
//            boolean exists=client.indices().exists(exist, RequestOptions.DEFAULT);
//
//            if (exists) {
//                DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(esTableName);
//                AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
//                boolean acknowledged = deleteIndexResponse.isAcknowledged();
//                if (acknowledged) {
//                    LOG.warn("ES index deleted successfully ---> " + esTableName);
//                }
//            }
//            client.close();
//        }catch (IOException e) {
//            LOG.error("Failed to delete the ES index --> {}", e.getMessage());
//        }

        System.out.println("---------------------------------------------------spark start--------------------------------------------------------");
        SparkSession spark = SparkSession
                .builder()
                .appName("ES_WGP_DATA")
                .enableHiveSupport()
                .getOrCreate();

        String sql = "select * from " + hiveTableName +" where "+ hiveTablePartitionName +"=" +"'" +hiveTablePartitionValue+"'";
        final Dataset<Row> inputData = spark.sql(sql);
        LOG.warn("执行的sql语句 ---> " + sql);
        LOG.warn("执行的sql语句的数据量是 ---> " + inputData.count());

        inputData.show();

        final String[] columns = inputData.columns();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        final Broadcast<String[]> broadcast = jsc.broadcast(columns);

        inputData.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
            @Override
            public void call(Iterator<Row> iterator) throws Exception {
                // ES write
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(esUser, esPass));  // ES credentials (the default username is elastic)
                RestHighLevelClient client =new RestHighLevelClient(
                        RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                httpClientBuilder.disableAuthCaching();
                                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                        })
                );

                if (null == client) {
                    LOG.error("es --> client initialization failed!");
                }

                // listener for the asynchronous variant (bulkAsync)
//                final ActionListener<BulkResponse> listener = ESUtils.getActionListener();
                BulkRequest request = new BulkRequest();
                request.timeout("3m");

                final String[] columns = broadcast.value();
                int count = 0;
                while (iterator.hasNext()) {
                    final Row row = iterator.next();
                    final String id = row.getAs(hiveTableUserIdName);
                    final XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject()
                            .field(hiveTableUserIdName, id);
                    for (String colName: columns) {
                        if (!colName.contentEquals(hiveTableUserIdName) ) {
                            Object obj = row.getAs(colName);
                            if (null == obj) {
                                obj = "-1";
                            }
                            xContentBuilder.field(colName, obj.toString());
                        }
                    }
                    xContentBuilder.endObject();

                    // NOTE: upsert() without arguments does not supply upsert content; see note 2 at the end
                    final UpdateRequest update = new UpdateRequest(esTableName, id).doc(xContentBuilder).upsert();
                    request.add(update);
//                    final IndexRequest indexRequest = new IndexRequest(esTableName).id(id).source(xContentBuilder).opType(DocWriteRequest.OpType.CREATE);
//                    request.add(indexRequest);
                    count++;
                    if (count >= 100) {
//                        client.bulkAsync(request, RequestOptions.DEFAULT, listener);
                        client.bulk(request, RequestOptions.DEFAULT);
                        // start a fresh request so already-sent actions are not sent again
                        request = new BulkRequest();
                        request.timeout("3m");
                        count = 0;
                    }
                }
                if (request.numberOfActions() > 0) {
                    client.bulk(request, RequestOptions.DEFAULT);
//                    client.bulkAsync(request, RequestOptions.DEFAULT, listener);
                }
                client.close();
            }
        });
    }
}

(Figure: es写入.png — result of the ES write)

Notes

  1. With the first approach, the UpdateRequest doc writes do not show any deleted documents; with the second approach, deleted documents do show up.
  2. UpdateRequest update = new UpdateRequest(esTableName, id).doc(xContentBuilder).upsert() does not take effect; the upsert content has to be supplied explicitly, for example (this snippet is written against the older TransportClient-style API with index, type and parent):

BulkRequestBuilder bulkRequest = esBase.getClient().prepareBulk();
for (int i = 0; i < data.size(); i++) {
    T t = data.get(i);
    String json = JSONObject.toJSONString(t);
    IndexRequest indexRequest = new IndexRequest(index, type, t.get_id()).source(json).parent(parentIdList.get(i));
    UpdateRequest updateRequest = new UpdateRequest(index, type, t.get_id()).parent(parentIdList.get(i)).doc(json).upsert(indexRequest);

    bulkRequest.add(updateRequest);
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
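
For the ES7 RestHighLevelClient API used in the classes above (index name plus document id, no type or parent), a minimal sketch of an upsert that does supply its content might look like the following; the class name, index name, document id and field values are placeholder assumptions, not values from the original article.

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

import java.util.HashMap;
import java.util.Map;

public class UpsertExample {
    public static void main(String[] args) {
        // placeholder index name, id and fields
        String index = "dwd_one_id_data_source_info_da";
        String id = "u_0001";

        Map<String, Object> doc = new HashMap<>();
        doc.put("exist_mark", id);
        doc.put("dt", "2020-11-10");

        // doc(...) is applied when the document already exists; upsert(...) supplies the full
        // document to index when it does not. docAsUpsert(true) is an equivalent shortcut.
        UpdateRequest update = new UpdateRequest(index, id)
                .doc(doc)
                .upsert(new IndexRequest(index).id(id).source(doc));

        // then bulkProcessor.add(update) or bulkRequest.add(update), as in the classes above
        System.out.println(update);
    }
}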