ElasticSearch API for JAVA 学习笔记

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:

Client

Client是一个类,通过这个类可以实现对ES集群的各种操作:Index, Get, Delete , Search,以及对ES集群的管理任务。

Client的构造需要基于TransportClient

TransportClient

TransportClient可以远程连接ES集群,通过一个传输模块,但是它不真正的连接到集群,只是获取集群的一个或多个初始传输地址,在每次请求动作时,才真正连接到ES集群。

Settings

Settings类主要是在启动Client之前,配置一些属性参数,主要配置集群名称cluster.name,还有其他参数:

client.transport.sniff  是否为传输client添加嗅探功能

client.transport.ignore_cluster_name 设为true,忽略连接节点的集群名称验证

client.transport.ping_timeout 设置ping节点时的时间限,默认5s

client.transport.nodes_sampler_interval 设置sample/ping nodes listed 间隔时间,默认5s

1
2
3
4
5
6
7
8
9
10
11
12
13
         //通过Settings类设置属性参数
         Settings settings = Settings.settingsBuilder().put( "cluster.name" , "index-name" ).build();
         
         //启动Client
         Client client = TransportClient.builder().settings(settings).build().
                 addTransportAddress( new  InetSocketTransportAddress(InetAddress.getByName( "192.168.xxx.xxx" ), 9300 ));
         
         //如果不需要设置参数,直接如下
         /*Client client = TransportClient.builder().build().
                 addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.xxx.xxx"),9300));*/
         
         //关闭Clinet
         client.close();

Document API

主要分为以下类:Index API , Get API , Delete API , Update API, Multi Get API, Bulk API

es中的增删改查

Index API可以索引一个典型的JSON文档到指定的索引中,并且可以使它可以检索。

产生JSON

JSON产生可以有以下几种方式:

手动拼接一个JSON字符串

使用Map

使用第三方库,比如Jackson

使用内置的XContentFactory.jsonBuilder()

每种类型都会转换为byte[],因此如果对象已经是这种形式,可以直接使用,jsonBuilder是一个高度优化了的JSON产生器,它直接构造byte[]

通过下边的代码讲解四种方法:index-api, get-api, delete-api, update-api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
      * es-api的方法学习:
      * 1.prepareIndex方法:索引数据到ElasticSearch
      * 2.prepareGet方法:获取信息
      * 3.prepareDelete方法:删除信息
      * 4.update方法:更新信息
      *   4.1 upsert:在使用update方法时:
      *       a:针对文档不存在的情况时,做出index数据的操作,update无效;
      *        b:如果文档存在,那么index数据操作无效,update有效;
      */
     public  static  void  main(String[] args)  throws  IOException, InterruptedException, ExecutionException {
         //通过Settings类设置属性参数  
         Settings settings = Settings.settingsBuilder().put( "cluster.name" , "myApp" ).build();  
           
         //启动Client  
         Client client =  null ;
         try  {
             client = TransportClient.builder().settings(settings).build().  
                     addTransportAddress( new  InetSocketTransportAddress(InetAddress.getByName( "101.200.124.27" ), 9300 ));
         catch  (UnknownHostException e) {
             e.printStackTrace();
         }  
         //执行操作
         SimpleDateFormat df =  new  SimpleDateFormat( "yyyy-MM-dd" );
         XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "user" , "yuchen" )
                 .field( "interest" , "reading book" )
                 .field( "insert_time" ,df.format( new  Date()))
                 .endObject();
         //1.prepareIndex方法:索引数据到ElasticSearch
         IndexResponse response = client.prepareIndex( "index-test" , "weibo" , "4" )
             .setSource(jsonBuilder)
             .get();
         
         String _index = response.getIndex();
         String _type = response.getType();
         String _id = response.getId();
         long  _version = response.getVersion();
         boolean  created = response.isCreated();
         System.out.println(_index+ " " +_type+ " " +_id+ " " +_version+ " " +created);
         
         //2.prepareGet方法:获取信息
         GetResponse getResponse = client.prepareGet( "index-test" , "weibo" , "1" ).get();
         System.out.println(getResponse.getSourceAsString());
         
         //3.prepareDelete方法:删除信息
         DeleteResponse deleteResponse = client.prepareDelete( "index-test" , "weibo" , "4" ).get();
         System.out.println(deleteResponse.isFound());
         
         //4.update方法:更新信息
         UpdateRequest updateRequest =  new  UpdateRequest();
         updateRequest.index( "index-test" );
         updateRequest.type( "weibo" );
         updateRequest.id( "1" );
         updateRequest.doc(XContentFactory.jsonBuilder().startObject().field( "interest" , "music" ).endObject());
         UpdateResponse updateResponse = client.update(updateRequest).get();
         System.out.println(updateResponse.isCreated());
         //update方法: 可以为已有的文档添加新的字段
         UpdateResponse updateResponse2 = client.prepareUpdate( "index-test" "weibo" "1" )
             .setDoc(XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "interest2" , "reading" )
                 .endObject()).get();
         System.out.println(updateResponse2.isCreated());
         //4.1 upsert:在使用update方法时,a:针对文档不存在的情况时,做出index数据的操作,update无效;
         //                          b:如果文档存在,那么index数据操作无效,update有效;
         //先构建一个IndexRequest
         IndexRequest indexRequest =  new  IndexRequest( "index-test" , "weibo" , "14" );
         indexRequest.source(XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "user" , "yuchen2" )
                 .field( "interest" , "eating" )
                 .field( "insert_time" ,df.format( new  Date()))
                 .endObject());
         //再构建一个UpdateRequest,并用IndexRequest关联
         UpdateRequest updateRequest3 =  new  UpdateRequest( "index-test" , "weibo" , "14" );
         updateRequest3.doc(XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "interest2" , "love" )
                 .endObject()
                 ).upsert(indexRequest);
         client.update(updateRequest3).get();
         
         if (client !=  null ){
             client.close();  
         }
     }

