【ES系列九】——批量同步数据至ES

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 通过es官网提供的bulk方法进行实现

一、方案


   1.通过es官网提供的bulk方法进行实现

   2.将数据按照规则写入到json文件中,通过curl命令进行批量提交操作

 

   注:如下实验es为集群,三台2c8g;mongodb为集群,三台2c16g;跑java程序的机器为2c4g


二、过程


   1.1.通过es官网提供的bulk方法实现,代码如下


package com.yunshi.timedtask.dao;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Component
public class TestElasticSearch4J {
  private static RestHighLevelClient client = new RestHighLevelClient(
          RestClient.builder(
                  new HttpHost("es1.yunshicloud.com", 9200, "http")
          ));
  public static void main(String[] args) throws IOException {
    TestElasticSearch4J testElasticSearch4J = new TestElasticSearch4J();
    List<Map<String,Object>> mapList = new ArrayList<>();
    System.out.println("准备数据,总计"+mapList.size()+"条");
    testElasticSearch4J.batchInsert(mapList,"material");
    client.close();
  }
  public  void batchInsert(List<Map<String,Object>> mapList,String type) throws IOException {
    BulkRequest request = new BulkRequest();
    for (Map<String,Object> map : mapList) {
      IndexRequest indexRequest= new IndexRequest("dev-rms-resource", type).source(map);
      request.add(indexRequest);
    }
    BulkResponse bulkItemResponses = client.bulk(request);
    for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
      if (bulkItemResponse.isFailed()) {
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
        System.out.println("批量插入完成"+failure.getStatus());
      }
    }
  }
}


   1.2.该方法本人没有测试效率

 

   2.1.通过查询数据写入到json文件中,结构格式如下

       {"index":{"_index":"dev-rms-resource","_type":"material"}}

       {"id":"123","name":"aaa","url":"https://123123"}

   2.2.通过curl命令进行批量提交操作

       curl -XPOST  http://192.168.1.211:9200/_bulk --data-binary @material.json

   2.3.将提交之后返回的每一条状态写入到一个文件中,通过如下linux命令检查是否有失败的

       grep -rn "400" *

   2.4.测试效率如下:

       160万数据,总大小2.4G,批量每次推送2万,大概耗时8分钟左右(包含读取mongodb的时间)

       54万数据,总大小6.4G,批量每次推送1千,大概耗时30分钟左右

   2.4.java代码实现

 

package com.yunshi.timedtask.scheduler;
import com.alibaba.fastjson.JSON;
import com.yunshi.timedtask.common.Constants;
import com.yunshi.timedtask.dao.BasicDao;
import com.yunshi.timedtask.dao.IEsBasicDao;
import com.yunshi.timedtask.dao.TestElasticSearch4J;
import com.yunshi.timedtask.dao.es.ESConfig;
import com.yunshi.timedtask.domain.*;
import com.yunshi.timedtask.util.DateUtil;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by makang on 2019年12月30日.
 */
