ES java api

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 2.0之后ES的java api用法有了很大变化。在此记录一些。 java应用程序连接ES集群,笔者使用的是TransportClient,获取TransportClient的代码设计为单例模式(见getClient方法)。

2.0之后ES的java api用法有了很大变化。在此记录一些。

java应用程序连接ES集群,笔者使用的是TransportClient,获取TransportClient的代码设计为单例模式(见getClient方法)。同时包含了设置自动提交文档的代码。注释比较详细,不再赘述。

下方另有提交文档、提交搜索请求的代码。

1、连接ES集群代码如下: 

复制代码
  1 package elasticsearch;
  2 
  3 import com.vividsolutions.jts.geom.GeometryFactory;
  4 import com.vividsolutions.jts.geom.MultiPolygon;
  5 import com.vividsolutions.jts.geom.Polygon;
  6 import com.vividsolutions.jts.io.ParseException;
  7 import com.vividsolutions.jts.io.WKTReader;
  8 import org.apache.commons.logging.Log;
  9 import org.apache.commons.logging.LogFactory;
 10 import org.elasticsearch.action.bulk.BulkProcessor;
 11 import org.elasticsearch.action.bulk.BulkRequest;
 12 import org.elasticsearch.action.bulk.BulkResponse;
 13 import org.elasticsearch.client.transport.TransportClient;
 14 import org.elasticsearch.common.settings.Settings;
 15 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 16 import org.elasticsearch.common.unit.ByteSizeUnit;
 17 import org.elasticsearch.common.unit.ByteSizeValue;
 18 import org.elasticsearch.common.unit.TimeValue;
 19 
 20 import java.net.InetAddress;
 21 import java.util.Date;
 22 
 23 /**
 24  * Created by ZhangDong on 2015/12/25.
 25  */
 26 public class EsClient {
 27     static Log log = LogFactory.getLog(EsClient.class);
 28 
 29     //    用于提供单例的TransportClient BulkProcessor
 30     static public TransportClient tclient = null;
 31     static BulkProcessor staticBulkProcessor = null;
 32 
 33 //【获取TransportClient 的方法】
 34     public static TransportClient getClient() {
 35         try {
 36             if (tclient == null) {
 37                 String EsHosts = "10.10.2.1:9300,10.10.2.2:9300";
 38                 Settings settings = Settings.settingsBuilder()
 39                         .put("cluster.name", "wshare_es")//设置集群名称
 40                         .put("tclient.transport.sniff", true).build();//自动嗅探整个集群的状态,把集群中其它机器的ip地址加到客户端中
 41 
 42                 tclient = TransportClient.builder().settings(settings).build();
 43                 String[] nodes = EsHosts.split(",");
 44                 for (String node : nodes) {
 45                     if (node.length() > 0) {//跳过为空的node(当开头、结尾有逗号或多个连续逗号时会出现空node)
 46                         String[] hostPort = node.split(":");
 47                         tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
 48 
 49                     }
 50                 }
 51             }//if
 52         } catch (Exception e) {
 53             e.printStackTrace();
 54         }
 55         return tclient;
 56     }
 57      //【设置自动提交文档】
 58     public static BulkProcessor getBulkProcessor() {
 59         //自动批量提交方式
 60         if (staticBulkProcessor == null) {
 61             try {
 62                 staticBulkProcessor = BulkProcessor.builder(getClient(),
 63                         new BulkProcessor.Listener() {
 64                             @Override
 65                             public void beforeBulk(long executionId, BulkRequest request) {
 66                                 //提交前调用
 67 //                                System.out.println(new Date().toString() + " before");
 68                             }
 69 
 70                             @Override
 71                             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 72                                 //提交结束后调用(无论成功或失败)
 73 //                                System.out.println(new Date().toString() + " response.hasFailures=" + response.hasFailures());
 74                                 log.info( "提交" + response.getItems().length + "个文档,用时"
 75                                         + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文档提交失败!" : ""));
 76 //                                response.hasFailures();//是否有提交失败
 77                             }
 78 
 79                             @Override
 80                             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
 81                                 //提交结束且失败时调用
 82                                 log.error( " 有文档提交失败!after failure=" + failure);
 83                             }
 84                         })
 85                         
 86                         .setBulkActions(1000)//文档数量达到1000时提交
 87                         .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//总文档体积达到5MB时提交 //
 88                         .setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(无论文档数量、体积是否达到阈值)
 89                         .setConcurrentRequests(1)//加1后为可并行的提交请求数,即设为0代表只可1个请求并行,设为1为2个并行
 90                         .build();
 91 //                staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//关闭,如有未提交完成的文档则等待完成,最多等待10分钟
 92             } catch (Exception e) {//关闭时抛出异常
 93                 e.printStackTrace();
 94             }
 95         }//if
 96 
 97 
 98 
 99 
100 
101         return staticBulkProcessor;
102     }
103 }
复制代码

 2、插入文档的代码(自动批量提交方式,注释中另有手动批量提交、单个文档提交的方式)

