【ES系列九】——批量同步数据至ES

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 通过es官网提供的bulk方法进行实现

一、方案


   1.通过es官网提供的bulk方法进行实现

   2.将数据按照规则写入到json文件中,通过curl命令进行批量提交操作

 

   注:如下实验es为集群,三台2c8g;mongodb为集群,三台2c16g;跑java程序的机器为2c4g


二、过程


   1.1.通过es官网提供的bulk方法实现,代码如下


package com.yunshi.timedtask.dao;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Component
public class TestElasticSearch4J {
  private static RestHighLevelClient client = new RestHighLevelClient(
          RestClient.builder(
                  new HttpHost("es1.yunshicloud.com", 9200, "http")
          ));
  public static void main(String[] args) throws IOException {
    TestElasticSearch4J testElasticSearch4J = new TestElasticSearch4J();
    List<Map<String,Object>> mapList = new ArrayList<>();
    System.out.println("准备数据,总计"+mapList.size()+"条");
    testElasticSearch4J.batchInsert(mapList,"material");
    client.close();
  }
  public  void batchInsert(List<Map<String,Object>> mapList,String type) throws IOException {
    BulkRequest request = new BulkRequest();
    for (Map<String,Object> map : mapList) {
      IndexRequest indexRequest= new IndexRequest("dev-rms-resource", type).source(map);
      request.add(indexRequest);
    }
    BulkResponse bulkItemResponses = client.bulk(request);
    for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
      if (bulkItemResponse.isFailed()) {
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
        System.out.println("批量插入完成"+failure.getStatus());
      }
    }
  }
}


   1.2.该方法本人没有测试效率

 

   2.1.通过查询数据写入到json文件中,结构格式如下

       {"index":{"_index":"dev-rms-resource","_type":"material"}}

       {"id":"123","name":"aaa","url":"https://123123"}

   2.2.通过curl命令进行批量提交操作

       curl -XPOST  http://192.168.1.211:9200/_bulk --data-binary @material.json

   2.3.将提交之后返回的每一条状态写入到一个文件中,通过如下linux命令检查是否有失败的

       grep -rn "400" *

   2.4.测试效率如下:

       160万数据,总大小2.4G,批量每次推送2万,大概耗时8分钟左右(包含读取mongodb的时间)

       54万数据,总大小6.4G,批量每次推送1千,大概耗时30分钟左右

   2.4.java代码实现

 

package com.yunshi.timedtask.scheduler;
import com.alibaba.fastjson.JSON;
import com.yunshi.timedtask.common.Constants;
import com.yunshi.timedtask.dao.BasicDao;
import com.yunshi.timedtask.dao.IEsBasicDao;
import com.yunshi.timedtask.dao.TestElasticSearch4J;
import com.yunshi.timedtask.dao.es.ESConfig;
import com.yunshi.timedtask.domain.*;
import com.yunshi.timedtask.util.DateUtil;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by makang on 2019年12月30日.
 */