public class InitDataTaskService implements Runnable {
    private final Logger log = LoggerFactory.getLogger(InitDataTaskService.class);
    //调用底层的bean
    private BasicDao basicDao;
    private IEsBasicDao iEsBasicDao;
    private ESConfig esConfig;
    private int pageNum;
    //重写的构造器,用以接收外部传递过来的参数
    public InitDataTaskService(BasicDao basicDao, IEsBasicDao iEsBasicDao, ESConfig esConfig, int pageNum){
        this.basicDao = basicDao;
        this.iEsBasicDao = iEsBasicDao;
        this.esConfig = esConfig;
        this.pageNum = pageNum;
    }
    //实现的Runnable接口中的run方法
    @Override
    public  void run(){
        //开始时间
        long stertTime = System.currentTimeMillis();
        Map<String,Object> querymaterialAll = new HashMap<String,Object>(1);
        Map<String,Object> sortFilter = new HashMap<String,Object>(1);
        int pageNum = this.pageNum;
        //一、同步素材表中数据
        long materialAllSize = basicDao.count("material",querymaterialAll);
        log.info("开始同步素材,素材总数为:"+materialAllSize);
        //1.1.添加检索条件来提高查询效率
        long allMaterialSize = 0;
        for (int pageSize=1;((pageSize-1)*pageNum*20)<materialAllSize;pageSize++){
            int materialSize = 0;
            log.info("当前执行分页条件为:pageSize:"+pageSize+"==pageNum:"+pageNum*20);
            List<Map<String,Object>> materialList = basicDao.find("material",sortFilter,querymaterialAll,pageSize,pageNum*20);
            try {
                String filePath = "./material"+pageSize+".json";
                for (Map<String,Object> materialMap:materialList) {
                    Map<String,Object> materialesMap = new HashMap<>();
                    materialesMap.putAll(materialMap);
                    //写入到json文件中
                    FileWriter fw = new FileWriter(filePath, true);
                    BufferedWriter bw = new BufferedWriter(fw);
                    bw.append("{\"index\":{\"_index\":\""+esConfig.getIndexname()+"\",\"_type\":\"material\"}} \n" );
                    bw.append(JSON.toJSONString(materialesMap)+" \n");
                    bw.close();
                    fw.close();
                    materialSize ++;
                    allMaterialSize++;
                }
                //执行提交命令
                execCommand("curl -XPOST  "+esConfig.getAnalyzerServerIp()+"/_bulk --data-binary @material"+pageSize+".json","./logs/material"+pageSize+".log");
            } catch (Exception e) {
                log.info("失败的素材页数为:"+pageSize);
                e.printStackTrace();
            }
            log.info("当前更新条数为:mongodb内容:"+materialSize);
            materialList.clear();
        }
        long endTime = System.currentTimeMillis();
        log.info("===素材更新用时为:"+DateUtil.getDistanceTime(stertTime,endTime)
                +"===素材更新条数为:"+allMaterialSize);
    }
    /**
     * 执行curl命令的方法
     * @param cmd
     * @param logPath
     */
    public void execCommand(String cmd,String logPath) {
        try {
            log.info("开始执行shell命令,同步es数据");
            long startTime = System.currentTimeMillis();
            Runtime rt = Runtime.getRuntime();
            Process proc = rt.exec(cmd,null,null);
            InputStream stderr = proc.getInputStream();
            InputStreamReader isr = new InputStreamReader(stderr, "GBK");
            BufferedReader br = new BufferedReader(isr);
            String line = "";
            int lineInt = 0;
            FileWriter fw = new FileWriter(logPath, true);
            BufferedWriter bw = new BufferedWriter(fw);
            while ((line = br.readLine()) != null) {
                lineInt++;
                bw.append(line+" \n");
            }
            bw.close();
            fw.close();
            long endTime = System.currentTimeMillis();
            log.info("结束执行shell命令,同步es数据,用时:"+(endTime-startTime)+"==子进程执行个数:"+lineInt);
            stderr.close();
            isr.close();
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


三、遇到的坑&待优化空间


   1.json文件的大小不得大于es集群配置的bulk.queue_size参数大小(配置文件默认配置为10M)

 

   1.json文件的大小可控制,超出限制大小则向下一个文件中保存数据从而保证批量提交文件大小在合理范围内

   2.代码中检测批量提交失败的内容进行记录,最后做重试

   3.如果批量没有失败的情况,则需要将生成的json文件删除

 

四、总结


   在做的过程中也是在尝试各种方法,黑猫白猫,先抓住一个老鼠,解决当前问题;然后再考虑后续效率以及优化的相关问题。

   希望小编的总结能够给读者带来帮助。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
前端开发 API
ES 高级实战(四)查询 ES 数据
ES 高级实战(四)查询 ES 数据
1269 0
ES 高级实战(四)查询 ES 数据
|
4月前
|
自然语言处理 索引
03_ES数据查询操作
03_ES数据查询操作
46 0
|
4月前
|
JSON 前端开发 JavaScript
ES6(2015)-ES13(2022)新增特性大总结
ES6(2015)-ES13(2022)新增特性大总结
40 0
|
5月前
|
存储
ES批量写入数据
ES批量写入数据
64 1
|
5月前
|
API 开发工具 网络架构
springtboot 操作es
springtboot 操作es
|
6月前
|
数据库
解决logstash同步数据库内容到ES时,同步时间点用到了别的表的最新时间点
解决logstash同步数据库内容到ES时,同步时间点用到了别的表的最新时间点
26 0
|
10月前
|
索引
ES5新增方法(一)
前言 今天和大家分享一下ES5中一些新增的方法。 一、数组方法 迭代(遍历)方法:forEach(),map(),filter(),some(),every() array.forEach(function(value,index,arr)) value:数组当前项的值 index:数组当前项的索引 arr:数组对象本身
|
11月前
|
JSON 安全 数据安全/隐私保护
elasticdump迁移ES数据详解
elasticdump迁移ES数据详解
|
11月前
ES6中&&和 __ 鲜为人知的骚操作
ES6中&&和 __ 鲜为人知的骚操作
57 0
|
12月前
|
测试技术 索引
ES数据删除优化
分享一下ES数据删除优化的相关经历,根据业务需要一共优化了3次,包含了其中踩到的坑和一些花时间解决的问题.
827 0