1. Index API
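import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

IndexRequest request = new IndexRequest(
        "posts",  // index
        "doc",    // type: I think of a type as roughly a database table and an index as the database (a loose analogy; an index created in 6.x can hold only one type)
        "1");     // document id
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON); // the source can take several forms, covered below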
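The source can also be provided as a Map. According to the official documentation, a Map source is converted to JSON automatically. I skimmed the client's source code. It is fairly involved and I did not fully follow it, but roughly the Map is written into an XContentBuilder, and the XContentBuilder then produces the JSON with its built-in tooling.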
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap);
The source can also be provided as an XContentBuilder. Elasticsearch's built-in helpers then generate the JSON.
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.field("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);
The source can also be given as Object key-value pairs. These are converted to JSON as well.
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch");
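This stdlib-only sketch makes the "converted to JSON" step concrete. It is not how XContentBuilder works internally. `SourceToJson`, `pairsToMap` and `toJson` are hypothetical helpers of my own. They turn a Map, or an alternating key/value list, into the same JSON string as the first example.

- postDate is passed as a plain string because Elasticsearch's Date formatting is not modelled here.
- As far as I know, the real source(Object...) also rejects an odd number of arguments, and the sketch mimics that.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, NOT Elasticsearch's XContentBuilder: it only illustrates
// how a Map or an even-length key/value list can become a JSON object.
class SourceToJson {

    // Keys and values alternate, so an odd number of arguments cannot form pairs.
    static Map<String, Object> pairsToMap(Object... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException(
                "expected an even number of arguments but got " + keyValues.length);
        }
        Map<String, Object> map = new LinkedHashMap<>(); // keeps insertion order
        for (int i = 0; i < keyValues.length; i += 2) {
            map.put(String.valueOf(keyValues[i]), keyValues[i + 1]);
        }
        return map;
    }

    // Serializes a flat map whose values are strings, numbers, booleans or null.
    static String toJson(Map<String, Object> map) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : map.entrySet()) {
            fields.add(quote(e.getKey()) + ":" + value(e.getValue()));
        }
        return fields.toString();
    }

    private static String value(Object v) {
        if (v == null) return "null";
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        return quote(v.toString());
    }

    // Escapes only backslashes and double quotes, which is enough for this sketch.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> source = pairsToMap(
            "user", "kimchy", "postDate", "2013-01-30", "message", "trying out Elasticsearch");
        System.out.println(toJson(source));
    }
}
```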
Optional arguments
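Routing. To explain this method I first need to introduce Elasticsearch's routing mechanism. A document is assigned to a shard by hashing a routing value. That value defaults to the document id, and request.routing(...) replaces it with a custom value.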
request.routing("routing");
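Here is a simplified model of the basic routing formula, shard = hash(routing) % number_of_primary_shards, where the routing value defaults to the document id. Two assumptions:

- `RoutingSketch` is hypothetical.
- String.hashCode() stands in for the Murmur3 hash Elasticsearch really uses, so the shard numbers will not match a real cluster. Only the behaviour carries over: the same routing value always gives the same shard.

```java
// Simplified routing model; String.hashCode() is only a stand-in for Murmur3.
class RoutingSketch {

    // shard = hash(routing) % number_of_primary_shards,
    // where routing defaults to the document id when no custom value is given.
    static int shardFor(String id, String customRouting, int numberOfPrimaryShards) {
        String routing = (customRouting != null) ? customRouting : id;
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        int shards = 5;
        // Without custom routing, documents are spread by their ids.
        System.out.println(shardFor("1", null, shards));
        System.out.println(shardFor("2", null, shards));
        // With the same custom routing, both documents land on the same shard.
        System.out.println(shardFor("1", "user-42", shards) == shardFor("2", "user-42", shards)); // prints true
    }
}
```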
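parent: Elasticsearch's parent-child structure is, simply put, a one-to-many relationship. A many-to-many relationship has to be split into one-to-many ones. The restriction is that a parent and its children must live on the same shard.

When a document is indexed with a parent, it is no longer routed by its own id (the default) but by the parent's routing value. This keeps parent and child documents on the same shard. Later requests for such a document must specify the routing as well, otherwise they can go to the wrong shard and fail. With three or more levels, every document must use the routing of the top-level parent so that all of them are stored on the same shard.

(This describes the `_parent` field; indices created in 6.x model parent-child relations with the `join` field instead.)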
request.parent("parent");
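The parent rule can be sketched the same way, in this order of precedence:

1. An explicit routing value wins.
2. Otherwise a child is routed by its parent's id.
3. Otherwise a document uses its own id.

This is my approximation of the behaviour described above, not Elasticsearch code. `ParentRoutingSketch` is hypothetical, and String.hashCode() again replaces Murmur3. The grandchild case shows why three-level hierarchies need the top-level id as the routing value.

```java
// Hypothetical model (not Elasticsearch code) of how a parent id or an explicit
// routing value decides a document's shard. String.hashCode() stands in for Murmur3.
class ParentRoutingSketch {

    // Explicit routing wins; otherwise a child is routed by its parent's id;
    // otherwise the document is routed by its own id.
    static int shardFor(String id, String parentId, String routing, int primaries) {
        String value = routing != null ? routing : (parentId != null ? parentId : id);
        return Math.floorMod(value.hashCode(), primaries);
    }

    public static void main(String[] args) {
        int primaries = 5;
        int question = shardFor("question-1", null, null, primaries);
        int answer = shardFor("answer-7", "question-1", null, primaries);
        // Grandchild: its parent is answer-7, but it must use the top-level id as routing.
        int comment = shardFor("comment-3", "answer-7", "question-1", primaries);
        System.out.println(question == answer && answer == comment); // prints true
    }
}
```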
Timeout, given either as a TimeValue or as a string
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
Refresh policy, given either as a WriteRequest.RefreshPolicy or as a string
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
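For reference, the string values map to the RefreshPolicy constants as follows:

- "false" is NONE (the default).
- "true" is IMMEDIATE.
- "wait_for" is WAIT_UNTIL.
- An empty string is treated like "true".

The enum below is a hypothetical mirror I wrote to show that mapping, not the real WriteRequest.RefreshPolicy.

```java
// Hypothetical mirror of WriteRequest.RefreshPolicy, only to show which
// string value corresponds to which constant.
enum RefreshPolicySketch {
    NONE("false"),          // default: no refresh as part of this request
    IMMEDIATE("true"),      // refresh right after the operation
    WAIT_UNTIL("wait_for"); // hold the request until a refresh makes the change visible

    final String value;

    RefreshPolicySketch(String value) { this.value = value; }

    static RefreshPolicySketch parse(String value) {
        for (RefreshPolicySketch p : values()) {
            if (p.value.equals(value)) return p;
        }
        if (value.isEmpty()) return IMMEDIATE; // an empty value counts as "true"
        throw new IllegalArgumentException("Unknown value for refresh: [" + value + "]");
    }

    public static void main(String[] args) {
        System.out.println(parse("wait_for")); // prints WAIT_UNTIL
    }
}
```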
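Version: the document version, which works as an optimistic lock. If the document's current version differs from the one given here, the request is rejected with a version conflict (RestStatus.CONFLICT, see the exception handling below).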
request.version(2);
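To check my understanding of version as an optimistic lock, here is an in-memory model of internal versioning. Each successful write bumps the version, and a write that carries a stale expected version is rejected. `VersionedStore` and its exception are hypothetical stand-ins for the 409 CONFLICT response. Real Elasticsearch versioning also has other modes (for example external versions) that this ignores.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of internal versioning (optimistic concurrency control).
// VersionConflictException is a hypothetical stand-in for HTTP 409 CONFLICT.
class VersionedStore {

    static class VersionConflictException extends RuntimeException {
        VersionConflictException(String msg) { super(msg); }
    }

    private final Map<String, String> docs = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    // expectedVersion == null means "no version check", like omitting request.version(...).
    synchronized long index(String id, String source, Long expectedVersion) {
        long current = versions.getOrDefault(id, 0L); // 0 = document does not exist yet
        if (expectedVersion != null && expectedVersion != current) {
            throw new VersionConflictException("current version [" + current
                + "] is different than the one provided [" + expectedVersion + "]");
        }
        docs.put(id, source);
        versions.put(id, current + 1); // every successful write bumps the version
        return current + 1;
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        store.index("1", "{\"field\":\"a\"}", null); // becomes version 1
        store.index("1", "{\"field\":\"b\"}", 1L);   // matches, becomes version 2
        try {
            store.index("1", "{\"field\":\"c\"}", 1L); // stale version, so it conflicts
        } catch (VersionConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```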
Operation type: with create, if a document with the given index, type and id already exists, it is not overwritten. An exception is thrown instead.
request.opType(DocWriteRequest.OpType.CREATE);
request.opType("create");
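This in-memory model shows the difference between the default index operation and create. INDEX creates or overwrites a document, while CREATE refuses to touch an existing id. `OpTypeSketch` and `DocumentExistsException` are hypothetical. In Elasticsearch the failure surfaces as an ElasticsearchException with status CONFLICT, as in the exception handling section.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of opType INDEX (create or overwrite) vs CREATE (fail if the id exists).
// The exception type is hypothetical; Elasticsearch reports a 409 CONFLICT.
class OpTypeSketch {
    enum OpType { INDEX, CREATE }

    static class DocumentExistsException extends RuntimeException {
        DocumentExistsException(String id) { super("[" + id + "]: document already exists"); }
    }

    private final Map<String, String> docs = new HashMap<>();

    // Returns "created" or "updated", loosely mirroring DocWriteResponse.Result.
    String index(String id, String source, OpType opType) {
        boolean exists = docs.containsKey(id);
        if (exists && opType == OpType.CREATE) {
            throw new DocumentExistsException(id); // the existing document stays untouched
        }
        docs.put(id, source);
        return exists ? "updated" : "created";
    }

    String get(String id) { return docs.get(id); }
}
```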
Synchronous call; nothing special here
IndexResponse indexResponse = client.index(request);
Asynchronous call: pass in a listener, which is notified of the result or receives the exception
client.indexAsync(request, new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        // runs on success; the response is passed in
    }

    @Override
    public void onFailure(Exception e) {
        // runs on failure; the exception is passed in
    }
});
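The listener style boils down to this: the call returns immediately, and later exactly one callback receives the result or the exception on another thread. The sketch below models that with CompletableFuture. `AsyncSketch` and its `Listener` interface are hypothetical; the interface is shaped like ActionListener but is not it. The CountDownLatch in main matters: if the calling thread finishes before the callback fires, it looks as if neither callback ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Minimal model of the async listener pattern; Listener is hypothetical,
// shaped like ActionListener but not the real Elasticsearch interface.
class AsyncSketch {

    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Runs the call on another thread and reports to exactly one callback.
    static <T> void executeAsync(Supplier<T> call, Listener<T> listener) {
        CompletableFuture.supplyAsync(call).whenComplete((response, error) -> {
            if (error == null) {
                listener.onResponse(response);
            } else {
                // supplyAsync wraps failures in CompletionException; unwrap for the listener
                Throwable cause = error.getCause() != null ? error.getCause() : error;
                listener.onFailure(cause instanceof Exception ? (Exception) cause : new RuntimeException(cause));
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        executeAsync(() -> "created", new Listener<String>() {
            @Override public void onResponse(String r) { System.out.println("ok: " + r); done.countDown(); }
            @Override public void onFailure(Exception e) { System.out.println("failed: " + e); done.countDown(); }
        });
        done.await(); // without this, main could exit before either callback runs
    }
}
```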
Response
The official sample code for handling the response:
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the document did not exist before and was created
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // an existing document was overwritten
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // fewer shards succeeded than the total
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // handle the potential failures
    }
}
Exception handling
Version conflict on the document
IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1);
try {
    IndexResponse response = client.index(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // handle the version conflict
    }
}
With opType=create, if a document with the given index, type and id already exists, it is not overwritten and an exception is thrown.
IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // the document already exists
    }
}
2. Get API
GetRequest getRequest = new GetRequest(
        "posts",  // index
        "doc",    // type
        "1");     // document id
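Optional arguments
Disable fetching the source (it is fetched by default). First you need to know what the source is: the _source field, which stores the original JSON body sent when the document was indexed.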
request.fetchSourceContext(new FetchSourceContext(false));
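Choose which fields to include in, and exclude from, the returned source. I also tried a field that is both included and excluded: exclusion takes precedence, so such a field is treated as excluded.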
// Return only "message" and the fields matching "*Date":
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);

// Alternatively, return everything except "message":
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
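This is a simplified model of the include/exclude rules, including the precedence I observed: a field that is both included and excluded is dropped. It assumes flat documents and supports only `*` wildcards. `SourceFilterSketch` is my own illustration, not Elasticsearch's filtering code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of FetchSourceContext(true, includes, excludes) for flat
// documents: a field is kept if it matches an include pattern (or includes is
// empty) and matches no exclude pattern. Only '*' wildcards are supported.
class SourceFilterSketch {

    static Map<String, Object> filter(Map<String, Object> source, String[] includes, String[] excludes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            boolean included = includes.length == 0 || matchesAny(e.getKey(), includes);
            boolean excluded = matchesAny(e.getKey(), excludes);
            if (included && !excluded) { // exclusion wins when both match
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    private static boolean matchesAny(String field, String[] patterns) {
        for (String p : patterns) {
            // Quote the literal parts and turn each '*' into ".*", e.g. "*Date" -> \Q\E.*\QDate\E
            String regex = "\\Q" + p.replace("*", "\\E.*\\Q") + "\\E";
            if (field.matches(regex)) return true;
        }
        return false;
    }
}
```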
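Return specific stored fields. That is my understanding of it, but my test failed: after setting the stored field I still could not read the value the way the example does. A likely cause is that storedFields only returns fields whose mapping has "store": true. Fields are not stored separately from _source by default, so getField("message") returns null in that case.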
request.storedFields("message");
GetResponse getResponse = client.get(request);
String message = getResponse.getField("message").getValue();
Routing: required for parent-child lookups
request.routing("routing");
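Parent. I did not understand why a GET would need a parent, and in my tests it made no difference: the document was found whether or not the given parent was the right one. As far as I can tell, parent(...) on a GetRequest is only used as the routing value when no routing is set. If the index has a single primary shard, every value leads to the same shard, so the lookup succeeds anyway.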
request.parent("parent");
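The preference parameter controls which shard copies a request is executed on, that is, which copies of the index data are searched. If it is not set, requests rotate round-robin across all active primary and replica shards.
I did not quite understand this, so I have not tested it.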
request.preference("preference");
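Here is my rough mental model of preference, as a hedged sketch. Without a preference, the copies of a shard are taken in turn. With a custom string, the same copy is picked every time; the docs suggest this is the point, for example using a user's session id to give that user consistent results. `PreferenceSketch` is hypothetical, uses String.hashCode(), and ignores special values such as `_local`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of choosing one copy (primary or replica) of a shard.
// No preference: rotate between copies. Same custom string: same copy every time
// (as long as the set of copies does not change).
class PreferenceSketch {
    private final List<String> copies;
    private final AtomicInteger counter = new AtomicInteger();

    PreferenceSketch(List<String> copies) { this.copies = copies; }

    String choose(String preference) {
        int index = (preference == null)
            ? counter.getAndIncrement()   // round robin across copies
            : preference.hashCode();      // stable for a given string
        return copies.get(Math.floorMod(index, copies.size()));
    }

    public static void main(String[] args) {
        PreferenceSketch shard = new PreferenceSketch(Arrays.asList("primary", "replica-1", "replica-2"));
        System.out.println(shard.choose(null) + " " + shard.choose(null));
        System.out.println(shard.choose("session-abc").equals(shard.choose("session-abc"))); // prints true
    }
}
```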
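Turn realtime retrieval on or off (default true).
From what I read, "real-time" here is tied to Elasticsearch's near-real-time indexing model rather than to this API specifically, and I do not fully understand the API. With the small amount of data on my local cluster I saw no obvious difference between true and false.

Elasticsearch persists data to disk with fsync, which is expensive, so disk I/O is the bottleneck for real-time access. To make new documents available quickly, Elasticsearch relies on the filesystem cache instead. A likely reason I saw no difference: with realtime(false) a GET only sees what the last refresh made visible. Shards refresh every second by default, so a document indexed more than a second earlier looks the same either way.

References:
An in-depth blog post (in Chinese): https://www.jianshu.com/p/94ce44d6a802
Official documentation: https://www.elastic.co/guide/en/elasticsearch/guide/current/near-real-time.html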
request.realtime(false);
Refresh before the get (default false). By default each shard is refreshed once per second.
request.refresh(true);
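This toy model makes the realtime/refresh distinction concrete:

- "latest" stands for what has been written, and "searchable" for what the last refresh published.
- A realtime GET reads the latest version.
- With realtime(false), only refreshed data is visible; refresh(true) publishes it first.

`RealtimeSketch` is hypothetical and does not reflect how Elasticsearch implements this internally.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of realtime vs near-real-time reads; names and structure are illustrative.
class RealtimeSketch {
    private final Map<String, String> latest = new HashMap<>();   // everything written so far
    private Map<String, String> searchable = new HashMap<>();     // view published by the last refresh

    void index(String id, String source) { latest.put(id, source); }

    // Roughly what the periodic refresh (every 1s by default) does: publish a new view.
    void refresh() { searchable = new HashMap<>(latest); }

    String get(String id, boolean realtime, boolean refreshFirst) {
        if (refreshFirst) refresh();       // like request.refresh(true)
        return realtime ? latest.get(id)   // realtime GET sees the newest version
                        : searchable.get(id);
    }

    public static void main(String[] args) {
        RealtimeSketch shard = new RealtimeSketch();
        shard.index("1", "{\"user\":\"kimchy\"}");
        System.out.println(shard.get("1", true, false));  // prints {"user":"kimchy"}
        System.out.println(shard.get("1", false, false)); // prints null (not refreshed yet)
    }
}
```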
Version; no further explanation needed
request.version(2);
Calling the API
Synchronous
GetResponse getResponse = client.get(getRequest);
Asynchronous
client.getAsync(request, new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse getResponse) {
        // success
    }

    @Override
    public void onFailure(Exception e) {
        // failure
    }
});
Response object
GetResponse holds the requested document together with its source and any requested stored fields.
String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString();        // as a String
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // as a Map
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();          // as a byte array
} else {
    // handle the document not being found; note this is a 404 response, not an exception
}
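Exception handling
To handle the exception, wrap the call in a try-catch block. ElasticsearchException is a runtime exception, so the compiler will not force you to catch it. In the example below the index itself does not exist, which produces an exception with status NOT_FOUND.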
GetRequest request = new GetRequest("does_not_exist", "doc", "1");
try {
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // the index does not exist
    }
}
Version conflict when a version is specified
try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // version conflict
    }
}
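3. Delete API
If the type has a parent, the routing must be specified.
I could not get the asynchronous call to work in my tests. Following the official documentation, neither the success nor the failure breakpoint was ever hit. One possible cause, which I have not verified: deleteAsync returns immediately, so if the test method or the JVM finishes first, neither callback gets a chance to run.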
DeleteRequest request = new DeleteRequest(
        "posts",  // index
        "doc",    // type
        "1");     // document id
Calling the API
Synchronous
DeleteResponse deleteResponse = client.delete(request);
Asynchronous
client.deleteAsync(request, new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        // success
    }

    @Override
    public void onFailure(Exception e) {
        // failure
    }
});
Response object
It returns information about the operation:
String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // fewer shards succeeded than the total
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // handle the potential failures
    }
}
Deleting a document that does not exist
DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // the document to delete was not found
}
Exception handling
Version conflict
try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // version conflict
    }
}
4. Update API
UpdateRequest request = new UpdateRequest(
        "posts",  // index
        "doc",    // type
        "1");     // document id
I am skipping Painless scripts for now; I have not figured out the syntax yet.
Formats for the partial document (all of them end up as JSON)
// As a JSON string
UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON);

// As a Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap);

// As an XContentBuilder
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder);

// As Object key-value pairs
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update");
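All four forms above produce the same partial document, which Elasticsearch merges into the stored _source:

- Fields in the partial document overwrite the stored ones.
- Objects are merged recursively.
- Everything else is kept.

Below is a stdlib sketch of that merge. `PartialUpdateSketch.merge` is my illustration of the documented behaviour, not Elasticsearch's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of merging a partial document into the stored _source: keys in the
// partial doc overwrite stored values (arrays and plain values are replaced),
// nested objects are merged recursively, all other stored fields are kept.
class PartialUpdateSketch {

    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> stored, Map<String, Object> partial) {
        Map<String, Object> result = new LinkedHashMap<>(stored); // the stored map is not modified
        for (Map.Entry<String, Object> e : partial.entrySet()) {
            Object old = result.get(e.getKey());
            if (old instanceof Map && e.getValue() instanceof Map) {
                result.put(e.getKey(), merge((Map<String, Object>) old, (Map<String, Object>) e.getValue()));
            } else {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```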
Upserts (insert or update)
If the document does not exist yet, the upsert content is inserted as a new document.
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
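Here is the upsert logic as an in-memory sketch:

- A missing document is created from the upsert content as-is.
- An existing document gets the partial document merged in.
- Without an upsert, a missing document is an error (document missing, 404).

`UpsertSketch` is hypothetical and merges top-level fields only. UpdateRequest also has docAsUpsert(true), which uses the partial document itself as the upsert content.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of update-with-upsert on a single in-memory "index".
class UpsertSketch {
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    String update(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            if (upsertDoc == null) {
                // Without an upsert document, updating a missing document fails (404 in Elasticsearch).
                throw new IllegalStateException("[" + id + "]: document missing");
            }
            docs.put(id, new LinkedHashMap<>(upsertDoc)); // the upsert content is stored as-is
            return "created";
        }
        existing.putAll(partialDoc); // top-level merge of the partial document
        return "updated";
    }

    Map<String, Object> get(String id) { return docs.get(id); }
}
```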
Optional arguments
Routing (what routing means was covered in the previous post)
request.routing("routing");
Parent
request.parent("parent");
Timeout, in either of two forms
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
Refresh policy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
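Number of retries if the document is changed by another operation between the get and index phases of the update (a conflict)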
request.retryOnConflict(3);
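Here is retryOnConflict(3) in sketch form. An update is a get followed by an index, guarded by the version that was read. If another writer changes the document in between, the cycle is retried, up to the given number of extra times, before a conflict is reported. `RetryOnConflictSketch` is hypothetical. An AtomicLong plays the document version, and the `interference` callback simulates a concurrent writer.

```java
import java.util.concurrent.atomic.AtomicLong;

// Conceptual model of retryOnConflict(n): read-modify-write guarded by the
// version that was read, retried up to n extra times on a conflict.
class RetryOnConflictSketch {

    static class ConflictException extends RuntimeException {}

    // version: the document's current version, shared with concurrent writers.
    // interference: hypothetical hook run between read and write to simulate another writer.
    static int updateWithRetry(AtomicLong version, int retryOnConflict, Runnable interference) {
        for (int attempt = 0; ; attempt++) {
            long seen = version.get();                    // "get" phase
            interference.run();                           // another client may write here
            if (version.compareAndSet(seen, seen + 1)) {  // "index" phase, only if unchanged
                return attempt;                           // number of retries that were needed
            }
            if (attempt >= retryOnConflict) {
                throw new ConflictException();            // retries exhausted: 409 CONFLICT
            }
        }
    }
}
```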
Whether to return the source (default false)
request.fetchSource(true);
Fields to include or exclude; where they overlap, exclusion wins
// Return only "updated" and the fields starting with "r":
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes));

// Alternatively, return everything except "updated":
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes));
Document version
request.version(2);
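Noop detection. It controls what happens when the partial document would not change the stored source. With detection on (the default), the update is skipped and the result is NOOP. detectNoop(false) turns the check off, so the document is always reindexed.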
request.detectNoop(false);
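This sketch shows what noop detection means for a partial update. If merging the partial document would leave the source unchanged, nothing is written, the version stays the same and the result is NOOP. With detectNoop(false) the document is reindexed regardless. `DetectNoopSketch` is hypothetical; it tracks a single document and does a top-level merge only.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of noop detection for partial updates on a single document.
class DetectNoopSketch {
    private final Map<String, Object> source = new HashMap<>();
    private long version = 1;

    String update(Map<String, Object> partialDoc, boolean detectNoop) {
        Map<String, Object> merged = new HashMap<>(source);
        merged.putAll(partialDoc);
        if (detectNoop && merged.equals(source)) {
            return "NOOP"; // nothing written, version unchanged
        }
        source.clear();
        source.putAll(merged);
        version++;         // a real write bumps the version
        return "UPDATED";
    }

    long version() { return version; }
}
```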
Calling the API
Synchronous
UpdateResponse updateResponse = client.update(request);
Asynchronous
client.updateAsync(request, new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        // success
    }

    @Override
    public void onFailure(Exception e) {
        // failure
    }
});
Response object
String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the document was created for the first time (upsert)
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // the document was updated
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // the document was deleted
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // nothing was changed (noop)
}
If fetchSource was enabled, read the returned source:
GetResult result = updateResponse.getGetResult(); // the updated document, as a GetResult
if (result != null && result.isExists()) {
    String sourceAsString = result.sourceAsString();        // updated source as a String
    Map<String, Object> sourceAsMap = result.sourceAsMap(); // as a Map
    byte[] sourceAsBytes = result.source();                 // as a byte array
} else {
    // the source is not in the response, which is the default unless fetchSource is enabled
}
Shard failures
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // fewer shards succeeded than the total
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // handle the potential failures
    }
}
Version conflict
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // handle the version conflict
    }
}