Elasitcsearch High Level Rest Client学习笔记(二) 基础API - 木子H的个人空间 - OSCHINA

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Elasitcsearch High Level Rest Client学习笔记(二) 基础API - 木子H的个人空间 - OSCHINA

1、index API

IndexRequest request = new IndexRequest(
        "posts", //index
        "doc",   //type 类型,我对类型的理解有点类似于数据库中的表   index类似于数据库中的database
        "1");    //Document Id
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON);  //source可以有多种形式下面介绍

source可以以map的形式提供,查看官方文档介绍map形式提供的source会自动转换成json格式,初步观察源代码,写的还挺复杂,简单过了一遍其实没太懂,大概意思是map->XContentBuilder,XContentBuilder通过内置工具生成json

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap);

source也可以以XContentBuilder形式提供,通过 Elasticsearch built-in helpers生产恒json

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.field("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);

source也可以以Object key-value形式提供,依然转换成json

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch");

其他可选参数

设置路由,说起这个方法要介绍一下路由的概念,elasticsearch 路由机制

request.routing("routing");

parent,es的parent-child结构,简单点说是一对多的关系,es多对多情况要拆分成一对多;限制是parent和children必须在同一个shard当中,当添加文档时指定了parent后,就不会用默认的本文档id分配路由,而是采用父文档的路由值,保证父文档和子文档处于同一个shard当中。需要注意的是,查询时需要指定路由,否则查询会出错。当大于等于三级时,需要指定最顶层父节点路由,以让文档存储在用一个shard中。

request.parent("parent");

设置超时时间,有两种形式, TimeValue 或者字符串

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s");

设置刷新策略, WriteRequest.RefreshPolicy 或者字符串

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

版本号,好像是文档版本,类似于乐观锁的东西吧,没验证,验证后更新

request.version(2);

操作类型,当设置create时,并且指定index、type和id存在,不会更新文档,会抛出异常

request.opType(DocWriteRequest.OpType.CREATE); 
request.opType("create");

同步调用方式,没什么可说的

IndexResponse indexResponse = client.index(request);

异步调用方式,需要传入一个监听接口,通知结果或者接受异常

client.indexAsync(request, new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        //成功后执行代码,响应结果以参数传入
    }
    @Override
    public void onFailure(Exception e) {
       //失败时执行代码,异常以参数传入
    }
});

响应结果

响应结果官方示例代码如下

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    //文档不存在,创建后处理代码
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    、、//
/////文档被更新
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    //成功shard数量小于总数
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); 
        //处理可能存在的失败
    }
}

异常处理

文档发生版本冲突

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        //版本冲突处理代码
    }
}

设置opType=create,并且指定index、type和id存在,不会更新文档,会抛出异常

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
         //处理代码
    }
}

2、get api

GetRequest getRequest = new GetRequest(
        "posts", //index
        "doc",  //type
        "1");     //document id

可变参数

禁用获取源数据(source),默认开启。先要理解什么是源数据,_source字段,保存存储是的json body

request.fetchSourceContext(new FetchSourceContext(false));

设置检索返回字段和检索排除字段,同时试了一下即包含又排除的字段,排除有限,既包含又排斥的座位排斥字段

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);

设置返回指定存储字段(我是这样理解的,不过我测试失败了,设置了存储字段之后,还是不能像例子一样获取值)

request.storedFields("message"); 
GetResponse getResponse = client.get(request);
String message = getResponse.getField("message").getValue();

设置路由,parent-child查询时需要指定路由

request.routing("routing");

设置查询父节点,在get查询时,不理解为什么需要设置parent,我测试了一下发现没什么用,不管设置的parent是不是正确的parent,都能查询出结果,这个应该是一个通用方法吧,在get中可能没有用到

request.parent("parent");

elasticsearch可以使用preference参数来指定分片查询的优先级,即我们可以通过该参数来控制搜索时的索引数据分片。如不设置该参数:在所有有效的主分片以及副本间轮询

没太理解,所以没测试

request.preference("preference");

设置实时查询开启/关闭,默认为true。

通过查询资料,real-time是es的一种方式,也是新文档索引模型,跟这个api好像关系不大,这个api不是很理解,我本地集群数据比较少,true/false没发现太大区别;es通过fsync把数据写入到磁盘中,fsync十分消耗资源,es的实时查询瓶颈在硬盘读写,es利用文件系统缓存来加快实时查询速度。具体资料:

大牛博客:https://www.jianshu.com/p/94ce44d6a802

官方文档:https://www.elastic.co/guide/en/elasticsearch/guide/current/near-real-time.html

request.realtime(false);

设置查询前执行刷新,默认false,每个shard默认1秒钟一次refresh

request.refresh(true);

版本号,不多解释

request.version(2);

调用方式

同步

GetResponse getResponse = client.get(getRequest);

异步

client.getAsync(request, new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse getResponse) {
        //成功
    }
    @Override
    public void onFailure(Exception e) {
        //失败
    }
});

