ES批量写入数据

简介: ES批量写入数据

单次的实时操作改为批量操作,这样做的好处有,减少网路开销,从消息大小,时间,消息数量三个维度来衡量 批量操作的维度,如果数据不是要求非常实时的操作(非常实时的存储应该也不会选择es),改为批量操作后,错误均修复,大概配置如下。

1.代码

/*
* setBulkActions(1000):每添加1000个request,执行一次bulk操作
* setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)):每达到5M的请求size时,执行一次bulk操作
* setFlushInterval(TimeValue.timeValueSeconds(5)):每5s执行一次bulk操作
* setConcurrentRequests(1):默认是1,表示积累bulk requests和发送bulk是异步的,其数值表示发送bulk的并发线程数,设置为0表示二者同步的
*setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),3)):当ES由于资源不足发生异常
EsRejectedExecutionException重試策略:默认(50ms, 8),
* 策略算法:start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1)
* */
package es;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Configuration
public class ESConfiguration {
    public static final Logger logger = LoggerFactory.getLogger(ESConfiguration.class);
    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
        Client client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("http://192.168.10.33"), Integer.parseInt("9300")));
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
            }
        }).setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }
}

此processer的含义为如果消息数量到达1000 或者消息大小到大5M 或者时间达到5s 任意条件满足,客户端就会把当前的数据提交到服务端处理。效率很高。

2.写入代码

package es;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@Repository
public class StudentInsertDao{
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BulkProcessor bulkProcessor;
    private ObjectMapper objectMapper = new ObjectMapper();
    public void insert(Student student) {
        String type = student.getAge();
        String id = student.getName()+student.getAddr()+student.getAge();
        try {
            byte[] json = objectMapper.writeValueAsBytes(student);
            bulkProcessor.add(new IndexRequest("students", type, id).source(json));
        } catch (Exception e) {
            logger.error("bulkProcessor failed ,reason:{}",e);
        }
    }
}


 


相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
3月前
|
算法 搜索推荐 Serverless
为什么 ES 的搜索结果只到 10,000?强制“数清楚”的代价有多大
Elasticsearch 7.x后默认返回10,000总数,实为Block-Max WAND算法的性能优化——跳过低分文档块以提升查询速度。强行开启`track_total_hits:true`将禁用该优化,导致CPU飙升、延迟激增。本文深入Lucene底层,解析其原理、陷阱与治理方案。
522 1
|
Arthas 监控 Java
Java 诊断利器 Arthas使用
Java 诊断利器 Arthas使用
4941 0
|
网络协议 Java API
SpringBoot整合Elasticsearch-Rest-Client、测试保存、复杂检索
这篇文章介绍了如何在SpringBoot中整合Elasticsearch-Rest-Client,并提供了保存数据和进行复杂检索的测试示例。
SpringBoot整合Elasticsearch-Rest-Client、测试保存、复杂检索
|
负载均衡 Java Nacos
SpringCloud基础2——Nacos配置、Feign、Gateway
nacos配置管理、Feign远程调用、Gateway服务网关
SpringCloud基础2——Nacos配置、Feign、Gateway
|
消息中间件 存储 监控
RocketMQ Tag 详解!
本文详细介绍了 RocketMQ 中 Tag 的原理及其应用场景。Tag 是一种消息过滤机制,允许生产者在发送消息时指定标签,消费者据此选择性消费。文章通过源码分析展示了 Tag 在消息发送、存储及消费阶段的作用,并提供了完整的示例代码。尽管 Tag 功能简单高效,但也存在单一维度过滤等局限性。适合需要高效、低延迟消息传递的场景,如日志监控、电商系统等。
2006 3
|
负载均衡 弹性计算 域名解析
阿里云SLB负载均衡公网类型和私网类型区别
SLB负载均衡可以为多台云服务器提供流量分发服务,阿里云的SLB负载均衡实例分为公网类型和私网类型两种,那么二者之间有什么区别?云吞铺子来说说: 公网SLB和私网SLB区别 费用对比:私网SLB负载均衡是免费的,公网SLB负载均衡是付费的;简单来说,公网SLB主要面向对外提供服务,而私网SLB面向的是对内网的实例做负载均衡。
7362 0
|
消息中间件 测试技术 Kafka
使用ClickHouse集群的7个基本技巧
使用ClickHouse集群的7个基本技巧
505 1
|
JavaScript Java 中间件
Java CompletableFuture 异步超时实现探索
本文探讨了在JDK 8中`CompletableFuture`缺乏超时中断任务能力的问题,提出了一种异步超时实现方案,通过自定义工具类模拟JDK 9中`orTimeout`方法的功能,解决了任务超时无法精确控制的问题,适用于多线程并行执行优化场景。
488 0
|
消息中间件 存储 算法
RocketMQ学习笔记
RocketMQ学习笔记
446 0
|
Java API C++
Java8 CompletableFuture异步编程-进阶篇
Java8 CompletableFuture异步编程-进阶篇

热门文章

最新文章

下一篇
开通oss服务