Switch the single real-time writes over to bulk writes. The benefit is reduced network overhead, and the bulk behavior can be tuned along three dimensions: message count, message size, and time. As long as the data does not have to be written in strictly real time (storage with truly real-time requirements probably would not choose ES in the first place), switching to bulk operations fixed all of the errors. The configuration is roughly as follows.
1. Configuration code
/*
 * setBulkActions(1000): execute a bulk operation after every 1000 requests have been added
 * setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)): execute a bulk operation whenever the accumulated request size reaches 5 MB
 * setFlushInterval(TimeValue.timeValueSeconds(5)): execute a bulk operation every 5 s
 * setConcurrentRequests(1): defaults to 1, meaning that accumulating bulk requests and sending them happen asynchronously;
 *   the value is the number of concurrent threads sending bulks, and 0 makes the two synchronous
 * setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)): retry policy for the
 *   EsRejectedExecutionException thrown when ES runs out of resources; the default is (50 ms, 8)
 * Backoff formula: start + 10 * ((int) Math.exp(0.8d * currentlyConsumed) - 1)
 */
package es;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Configuration
public class ESConfiguration {

    public static final Logger logger = LoggerFactory.getLogger(ESConfiguration.class);

    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
        // getByName() expects a host name or IP address, not a URL,
        // so there must be no "http://" prefix here.
        Client client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.10.33"), 9300));

        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest bulkRequest) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
                logger.error("{} data bulk failed, reason: ", bulkRequest.numberOfActions(), throwable);
            }
        }).setBulkActions(1000)
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
          .setFlushInterval(TimeValue.timeValueSeconds(5))
          .setConcurrentRequests(1)
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
          .build();
    }
}
This processor means that as soon as any one condition is satisfied (the number of queued messages reaches 1000, their total size reaches 5 MB, or 5 s have elapsed), the client submits the accumulated data to the server in a single request. This is very efficient.
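To make the retry policy concrete, here is a minimal sketch (plain Java, no Elasticsearch dependency; the formula is copied from the comment in the configuration above, and the class name BackoffDemo is made up for illustration) that prints the delay before each retry:

public class BackoffDemo {

    public static void main(String[] args) {
        long startMillis = 100; // matches TimeValue.timeValueMillis(100) above
        int maxRetries = 3;     // matches the second argument above

        // delay = start + 10 * ((int) Math.exp(0.8 * attempt) - 1)
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            long delay = startMillis + 10 * ((int) Math.exp(0.8d * attempt) - 1);
            System.out.println("retry " + (attempt + 1) + ": wait " + delay + " ms");
        }
    }
}

With these settings the three retries wait roughly 100 ms, 110 ms and 130 ms, so rejected bulks are retried quickly at first and then progressively more slowly.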
2. Indexing code
package es;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

@Repository
public class StudentInsertDao {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private BulkProcessor bulkProcessor;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void insert(Student student) {
        // The age is used as the document type, and name + addr + age as the id,
        // so re-inserting the same student overwrites the existing document.
        String type = student.getAge();
        String id = student.getName() + student.getAddr() + student.getAge();
        try {
            byte[] json = objectMapper.writeValueAsBytes(student);
            // add() only queues the request; BulkProcessor flushes it
            // according to the thresholds configured above.
            bulkProcessor.add(new IndexRequest("students", type, id).source(json));
        } catch (Exception e) {
            logger.error("bulkProcessor failed, reason: ", e);
        }
    }
}
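The Student entity is not shown in the original code. Here is a minimal sketch of what insert() above assumes; only the getter names come from the code above, while the field types (all String, so that getAge() can be used directly as the type) are an assumption:

package es;

// Minimal sketch of the Student entity assumed by StudentInsertDao.
public class Student {
    private String name;
    private String addr;
    private String age; // used both as the document type and as part of the id

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }
    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
}

One caveat: BulkProcessor buffers requests in memory, so anything still queued when the application stops is lost. awaitClose() is the standard BulkProcessor API for flushing and shutting it down; the bean wrapper below is an assumed sketch of how to hook it into the Spring lifecycle:

package es;

import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class BulkProcessorShutdown {

    @Autowired
    private BulkProcessor bulkProcessor;

    // Flush any buffered requests and stop the processor before the
    // application exits; awaitClose returns true if all pending bulk
    // requests completed within the timeout.
    @PreDestroy
    public void close() throws InterruptedException {
        bulkProcessor.awaitClose(10, TimeUnit.SECONDS);
    }
}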