批量操作

Multi Get Api 和 Bulk Api可进行批量的增删改查

使用Multi Get Api 批量获取:

1
2
3
4
5
6
7
8
9
10
         //1. Muti-get Api
         //可以指定单个id,也在index,type下指定一个id-list;也可以指定别的index/type
         MultiGetResponse multiGetResponse = client.prepareMultiGet()
                 .add( "index-test" , "weibo" , "1" ) //指定单个id
                 .add( "index-test" , "weibo" , "11" , "13" , "14" ) //指定一个id-list
                 .add( "index-other" , "news" , "1" , "3" ).get(); //指定别的index/type
         for (MultiGetItemResponse item:multiGetResponse){
             GetResponse response = item.getResponse();
             System.out.println(response.getSourceAsString());
         }

Bulk Api批量增加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
         //2.Bulk Api:可以进行批量index和批量删除操作
         //2.1批量增加
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         bulkRequest.add(client.prepareIndex( "index-test" "weibo" "20" )
             .setSource(XContentFactory.jsonBuilder()
                     .startObject()
                         .field( "user" "yuchen20" )
                         .field( "postDate" new  Date())
                         .field( "message" "trying out Elasticsearch" )
                     .endObject()
                   )
           );
         bulkRequest.add(client.prepareIndex( "index-test" "weibo" "21" )
                 .setSource(XContentFactory.jsonBuilder()
                         .startObject()
                             .field( "user" "yuchen21" )
                             .field( "postDate" new  Date())
                             .field( "message" "trying out Elasticsearch" )
                         .endObject()
                       )
               );
         
         BulkResponse bulkResponse = bulkRequest.get();
         if (bulkResponse.hasFailures()){
             //...
         }

Bulk Api批量删除:

1
2
3
4
5
6
7
8
9
10
11
         //2.2批量删除
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         bulkRequest.add(client.prepareDelete( "index-test" "weibo" "20" )
           );
         bulkRequest.add(client.prepareDelete( "index-test" "weibo" "21" )
               );
         
         BulkResponse bulkResponse = bulkRequest.get();
         if (bulkResponse.hasFailures()){
             System.out.println( "bulk error:" +bulkResponse.buildFailureMessage());
         }

Bulk Api 批量更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
         //2.3批量更新
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         bulkRequest.add(client.prepareUpdate( "index-test" "weibo" "11" ).setDoc(XContentFactory
                 .jsonBuilder().startObject()
                 .field( "country" , "China" ) //新添加字段
                 .endObject()
                 )
           );
         bulkRequest.add(client.prepareUpdate( "index-test" "weibo" "13" ).setDoc(XContentFactory
                 .jsonBuilder().startObject()
                 .field( "user" , "yuchen13" ) //更新字段
                 .endObject()
                 )
               );
         
         BulkResponse bulkResponse = bulkRequest.get();
         if (bulkResponse.hasFailures()){
             System.out.println( "bulk error:" +bulkResponse.buildFailureMessage());
         }

