ES批量写入数据

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: ES批量写入数据

单次的实时操作改为批量操作,这样做的好处有,减少网路开销,从消息大小,时间,消息数量三个维度来衡量 批量操作的维度,如果数据不是要求非常实时的操作(非常实时的存储应该也不会选择es),改为批量操作后,错误均修复,大概配置如下。

1.代码

/*
* setBulkActions(1000):每添加1000个request,执行一次bulk操作
* setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)):每达到5M的请求size时,执行一次bulk操作
* setFlushInterval(TimeValue.timeValueSeconds(5)):每5s执行一次bulk操作
* setConcurrentRequests(1):默认是1,表示积累bulk requests和发送bulk是异步的,其数值表示发送bulk的并发线程数,设置为0表示二者同步的
*setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),3)):当ES由于资源不足发生异常
EsRejectedExecutionException重試策略:默认(50ms, 8),
* 策略算法:start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1)
* */
package es;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Configuration
public class ESConfiguration {
    public static final Logger logger = LoggerFactory.getLogger(ESConfiguration.class);
    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
        Client client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("http://192.168.10.33"), Integer.parseInt("9300")));
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
            }
        }).setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }
}

此processer的含义为如果消息数量到达1000 或者消息大小到大5M 或者时间达到5s 任意条件满足,客户端就会把当前的数据提交到服务端处理。效率很高。

2.写入代码

package es;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@Repository
public class StudentInsertDao{
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BulkProcessor bulkProcessor;
    private ObjectMapper objectMapper = new ObjectMapper();
    public void insert(Student student) {
        String type = student.getAge();
        String id = student.getName()+student.getAddr()+student.getAge();
        try {
            byte[] json = objectMapper.writeValueAsBytes(student);
            bulkProcessor.add(new IndexRequest("students", type, id).source(json));
        } catch (Exception e) {
            logger.error("bulkProcessor failed ,reason:{}",e);
        }
    }
}


 


相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
SQL 算法 关系型数据库
Mysql关键字执行顺序-深入解析
Mysql关键字执行顺序-深入解析
1979 0
Mysql关键字执行顺序-深入解析
Maven之阿里云镜像仓库配置
方式一:全局配置可以添加阿里云的镜像到maven的setting.xml配置中,这样就不需要每次在pom中,添加镜像仓库的配置,在mirrors节点下面添加子节点: <id>nexus-aliyun</id> <mirrorOf>central</mirrorOf> <name>Nexus aliyun</name> <url>http://maven.
|
4月前
|
机器学习/深度学习 人工智能 PyTorch
阿里云GPU云服务器简介:优势场景价详解,最新收费标准与活动价格参考
阿里云GPU云服务器怎么样?阿里云GPU结合了GPU计算力与CPU计算力,主要应用于于深度学习、科学计算、图形可视化、视频处理多种应用场景,现在购买有包月5折包年4折起等优惠,GPU 计算型 gn6i实例4核15G包月优惠价1681.00元/1个月起,包年16141.80元/1年起;GPU 计算型 gn6v实例8核32G包月优惠价3817.00元/1个月起,包年36647.40元/1起等。本文为您详细介绍阿里云GPU云服务器产品优势、应用场景以及最新活动价格。
|
网络协议 Java API
SpringBoot整合Elasticsearch-Rest-Client、测试保存、复杂检索
这篇文章介绍了如何在SpringBoot中整合Elasticsearch-Rest-Client,并提供了保存数据和进行复杂检索的测试示例。
SpringBoot整合Elasticsearch-Rest-Client、测试保存、复杂检索
|
存储 缓存 固态存储
优化Elasticsearch 硬件配置
优化Elasticsearch 硬件配置
540 5
|
容灾 关系型数据库 数据库
阿里云RDS服务巴黎奥运会赛事系统,助力云上奥运稳定运行
2024年巴黎奥运会,阿里云作为官方云服务合作伙伴,提供了稳定的技术支持。云数据库RDS通过备份恢复、实时监控、容灾切换等产品能力,确保了赛事系统的平稳运行。
 阿里云RDS服务巴黎奥运会赛事系统,助力云上奥运稳定运行
|
存储 安全 前端开发
端到端加密:确保数据传输安全的最佳实践
【10月更文挑战第12天】端到端加密(E2EE)是确保数据传输安全的重要手段,通过加密技术保障数据在传输过程中的隐私与完整性,防止第三方窃听和篡改。本文介绍E2EE的工作原理、核心优势及实施步骤,并探讨其在即时通讯、文件共享和金融服务等领域的应用,强调了选择加密协议、密钥管理、数据加密及安全接口设计的重要性,旨在帮助企业和开发者有效保护用户数据,满足数据保护法规要求。
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
869 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
SQL 存储 Java
Python-sqlparse解析SQL工具库一文详解(一)
Python-sqlparse解析SQL工具库一文详解(一)
5181 113
Python-sqlparse解析SQL工具库一文详解(一)

热门文章

最新文章