public class InitDataTaskService implements Runnable {
    private final Logger log = LoggerFactory.getLogger(InitDataTaskService.class);
    //调用底层的bean
    private BasicDao basicDao;
    private IEsBasicDao iEsBasicDao;
    private ESConfig esConfig;
    private int pageNum;
    //重写的构造器,用以接收外部传递过来的参数
    public InitDataTaskService(BasicDao basicDao, IEsBasicDao iEsBasicDao, ESConfig esConfig, int pageNum){
        this.basicDao = basicDao;
        this.iEsBasicDao = iEsBasicDao;
        this.esConfig = esConfig;
        this.pageNum = pageNum;
    }
    //实现的Runnable接口中的run方法
    @Override
    public  void run(){
        //开始时间
        long stertTime = System.currentTimeMillis();
        Map<String,Object> querymaterialAll = new HashMap<String,Object>(1);
        Map<String,Object> sortFilter = new HashMap<String,Object>(1);
        int pageNum = this.pageNum;
        //一、同步素材表中数据
        long materialAllSize = basicDao.count("material",querymaterialAll);
        log.info("开始同步素材,素材总数为:"+materialAllSize);
        //1.1.添加检索条件来提高查询效率
        long allMaterialSize = 0;
        for (int pageSize=1;((pageSize-1)*pageNum*20)<materialAllSize;pageSize++){
            int materialSize = 0;
            log.info("当前执行分页条件为:pageSize:"+pageSize+"==pageNum:"+pageNum*20);
            List<Map<String,Object>> materialList = basicDao.find("material",sortFilter,querymaterialAll,pageSize,pageNum*20);
            try {
                String filePath = "./material"+pageSize+".json";
                for (Map<String,Object> materialMap:materialList) {
                    Map<String,Object> materialesMap = new HashMap<>();
                    materialesMap.putAll(materialMap);
                    //写入到json文件中
                    FileWriter fw = new FileWriter(filePath, true);
                    BufferedWriter bw = new BufferedWriter(fw);
                    bw.append("{\"index\":{\"_index\":\""+esConfig.getIndexname()+"\",\"_type\":\"material\"}} \n" );
                    bw.append(JSON.toJSONString(materialesMap)+" \n");
                    bw.close();
                    fw.close();
                    materialSize ++;
                    allMaterialSize++;
                }
                //执行提交命令
                execCommand("curl -XPOST  "+esConfig.getAnalyzerServerIp()+"/_bulk --data-binary @material"+pageSize+".json","./logs/material"+pageSize+".log");
            } catch (Exception e) {
                log.info("失败的素材页数为:"+pageSize);
                e.printStackTrace();
            }
            log.info("当前更新条数为:mongodb内容:"+materialSize);
            materialList.clear();
        }
        long endTime = System.currentTimeMillis();
        log.info("===素材更新用时为:"+DateUtil.getDistanceTime(stertTime,endTime)
                +"===素材更新条数为:"+allMaterialSize);
    }
    /**
     * 执行curl命令的方法
     * @param cmd
     * @param logPath
     */
    public void execCommand(String cmd,String logPath) {
        try {
            log.info("开始执行shell命令,同步es数据");
            long startTime = System.currentTimeMillis();
            Runtime rt = Runtime.getRuntime();
            Process proc = rt.exec(cmd,null,null);
            InputStream stderr = proc.getInputStream();
            InputStreamReader isr = new InputStreamReader(stderr, "GBK");
            BufferedReader br = new BufferedReader(isr);
            String line = "";
            int lineInt = 0;
            FileWriter fw = new FileWriter(logPath, true);
            BufferedWriter bw = new BufferedWriter(fw);
            while ((line = br.readLine()) != null) {
                lineInt++;
                bw.append(line+" \n");
            }
            bw.close();
            fw.close();
            long endTime = System.currentTimeMillis();
            log.info("结束执行shell命令,同步es数据,用时:"+(endTime-startTime)+"==子进程执行个数:"+lineInt);
            stderr.close();
            isr.close();
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


三、遇到的坑&待优化空间


   1.json文件的大小不得大于es集群配置的bulk.queue_size参数大小(配置文件默认配置为10M)

 

   1.json文件的大小可控制,超出限制大小则向下一个文件中保存数据从而保证批量提交文件大小在合理范围内

   2.代码中检测批量提交失败的内容进行记录,最后做重试

   3.如果批量没有失败的情况,则需要将生成的json文件删除

 

四、总结


   在做的过程中也是在尝试各种方法,黑猫白猫,先抓住一个老鼠,解决当前问题;然后再考虑后续效率以及优化的相关问题。

   希望小编的总结能够给读者带来帮助。

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
相关文章
|
Linux Shell Python
-bash: pip: command not found pip命令报错 解决方法(Centos版)
-bash: pip: command not found pip命令报错 解决方法(Centos版)
4269 0
|
7月前
|
人工智能 架构师 云栖大会
2024云栖大会 | 产品生态伙伴专场论坛暨产品生态伙伴颁奖典礼
2024云栖大会 | 产品生态伙伴专场论坛暨产品生态伙伴颁奖典礼
Elasticsearch 批量更新
讲述Elasticsearch批量更新索引指定字段操作
|
人工智能 自然语言处理 自动驾驶
【通义】AI视界|马斯克亲自辟谣:xAI不可能在特斯拉的推理计算机上运行
本文精选了24小时内的重要科技新闻,包括马斯克辟谣xAI不会运行在特斯拉计算机上、谷歌发布AlphaProteo AI模型、百度贴吧“弱智吧”成为AI训练佳选、荣耀推出跨应用智能体以及苹果即将在iOS 18.2中加入图像生成功能。更多内容请访问通义官网体验。
|
分布式计算 数据库 开发者
单体架构
单体架构介绍
276 3
|
小程序 开发者
如何调试已经上线的小程序
如何调试已经上线的小程序
409 0
|
程序员 C语言
c enum(枚举)
c enum(枚举)
296 1
|
机器学习/深度学习 自然语言处理 算法
19ContraBERT:顶会ICSE23 数据增强+对比学习+代码预训练模型,提升NLP模型性能与鲁棒性:处理程序变异(变量重命名)【网安AIGC专题11.15】
19ContraBERT:顶会ICSE23 数据增强+对比学习+代码预训练模型,提升NLP模型性能与鲁棒性:处理程序变异(变量重命名)【网安AIGC专题11.15】
381 1
|
存储 网络协议 数据安全/隐私保护
逆向USB设备共享:利用内网穿透让远程设备访问本地USB设备
逆向USB设备共享:利用内网穿透让远程设备访问本地USB设备
逆向USB设备共享:利用内网穿透让远程设备访问本地USB设备
|
机器学习/深度学习 人工智能 自然语言处理
大语言模型的预训练[4]:指示学习Instruction Learning详解以及和Prompt工程、ICL区别
大语言模型的预训练[4]:指示学习Instruction Learning详解以及和Prompt工程、ICL区别