复制代码
 1 package elasticsearch;
 2 
 3 import org.apache.commons.logging.Log;
 4 import org.apache.commons.logging.LogFactory;
 5 import org.elasticsearch.action.index.IndexRequest;
 6 
 7 
 8 /**
 9  * Created by ZhangDong on 2015/12/25.
10  */
11 public class EsInsert2 {
12     static Log log = LogFactory.getLog(EsInsert2.class);
13     public static void add(String json) {
14                 try {  //EsClient.getBulkProcessor()是位于上方EsClient类中的方法
15                     EsClient.getBulkProcessor().add(new IndexRequest("设置的index name", "设置的type name","要插入的文档的ID").source(json));//添加文档,以便自动提交
16                 } catch (Exception e) {
17                     log.error("add文档时出现异常:e=" + e + " json=" + json);
18                 }
19     }
20 }
21 //手动 批量更新
22 //        BulkRequestBuilder bulkRequest = tclient.prepareBulk();
23 //        for(int i=500;i<1000;i++){
24 //            //业务对象
25 //            String json = "";
26 //            IndexRequestBuilder indexRequest = tclient.prepareIndex("twitter", "tweet")
27 //                    //指定不重复的ID
28 //                    .setSource(json).setId(String.valueOf(i));
29 //            //添加到builder中
30 //            bulkRequest.add(indexRequest);
31 //        }
32 //
33 //        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
34 //        if (bulkResponse.hasFailures()) {
35 //            // process failures by iterating through each bulk response item
36 //            System.out.println(bulkResponse.buildFailureMessage());
37 //        }
38 
39 //单个文档提交
40 //        String json = "{\"relationship\":{},\"tags\":[\"camera\",\"video\"]}";
41 //        IndexResponse response = getClient().prepareIndex("dots", "scan", JSON.parseObject(json).getString("rid")).setSource(json).get();
42 //        return response.toString();
复制代码

 

3、进行搜索的代码,其中有适用于复杂搜索逻辑的BoolQuery用法,以及关键词高亮的配置、在某个字段精确搜索、全文搜索、匹配全部文档、搜索同时返回聚类信息的用法:

复制代码
 1 package service;
 2 
 3 import elasticsearch.EsClient;
 4 import org.apache.commons.logging.Log;
 5 import org.apache.commons.logging.LogFactory;
 6 import org.elasticsearch.action.search.SearchRequestBuilder;
 7 import org.elasticsearch.action.search.SearchResponse;
 8 import org.elasticsearch.index.query.*;
 9 import org.elasticsearch.search.aggregations.AggregationBuilders;
