ES批量写入数据

简介: ES批量写入数据

单次的实时操作改为批量操作,这样做的好处有,减少网路开销,从消息大小,时间,消息数量三个维度来衡量 批量操作的维度,如果数据不是要求非常实时的操作(非常实时的存储应该也不会选择es),改为批量操作后,错误均修复,大概配置如下。

1.代码

/*
* setBulkActions(1000):每添加1000个request,执行一次bulk操作
* setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)):每达到5M的请求size时,执行一次bulk操作
* setFlushInterval(TimeValue.timeValueSeconds(5)):每5s执行一次bulk操作
* setConcurrentRequests(1):默认是1,表示积累bulk requests和发送bulk是异步的,其数值表示发送bulk的并发线程数,设置为0表示二者同步的
*setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),3)):当ES由于资源不足发生异常
EsRejectedExecutionException重試策略:默认(50ms, 8),
* 策略算法:start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1)
* */
package es;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Configuration
public class ESConfiguration {
    public static final Logger logger = LoggerFactory.getLogger(ESConfiguration.class);
    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
        Client client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("http://192.168.10.33"), Integer.parseInt("9300")));
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
            }
        }).setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }
}

此processer的含义为如果消息数量到达1000 或者消息大小到大5M 或者时间达到5s 任意条件满足,客户端就会把当前的数据提交到服务端处理。效率很高。

2.写入代码

package es;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@Repository
public class StudentInsertDao{
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BulkProcessor bulkProcessor;
    private ObjectMapper objectMapper = new ObjectMapper();
    public void insert(Student student) {
        String type = student.getAge();
        String id = student.getName()+student.getAddr()+student.getAge();
        try {
            byte[] json = objectMapper.writeValueAsBytes(student);
            bulkProcessor.add(new IndexRequest("students", type, id).source(json));
        } catch (Exception e) {
            logger.error("bulkProcessor failed ,reason:{}",e);
        }
    }
}


 


相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
4月前
|
算法 搜索推荐 Serverless
为什么 ES 的搜索结果只到 10,000?强制“数清楚”的代价有多大
Elasticsearch 7.x后默认返回10,000总数,实为Block-Max WAND算法的性能优化——跳过低分文档块以提升查询速度。强行开启`track_total_hits:true`将禁用该优化,导致CPU飙升、延迟激增。本文深入Lucene底层,解析其原理、陷阱与治理方案。
638 1
|
Arthas 监控 Java
Java 诊断利器 Arthas使用
Java 诊断利器 Arthas使用
5149 0
|
1月前
|
存储 对象存储
阿里云OSS储类型对比:标准 / 低频 / 归档 / 冷归档 / 深度冷归档全解析,一文看懂
阿里云OSS提供标准、低频、归档、冷归档及深度冷归档5类存储,对象存储OSS官网:https://t.aliyun.com/U/bgeMpr 适配热/温/冷数据场景:标准型实时访问;低频型月均访问<1次;归档及更冷类型访问频次更低、有解冻延迟与最小存储时长(30–180天)要求,且小文件按64KB起计费。
392 1
|
9月前
|
机器学习/深度学习 人工智能 PyTorch
阿里云GPU云服务器简介:优势场景价详解,最新收费标准与活动价格参考
阿里云GPU云服务器怎么样?阿里云GPU结合了GPU计算力与CPU计算力,主要应用于于深度学习、科学计算、图形可视化、视频处理多种应用场景,现在购买有包月5折包年4折起等优惠,GPU 计算型 gn6i实例4核15G包月优惠价1681.00元/1个月起,包年16141.80元/1年起;GPU 计算型 gn6v实例8核32G包月优惠价3817.00元/1个月起,包年36647.40元/1起等。本文为您详细介绍阿里云GPU云服务器产品优势、应用场景以及最新活动价格。
|
8月前
|
传感器 机器学习/深度学习 算法
【指纹识别】指纹细节提取(Matlab代码实现)
【指纹识别】指纹细节提取(Matlab代码实现)
556 4
|
网络协议 Java API
SpringBoot整合Elasticsearch-Rest-Client、测试保存、复杂检索
这篇文章介绍了如何在SpringBoot中整合Elasticsearch-Rest-Client,并提供了保存数据和进行复杂检索的测试示例。
SpringBoot整合Elasticsearch-Rest-Client、测试保存、复杂检索
|
JSON 人工智能 算法
探索LLM推理全阶段的JSON格式输出限制方法
文章详细讨论了如何确保大型语言模型(LLMs)输出结构化的JSON格式,这对于提高数据处理的自动化程度和系统的互操作性至关重要。
3503 52
|
存储 SQL 运维
当「内容科技企业」遇上多模数据库:新榜采用Lindorm打造全域数据“超级底盘”
新榜业务以数据服务提升内容产业信息流通效率,其数据处理需求聚焦于跨平台实时数据融合处理、实时分析检索、批量更新效率三大维度。Lindorm通过多模超融合架构,提供检索分析一体化、多引擎数据共享,分布式弹性扩展等能力,成为支撑新榜内容服务的核心引擎,助力客户在内容生态竞争中持续领跑。
|
负载均衡 Java Nacos
SpringCloud基础2——Nacos配置、Feign、Gateway
nacos配置管理、Feign远程调用、Gateway服务网关
SpringCloud基础2——Nacos配置、Feign、Gateway
|
消息中间件 存储 算法
RocketMQ学习笔记
RocketMQ学习笔记
464 0