响应对象

GetResponse查询请求文档及其源数据以及最终存储字段

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString();        //字符串形式
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); //Map形式
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();          //字节数组形式
} else {
    //处理找不到文档代码。注意返回的是404状态而不是异常,
}

异常处理

如果需要捕获异常,需要try catch代码块包裹,ElasticsearchException是runtime exception

GetRequest request = new GetRequest("does_not_exist", "doc", "1");
try {
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        //处理文档不存在情况
    }
}

指定版本号,版本号冲突情况

try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        //版本号冲突
    }
}

3、delete api

如果存在parent的type,需要指定路由(route)

异步调用测试失败,按照官方文档,不管成功还是失败均没捕获到断点

DeleteRequest request = new DeleteRequest(
        "posts",    //index
        "doc",     //type
        "1");      //id

调用方式

同步

DeleteResponse deleteResponse = client.delete(request);

异步

client.deleteAsync(request, new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        //成功
    }
    @Override
    public void onFailure(Exception e) {
        //失败
    }
});

响应对象

返回操作信息,如下

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    //成功shard数量小于总数
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); //处理潜在的错误
    }
}

删除文档不存在情况

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    //删除文档不存在
}

异常处理

版本号冲突

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        //版本冲突
    }
}

4、update api

UpdateRequest request = new UpdateRequest(
        "posts", //index
        "doc",  //type
        "1");    //document id

painless script暂时不写,没搞明白语法

源数据格式(其实都是转换成json格式)

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); //json格式
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); //map格式
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder); //XContentBuilder
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); //object key-pairs

upserts(save or update)

当文档不存在时,以新文档插入

String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

可变参数

设置路由,路由什么意思,前一篇文章介绍过

request.routing("routing");

设置父节点

request.parent("parent");

两种形式的超时设置

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s");

设置刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

冲突尝试次数

request.retryOnConflict(3);

是否返回源数据,默认false

request.fetchSource(true);

指定包含字段或者排除字段,重叠时排除字段优先

String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes));
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes));

数据版本号

request.version(2);

noop检测。我的理解是返回状态值

request.detectNoop(false);

调用方式

同步

UpdateResponse updateResponse = client.update(request);

异步

client.updateAsync(request, new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        //成功
    }
    @Override
    public void onFailure(Exception e) {
        //失败
    }
});

响应对象

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    (upserts)首次创建对象
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    //文档被更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    //文档被删除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    //没有
}

设置返回源数据,回去返回的源数据

GetResult result = updateResponse.getGetResult(); //以GetResult格式返回更新后的文档
if (result.isExists()) {
    String sourceAsString = result.sourceAsString(); //字符串形式返回更新后文档源数据
    Map<String, Object> sourceAsMap = result.sourceAsMap(); //map形式
    byte[] sourceAsBytes = result.source(); //字节形式
} else {
    //默认情况下,源数据在响应对象中返回
}

shard异常

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
     //成功数量小于总数
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();  //处理可能的异常
    }
}

版本号冲突情况

pdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        //处理版本号冲突情况
    }
}


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
9天前
|
安全 Java API
第7章 Spring Security 的 REST API 与微服务安全(2024 最新版)(上)
第7章 Spring Security 的 REST API 与微服务安全(2024 最新版)
28 0
第7章 Spring Security 的 REST API 与微服务安全(2024 最新版)(上)
|
1月前
|
前端开发 JavaScript API
基于React的简易REST API客户端设计与实现
基于React的简易REST API客户端设计与实现
21 3
|
6月前
|
安全 API 开发工具
获取仓库列表接口可以通过SDK或者REST API两种方式调用
获取仓库列表接口可以通过SDK或者REST API两种方式调用
50 2
|
2月前
|
缓存 小程序 API
【社区每周】新增保存文件到系统储存空间API;小程序开发体验问卷调研发布
【社区每周】新增保存文件到系统储存空间API;小程序开发体验问卷调研发布
43 0
|
3月前
|
JSON 缓存 API
title: 深入理解REST API设计的最佳实践
title: 深入理解REST API设计的最佳实践
36 0
|
4月前
|
分布式计算 Hadoop Java
[hadoop3.x系列]HDFS REST HTTP API的使用(二)HttpFS
[hadoop3.x系列]HDFS REST HTTP API的使用(二)HttpFS
54 1
|
4月前
|
分布式计算 Hadoop API
✨[hadoop3.x系列]HDFS REST HTTP API的使用(一)WebHDFS
✨[hadoop3.x系列]HDFS REST HTTP API的使用(一)WebHDFS
63 1
|
4月前
|
JSON 前端开发 生物认证
REST API 的指纹验证机制
REST API 的指纹验证机制
33 0
|
5月前
|
JSON Kubernetes API
kubernetes REST Api详解(导入Swagger至Postman)
kubernetes REST Api详解(导入Swagger至Postman)
110 1
|
5月前
|
XML 资源调度 API
YARN REST API 总结
YARN REST API 总结
118 1