BulkProcessor设置批量请求的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
         //BulkProcessor
         BulkProcessor bulkProcessor = BulkProcessor.builder(client,  new  BulkProcessor.Listener() {
             @Override
             public  void  beforeBulk( long  arg0, BulkRequest arg1) {
                 //批量执行前做的事情
                 System.out.println( "bulk api action starting..." );
             }
             @Override
             public  void  afterBulk( long  arg0, BulkRequest arg1, Throwable arg2) {
                 System.out.println( "exception:bukl api action ending...:" +arg2.getMessage());
             }
             @Override
             public  void  afterBulk( long  arg0, BulkRequest arg1, BulkResponse arg2) {
                 //正常执行完毕后...
                 System.out.println( "normal:bukl api action ending..." );
             }
         })
         //设置多种条件,对批量操作进行限制,达到限制中的任何一种触发请求的批量提交
         .setBulkActions( 1000 ) //设置批量操作一次性执行的action个数,根据请求个数批量提交
         //.setBulkSize(new ByteSizeValue(1,ByteSizeUnit.KB))//设置批量提交请求的大小允许的最大值
         //.setFlushInterval(TimeValue.timeValueMillis(100))//根据时间周期批量提交请求
         //.setConcurrentRequests(1)//设置允许并发请求的数量
         //设置请求失败时的补偿措施,重复请求3次
         //.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
         .build();
         for ( int  i = 0 ;i< 100000 ;i++){
             bulkProcessor.add( new  IndexRequest( "index-test" , "weibo2" , "" +i).source(
                     XContentFactory
                     .jsonBuilder()
                     .startObject()
                     .field( "name" , "yuchen" +i)
                     .field( "interest" , "love" +i)
                     .endObject()));
         }
         bulkProcessor.awaitClose( 5 , TimeUnit.MINUTES); //释放bulkProcessor资源
         System.out.println( "load succeed!" );


默认的参数:

  • sets bulkActions to 1000

  • sets bulkSize to 5mb

  • does not set flushInterval

  • sets concurrentRequests to 1

  • sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.


参考地址:

http://blog.csdn.net/wuyzhen_csdn/article/details/52381697











本文转自yunlielai51CTO博客,原文链接:http://blog.51cto.com/4925054/2084251,如需转载请自行联系原作者


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
3天前
|
SQL Java 程序员
Java 8中的Stream API:简介与实用案例
【5月更文挑战第23天】本文将深入探讨Java 8中的Stream API,这是一种能够极大提升Java程序员生产力的新特性。我们将从基础概念开始,然后通过一些实用的案例来展示如何使用Stream API进行数据处理和操作。无论你是Java的初学者还是经验丰富的开发者,本文都将为你提供有价值的信息。
|
3天前
|
关系型数据库 MySQL Java
实时计算 Flink版操作报错之遇到java.lang.IllegalStateException: The elasticsearch emitter must be serializable.的错误,如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2天前
|
安全 算法 Java
Java Stream API:原理、应用与深入解析
Java Stream API:原理、应用与深入解析
|
3天前
|
XML Web App开发 JavaScript
软件测试 -- Selenium常用API全面解答(java)
软件测试 -- Selenium常用API全面解答(java)
14 0
|
3天前
|
Prometheus 监控 Cloud Native
实时计算 Flink版操作报错之在使用ES时遇到“java.lang.IllegalStateException: The elasticsearch emitter must be serializable”,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3天前
|
Java API
Java 8新特性之Lambda表达式与Stream API
本文将介绍Java 8中的两个重要特性:Lambda表达式和Stream API。Lambda表达式是一种新的编程语法,可以使代码更简洁、易读。Stream API是一种处理数据的新方式,可以让我们更方便地对集合进行操作。通过学习这两个特性,我们可以编写出更简洁、高效的Java代码。
|
4天前
|
存储 Java API
【JAVA学习之路 | 提高篇】[内部类与常见API]String类
【JAVA学习之路 | 提高篇】[内部类与常见API]String类
|
11天前
|
Java Maven 开发工具
【ElasticSearch 】IK 分词器安装
【ElasticSearch 】IK 分词器安装
27 1
|
11天前
|
数据可视化 索引
elasticsearch head、kibana 安装和使用
elasticsearch head、kibana 安装和使用
|
11天前
|
Java Windows
windows下 安装 Elasticsearch报错warning: usage of JAVA_HOME is deprecated, use ES_JAVA_HOME
windows下 安装 Elasticsearch报错warning: usage of JAVA_HOME is deprecated, use ES_JAVA_HOME
51 0