ES批量写入数据

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: ES批量写入数据

单次的实时操作改为批量操作,这样做的好处有,减少网路开销,从消息大小,时间,消息数量三个维度来衡量 批量操作的维度,如果数据不是要求非常实时的操作(非常实时的存储应该也不会选择es),改为批量操作后,错误均修复,大概配置如下。

1.代码

/*
* setBulkActions(1000):每添加1000个request,执行一次bulk操作
* setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)):每达到5M的请求size时,执行一次bulk操作
* setFlushInterval(TimeValue.timeValueSeconds(5)):每5s执行一次bulk操作
* setConcurrentRequests(1):默认是1,表示积累bulk requests和发送bulk是异步的,其数值表示发送bulk的并发线程数,设置为0表示二者同步的
*setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),3)):当ES由于资源不足发生异常
EsRejectedExecutionException重試策略:默认(50ms, 8),
* 策略算法:start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1)
* */
package es;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Configuration
public class ESConfiguration {
    public static final Logger logger = LoggerFactory.getLogger(ESConfiguration.class);
    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
        Client client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("http://192.168.10.33"), Integer.parseInt("9300")));
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
            }
        }).setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }
}

此processer的含义为如果消息数量到达1000 或者消息大小到大5M 或者时间达到5s 任意条件满足,客户端就会把当前的数据提交到服务端处理。效率很高。

2.写入代码

package es;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@Repository
public class StudentInsertDao{
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BulkProcessor bulkProcessor;
    private ObjectMapper objectMapper = new ObjectMapper();
    public void insert(Student student) {
        String type = student.getAge();
        String id = student.getName()+student.getAddr()+student.getAge();
        try {
            byte[] json = objectMapper.writeValueAsBytes(student);
            bulkProcessor.add(new IndexRequest("students", type, id).source(json));
        } catch (Exception e) {
            logger.error("bulkProcessor failed ,reason:{}",e);
        }
    }
}


 


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
前端开发 API
ES 高级实战(四)查询 ES 数据
ES 高级实战(四)查询 ES 数据
1384 0
ES 高级实战(四)查询 ES 数据
|
6月前
|
自然语言处理 索引
03_ES数据查询操作
03_ES数据查询操作
88 0
|
6月前
|
存储 Unix 索引
ES常用查询命令
ES常用查询命令
|
6月前
|
JSON 前端开发 JavaScript
ES6(2015)-ES13(2022)新增特性大总结
ES6(2015)-ES13(2022)新增特性大总结
96 0
|
6月前
|
网络架构
ES6函数新增了哪些扩展?
ES6函数新增了哪些扩展?
59 0
|
JSON 移动开发 NoSQL
【ES系列九】——批量同步数据至ES
通过es官网提供的bulk方法进行实现
|
网络架构
ES6中函数新增了哪些扩展?
ES6允许为函数的参数设置默认值
ES5新增方法(一)
前言 今天和大家分享一下ES5中一些新增的方法。 一、数组方法 迭代(遍历)方法:forEach(),map(),filter(),some(),every() array.forEach(function(value,index,arr)) value:数组当前项的值 index:数组当前项的索引 arr:数组对象本身
|
JSON 安全 数据安全/隐私保护
elasticdump迁移ES数据详解
elasticdump迁移ES数据详解
|
测试技术 索引
ES数据删除优化
分享一下ES数据删除优化的相关经历,根据业务需要一共优化了3次,包含了其中踩到的坑和一些花时间解决的问题.
1086 0