10 import org.springframework.stereotype.Service;
11 
12 /**
13  * Created by ZhangDong on 2016/1/5.
14  */
15 @Service
16 public class SearchService2 {
17 
18     Log log = LogFactory.getLog(getClass());
19     public SearchResponse getSimpleSearchResponse( int page, int pagesize){
20 
21         BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
22         mustQuery.must(QueryBuilders.matchAllQuery()); // 添加第1条must的条件 此处为匹配所有文档
23 
24         mustQuery.must(QueryBuilders.matchPhraseQuery("title", "时间简史"));//添加第2条must的条件 title字段必须为【时间简史】
25         // ↑ 放入筛选条件(termQuery为精确搜索,大小写敏感且不支持*) 实验发现matchPhraseQuery可对中文精确匹配term
26 
27         mustQuery.must(QueryBuilders.matchQuery("auther", "霍金")); // 添加第3条must的条件
28 
29         QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("物理")//.escape(true)//escape 转义 设为true,避免搜索[]、结尾为!的关键词时异常 但无法搜索*
30                 .defaultOperator(QueryStringQueryBuilder.Operator.AND);//不同关键词之间使用and关系
31         mustQuery.must(queryBuilder);//添加第4条must的条件 关键词全文搜索筛选条件
32 
33         SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch("index name").setTypes("type name")
34                 .setQuery(mustQuery)
35                 .addHighlightedField("*")/*星号表示在所有字段都高亮*/.setHighlighterRequireFieldMatch(false)//配置高亮显示搜索结果
36                 .setHighlighterPreTags("<高亮前缀标签>").setHighlighterPostTags("<高亮后缀标签>");//配置高亮显示搜索结果
37 
38                 searchRequestBuilder = searchRequestBuilder.addAggregation(AggregationBuilders.terms("agg1(聚类返回时根据此key获取聚类结果)")
39                         .size(1000)/*返回1000条聚类结果*/.field("要在文档中聚类的字段,如果是嵌套的则用点连接父子字段,如【person.company.name】"));
40 
41         SearchResponse searchResponse = searchRequestBuilder.setFrom((page - 1) * pagesize)//分页起始位置(跳过开始的n个)
42                 .setSize(pagesize)//本次返回的文档数量
43                 .execute().actionGet();//执行搜索
44 
45         log.info("response="+searchResponse);
46         return searchResponse;
47     }
48 }
复制代码

 

4、ES中使用delete-by-query插件,DSL方式按条件删除数据的方法:

ES2.1中,默认的文档删除方式只有按ID删除方法:

curl -XDELETE 'localhost:9200/customer/external/2?pretty'

