ES transport client底层是netty实现,netty本质上是异步方式,但是netty自身可以使用sync或者await(future超时机制)来实现类似同步调用!
因此,ES transport client可以同步调用也可以异步(不过底层的socket必然是异步实现)。
发送端例子
对于java client的数据发送(这里以bulk为例),写过的人都知道,其实是很简单的,因为大部分事情都已经被client做掉了,那么我们先给出例子感知一下:
client初始化
Settings settings = Settings.settingsBuilder() .put("cluster.name", "myClusterName") .put("client.transport.sniff", true).build(); client=new TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress("host1",9300)) .addTransportAddress(new InetSocketTransportAddress("host2",9300));
bulk数据发送
对于数据的发送ES提供了两种方式:
第一种bulk api:
import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item }
可以看到这种方式是由client端自己添加数据,然后调用BulkResponse bulkResponse = bulkRequest.get();
来完成数据的发送。
第二种叫做Bulk Processor:
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .build();
初始化bulk processor之后,客户端只需要往bulkProcessor添加数据即可bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
,你可以先配置好bulk的size、interval等,其他的事情就交给processor自己去做吧。
两种方式各有利弊,第一种要自己控制bulk size和interval,但是有利于对发送失败的处理;而第二种简单易用,用户只管add数据就好,但是对于使用回调函数来处理异常会不那么方便,如何选择就看使用场景的了。
部分内容摘自:http://www.opscoder.info/es_javaclient.html
本文转自张昺华-sky博客园博客,原文链接:http://www.cnblogs.com/bonelee/p/7889931.html,如需转载请自行联系原作者