(参考:Deleting Documents | Elasticsearch Reference [2.1] | Elastic https://www.elastic.co/guide/en/elasticsearch/reference/2.1/_deleting_documents.html

按条件删除需要安装delete-by-query插件,在线安装方式可使用命令

plugin install delete-by-query

随后会从https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip处下载插件安装包。但是本人使用的某个ES环境是离线的,需要手动下载上述URL对应的ZIP,放置于elasticsearch-2.1.0文件夹下,与bin、config等文件夹同级,同时还要下载 https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip.md5 校验文件放于同一位置(XXX.sha1应该也可以),使用以下命令离线安装:

bin/plugin install file:delete-by-query-2.1.0.zip

其中delete-by-query-2.1.0.zip是相对路径,绝对路径应该也可以,随后便安装成功了。

安装成功后查看,发现其实就是解压delete-by-query-2.1.0.zip的内容放置于elasticsearch-2.1.0/plugins/delete-by-query 文件夹下,猜测手动解压也可以使用。

注意:如果是ES集群,需要对每个节点都安装这个插件,而且每个节点安装后要重启ES。

使用DSL方式按条件删除文档的方法:

复制代码
DELETE方式,请求
http://localhost:9200/index_name/type_name/_query
http payload内容:
{
  "query":{
    "match_all":{}
  }
}
上述query为匹配全部文档。
复制代码
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
2月前
|
Java API Spring
打造未来电商新引擎:揭秘Java可扩展API设计,让支付与物流灵活如丝,引领电商时代潮流!
【8月更文挑战第30天】本文通过电商平台案例,探讨了如何设计可扩展的Java API。首先定义支付和物流服务的接口与抽象类,然后实现具体服务,接着引入工厂模式或依赖注入管理服务实例,最后通过配置实现灵活扩展。这种设计确保了应用架构的灵活性和长期稳定性。
46 3
|
2月前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
74 0
|
27天前
|
Java API C++
Java 8 Stream Api 中的 peek 操作
本文介绍了Java中`Stream`的`peek`操作,该操作通过`Consumer&lt;T&gt;`函数消费流中的每个元素,但不改变元素类型。文章详细解释了`Consumer&lt;T&gt;`接口及其使用场景,并通过示例代码展示了`peek`操作的应用。此外,还对比了`peek`与`map`的区别,帮助读者更好地理解这两种操作的不同用途。作者为码农小胖哥,原文发布于稀土掘金。
Java 8 Stream Api 中的 peek 操作
|
10天前
|
安全 Java API
时间日期API(Date,SimpleDateFormat,Calendar)+java8新增日期API (LocalTime,LocalDate,LocalDateTime)
这篇文章介绍了Java中处理日期和时间的API,包括旧的日期API(Date、SimpleDateFormat、Calendar)和Java 8引入的新日期API(LocalTime、LocalDate、LocalDateTime)。文章详细解释了这些类/接口的方法和用途,并通过代码示例展示了如何使用它们。此外,还讨论了新旧API的区别,新API的不可变性和线程安全性,以及它们提供的操作日期时间的灵活性和简洁性。
|
13天前
|
Java 程序员 API
Java 8新特性之Lambda表达式与Stream API的探索
【9月更文挑战第24天】本文将深入浅出地介绍Java 8中的重要新特性——Lambda表达式和Stream API,通过实例解析其语法、用法及背后的设计哲学。我们将一探究竟,看看这些新特性如何让Java代码变得更加简洁、易读且富有表现力,同时提升程序的性能和开发效率。
|
12天前
|
SQL Java Linux
Java 8 API添加了一个新的抽象称为流Stream
Java 8 API添加了一个新的抽象称为流Stream
|
14天前
|
Java
flyway报错Caused by: java.lang.NoSuchMethodError: org.flywaydb.core.api.configuration.FluentConfigurat
flyway报错Caused by: java.lang.NoSuchMethodError: org.flywaydb.core.api.configuration.FluentConfigurat
16 2
|
1月前
|
安全 Java API
【性能与安全的双重飞跃】JDK 22外部函数与内存API:JNI的继任者,引领Java新潮流!
【9月更文挑战第7天】JDK 22外部函数与内存API的发布,标志着Java在性能与安全性方面实现了双重飞跃。作为JNI的继任者,这一新特性不仅简化了Java与本地代码的交互过程,还提升了程序的性能和安全性。我们有理由相信,在外部函数与内存API的引领下,Java将开启一个全新的编程时代,为开发者们带来更加高效、更加安全的编程体验。让我们共同期待Java在未来的辉煌成就!
51 11
|
1月前
|
安全 Java API
【本地与Java无缝对接】JDK 22外部函数和内存API:JNI终结者,性能与安全双提升!
【9月更文挑战第6天】JDK 22的外部函数和内存API无疑是Java编程语言发展史上的一个重要里程碑。它不仅解决了JNI的诸多局限和挑战,还为Java与本地代码的互操作提供了更加高效、安全和简洁的解决方案。随着FFM API的逐渐成熟和完善,我们有理由相信,Java将在更多领域展现出其强大的生命力和竞争力。让我们共同期待Java编程新纪元的到来!
49 11
|
29天前
|
监控 Java 大数据
【Java内存管理新突破】JDK 22:细粒度内存管理API,精准控制每一块内存!
【9月更文挑战第9天】虽然目前JDK 22的确切内容尚未公布,但我们可以根据Java语言的发展趋势和社区的需求,预测细粒度内存管理API可能成为未来Java内存管理领域的新突破。这套API将为开发者提供前所未有的内存控制能力,助力Java应用在更多领域发挥更大作用。我们期待JDK 22的发布,期待Java语言在内存管理领域的持续创新和发展。
下一篇
无影云桌面