1 package elasticsearch; 2 3 import java.util.HashMap; 4 import java.util.List; 5 import java.util.Map; 6 7 import org.elasticsearch.action.bulk.BulkItemResponse; 8 import org.elasticsearch.action.bulk.BulkRequestBuilder; 9 import org.elasticsearch.action.bulk.BulkResponse; 10 import org.elasticsearch.action.delete.DeleteRequest; 11 import org.elasticsearch.action.delete.DeleteResponse; 12 import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; 13 import org.elasticsearch.action.get.GetResponse; 14 import org.elasticsearch.action.index.IndexRequest; 15 import org.elasticsearch.action.index.IndexResponse; 16 import org.elasticsearch.action.search.SearchResponse; 17 import org.elasticsearch.action.search.SearchType; 18 import org.elasticsearch.action.update.UpdateResponse; 19 import org.elasticsearch.client.transport.TransportClient; 20 import org.elasticsearch.cluster.node.DiscoveryNode; 21 import org.elasticsearch.common.collect.ImmutableList; 22 import org.elasticsearch.common.settings.ImmutableSettings; 23 import org.elasticsearch.common.settings.Settings; 24 import org.elasticsearch.common.text.Text; 25 import org.elasticsearch.common.transport.InetSocketTransportAddress; 26 import org.elasticsearch.common.transport.TransportAddress; 27 import org.elasticsearch.common.xcontent.XContentBuilder; 28 import org.elasticsearch.common.xcontent.XContentFactory; 29 import org.elasticsearch.index.query.QueryBuilders; 30 import org.elasticsearch.search.SearchHit; 31 import org.elasticsearch.search.SearchHits; 32 import org.elasticsearch.search.aggregations.AggregationBuilders; 33 import org.elasticsearch.search.aggregations.bucket.terms.Terms; 34 import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; 35 import org.elasticsearch.search.aggregations.metrics.sum.Sum; 36 import org.elasticsearch.search.highlight.HighlightField; 37 import org.elasticsearch.search.sort.SortOrder; 38 import org.junit.Before; 39 import org.junit.Test; 40 41 import com.fasterxml.jackson.databind.ObjectMapper; 42 43 public class TestEs { 44 45 TransportClient transportClient = new TransportClient(); 46 //before表示在执行每个test方法之前运行 47 @Before 48 public void test0() throws Exception { 49 //指定es集群中的节点信息 50 TransportAddress transportAddress = new InetSocketTransportAddress("192.168.1.99",9300); 51 //TransportAddress transportAddress1 = new InetSocketTransportAddress("192.168.1.98",9300); 52 transportClient.addTransportAddresses(transportAddress); 53 } 54 55 /** 56 * 通过java代码操作es-1 57 * 在实际工作中这样写不是很靠谱,需要完善,做测试可以 58 * @throws Exception 59 */ 60 @Test 61 public void test1() throws Exception { 62 //通过TransportClient可以和es集群交互 63 //TransportClient transportClient = new TransportClient(); 64 65 //指定es集群中的节点信息, 这个地方指定的端口是节点和节点之间的通信端口是9300,不是Http请求的端口9200. 66 TransportAddress transportAddress = new InetSocketTransportAddress("192.168.1.99",9300); 67 TransportAddress transportAddress1 = new InetSocketTransportAddress("192.168.1.98",9300); 68 transportClient.addTransportAddresses(transportAddress,transportAddress1);//加入多个地址 69 70 //获取当前transportClient连接到了集群多少个节点 71 ImmutableList<DiscoveryNode> connectedNodes = transportClient.connectedNodes(); 72 for (DiscoveryNode discoveryNode : connectedNodes) { 73 System.out.println(discoveryNode.getHostAddress());//打印192.168.1.99;192.168.1.98 74 //如果加入transportClient.addTransportAddresses(transportAddress) 只有一个ip,打印的就只有一个. 75 } 76 } 77 78 79 80 /** 81 * 通过java代码操作es-2 82 * 实际工作中使用的时候建议加上下面这些配置信息 83 * @throws Exception 84 */ 85 @Test 86 public void test2() throws Exception { 87 //指定es的配置信息 Immutable不可改变的; 88 Settings settings = ImmutableSettings.settingsBuilder() 89 .put("cluster.name", "elasticsearch")//如果集群名称在配置文件中被修改了,那么在这需要显式定义一下 90 //es集群名称默认是 elasticsearch sniff嗅; 发现; 91 .put("client.transport.sniff", true)//开启集群的嗅探功能,这样可以保证es会自动把集群中的其他节点信息添加到transportClient里面 92 //开启嗅探功能后 只要指定集群中的任意一个可用节点就可以了.当把代码运行之后TransportClient里面会把集群中所有节点的信息都拿到,能识别集群中的所有节点. 93 .build(); 94 95 //通过TransportClient可以和es集群交互 96 TransportClient transportClient = new TransportClient(settings); 97 //指定es集群中的节点信息 98 TransportAddress transportAddress = new InetSocketTransportAddress("192.168.1.99",9300); 99 transportClient.addTransportAddresses(transportAddress); 100 101 //获取当前transportClient连接到了集群多少个节点 102 ImmutableList<DiscoveryNode> connectedNodes = transportClient.connectedNodes(); 103 for (DiscoveryNode discoveryNode : connectedNodes) { 104 System.out.println(discoveryNode.getHostAddress()); //虽然前面只指定了1.99 但是打印192.168.1.99 192.168.1.98 105 } 106 } 107 108 String index = "crxy"; 109 String type = "emp"; 110 /** 111 * index-1 112 * @throws Exception 113 */ 114 @Test 115 public void test3() throws Exception { 116 String jsonStr = "{\"name\":\"zs\",\"age\":20}";//向索引库中传入一个String字符串,还可以接受其他类型 117 IndexResponse indexResponse = transportClient.prepareIndex(index, type, "7")//添加一个id=7的数据 118 .setSource(jsonStr) 119 //.execute().actionGet(); 这个和下面的get()方法是一样的,get()就是对.execute().actionGet() 进行了封装 120 .get();//执行 121 System.out.println(indexResponse.getVersion());//得到这个数据的version,如果version=1代表是新添加的数据 122 } 123 124 /** 125 * index-2 126 * 实际工作中使用 127 * @throws Exception 128 */ 129 @Test 130 public void test4() throws Exception {//把hashmap类型的数据放入index库 131 HashMap<String, Object> hashMap = new HashMap<String, Object>(); 132 hashMap.put("name", "heeh"); 133 hashMap.put("age", 20); 134 IndexResponse indexResponse = transportClient.prepareIndex(index, type, "8").setSource(hashMap).get(); 135 System.out.println(indexResponse.getVersion()); 136 } 137 138 /** 139 * index -3 140 * 实际工作中使用 141 * 使用对象的时候需要把对象中的属性转化成json字符串 142 * 143 * <dependency> 144 <groupId>com.fasterxml.jackson.core</groupId> 145 <artifactId>jackson-databind</artifactId> 146 <version>2.1.3</version> 147 </dependency> 148 * @throws Exception 149 */ 150 @Test 151 public void test5() throws Exception {//传入一个对象到index索引库 152 Person person = new Person(); 153 person.setName("lisi"); 154 person.setAge(30); 155 156 //如果直接传入一个person对象会报错,java.lang.IllegalArgumentException,必须把对象转换成一个Json字符串,使用jackson依赖 157 //IndexResponse indexResponse = transportClient.prepareIndex(index, type, "9").setSource(person).get(); 158 159 ObjectMapper objectMapper = new ObjectMapper(); 160 IndexResponse indexResponse = transportClient.prepareIndex(index, type, "9").setSource(objectMapper.writeValueAsString(person)).get(); 161 System.out.println(indexResponse.getVersion()); 162 } 163 164 /** 165 * index -4 166 * 测试数据这样使用 167 * @throws Exception 168 */ 169 @Test 170 public void test6() throws Exception { 171 XContentBuilder builder = XContentFactory.jsonBuilder()//XContentFactory 这个是ES官方提供的可以构建Json字符串的工具类. 172 .startObject()//{ 173 .field("name", "zs") 174 .field("age", 18) 175 .endObject();//} 176 IndexResponse indexResponse = transportClient.prepareIndex(index, type, "11").setSource(builder).get(); 177 System.out.println(indexResponse.getVersion()); 178 } 179 180 181 182 /** 183 * 查询 通过id 184 * @throws Exception 185 */ 186 @Test 187 public void test7() throws Exception { 188 GetResponse getResponse = transportClient.prepareGet(index, type, "9").get();//查询id为9的数据 189 System.out.println(getResponse.getSourceAsString()); 190 } 191 192 /** 193 * 局部更新 194 * @throws Exception 195 */ 196 @Test 197 public void test8() throws Exception { 198 XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("age", 19).endObject(); 199 UpdateResponse updateResponse = transportClient.prepareUpdate(index, type, "9").setDoc(builder).get(); 200 System.out.println(updateResponse.getVersion());//version打印2 数据更新 201 } 202 203 /** 204 * 删除-通过id删除 205 * @throws Exception 206 */ 207 @Test 208 public void test9() throws Exception { 209 DeleteResponse deleteResponse = transportClient.prepareDelete(index, type, "5").get();//删除比较简单 210 } 211 212 /** 213 * 求总数 214 * 类似于mysql中的select count(*) 215 * @throws Exception 216 */ 217 @Test 218 public void test10() throws Exception {//查找索引库中的数据个数 219 long count = transportClient.prepareCount(index).get().getCount(); 220 System.out.println(count); 221 } 222 223 224 /** 225 * 批量操作 bulk 226 * @throws Exception 227 */ 228 @Test 229 public void test11() throws Exception { 230 BulkRequestBuilder bulkBuilder = transportClient.prepareBulk(); 231 232 IndexRequest indexRequest = new IndexRequest(index, type, "12"); 233 XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("name", "haha").field("age", 18).endObject(); 234 indexRequest.source(builder); 235 236 DeleteRequest deleteRequest = new DeleteRequest(index, type, "13"); 237 238 bulkBuilder.add(indexRequest);//bulkBuilder中可以添加多个操作,这里一个是建立索引的操作,一个是删除的操作. 239 bulkBuilder.add(deleteRequest); 240 241 BulkResponse bulkResponse = bulkBuilder.get(); 242 if(bulkResponse.hasFailures()){//批量操作中可能有的操作会出现问题,这个地方对操作失败的处理 243 //获取所有错误信息,并打印 244 BulkItemResponse[] items = bulkResponse.getItems(); 245 for (BulkItemResponse bulkItemResponse : items) { 246 System.out.println(bulkItemResponse.getFailureMessage()); 247 } 248 }else{ 249 System.out.println("全部OK"); 250 } 251 252 } 253 254 255 /** 256 * 查询 257 * lt:小于 258 * lte:小于等于 259 * gt:大于 260 * gte:大于等于 261 * 262 * @throws Exception 263 */ 264 @Test 265 public void test12() throws Exception { 266 SearchResponse searchResponse = transportClient.prepareSearch(index)//指定索引库 267 .setTypes(type)//指定类型 268 .setQuery(QueryBuilders.matchQuery("name", "zs"))//指定查询条件,不支持通配符 269 //.setQuery(QueryBuilders.multiMatchQuery("zs", "name","title"))//根据多个属性进行查询 270 //.setQuery(QueryBuilders.matchAllQuery())//查询所有 271 //.setQuery(QueryBuilders.queryString("name:z* AND age:20"))//支持通配符* ?,可以实现复杂查询,可以使用AND OR 之类的运算符(运算符要大写) 272 //.setQuery(QueryBuilders.termQuery("name", "zs"))//在查询的时候不分词,主要针对 人名 地名等特殊的词语 273 //工作中没有说明特殊需求,就是用默认的查询类型,如果对搜索准确度要求非常高,建议使用DFS_QUERY_THEN_FETCH,如果只追求查询效果,对其他的指标不关心,可以使用QUERY_AND_FETCH 274 .setSearchType(SearchType.QUERY_THEN_FETCH)//指定查询类型,可以指定四种 275 .setExplain(true)//返回的数据按照搜索词的相关度排序 276 //分页参数 277 .setFrom(0) 278 .setSize(10) 279 //根据某一个字段排序 280 .addSort("age",SortOrder.DESC) 281 //过滤 282 //.setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(18).includeLower(true).includeUpper(false))//默认是闭区间 283 //.setPostFilter(FilterBuilders.rangeFilter("age").gt(0).lt(18)) 284 //实现高亮 285 .addHighlightedField("name")//设置高亮字段 286 .setHighlighterPreTags("<font color='red'>")//设置高亮前缀和后缀 287 .setHighlighterPostTags("</font>") 288 .get(); 289 SearchHits hits = searchResponse.getHits(); 290 long totalHits = hits.getTotalHits(); 291 System.out.println("总数:"+totalHits); 292 SearchHit[] hits2 = hits.getHits(); 293 for (SearchHit searchHit : hits2) { 294 //获取高亮内容 295 Map<String, HighlightField> highlightFields = searchHit.getHighlightFields(); 296 HighlightField highlightField = highlightFields.get("name"); 297 if(highlightField!=null){ 298 String name_h = ""; 299 Text[] fragments = highlightField.fragments(); 300 for (Text text : fragments) { 301 name_h+=text; 302 } 303 System.out.println("高亮内容:"+name_h); 304 } 305 System.out.println(searchHit.getSourceAsString()); 306 } 307 } 308 309 //ES是一个分布式是搜索引擎,天生就是为分布式而生的,但是分布式有优点,也是有缺点. 310 //一个索引库crxy的数据分布到了5个分片,去哪个分片去查,由于是分布式,可能每个分片中都有数据,所以一定要到所有分片中去查找. 311 312 313 314 /** 315 * 根据查询条件删除数据 316 * @throws Exception 317 */ 318 @Test 319 public void test13() throws Exception { 320 DeleteByQueryResponse deleteByQueryResponse = transportClient.prepareDeleteByQuery(index) 321 .setQuery(QueryBuilders.matchAllQuery()) 322 .get(); 323 } 324 325 326 /** 327 * 统计分析-count 328 * 根据年龄进行分组,统计相同年龄的数据有多少条 329 * 330 * 默认情况下,如果分组个数大于10条的话,默认只会返回前10条分组数据 331 * 如果想获取所有分组数据,或者想要获取指定数量的分组数据,如何实现呢? 332 * .size(0) 333 * 334 * @throws Exception 335 */ 336 @Test 337 public void test14() throws Exception { 338 SearchResponse searchResponse = transportClient.prepareSearch(index) 339 .setTypes(type) 340 .addAggregation(AggregationBuilders.terms("age_term").field("age").size(0))//给分组起个名称,并且指定分组字段 341 .get(); 342 343 Terms terms = searchResponse.getAggregations().get("age_term");//指定分组的名字. 344 //获取分组数据 345 List<Bucket> buckets = terms.getBuckets(); 346 for (Bucket bucket : buckets) { 347 System.out.println(bucket.getKey()+"----"+bucket.getDocCount()); 348 } 349 } 350 /*打印 351 21----3 352 18----2 353 19----1 354 20----1 355 */ 356 357 358 /** 359 * 统计分析-sum select name,sum(score) from table group by name; 360 * 需要使用.subAggregation 361 * @throws Exception 362 */ 363 /** 364 使用数据: 365 #aggregations-2 366 curl -XPUT 'localhost:9200/crxy/emp/1' -d'{"name":"zs","score":60}' 367 curl -XPUT 'localhost:9200/crxy/emp/2' -d'{"name":"zs","score":90}' 368 curl -XPUT 'localhost:9200/crxy/emp/3' -d'{"name":"ls","score":80}' 369 curl -XPUT 'localhost:9200/crxy/emp/4' -d'{"name":"ls","score":70}' 370 */ 371 @Test 372 public void test15() throws Exception { 373 SearchResponse searchResponse = transportClient.prepareSearch(index) 374 .setTypes(type) 375 .addAggregation(AggregationBuilders.terms("name_term").field("name")//指定分组字段 376 .subAggregation(AggregationBuilders.sum("score_sum").field("score")))//指定求sum的字段 377 .get(); 378 379 Terms terms = searchResponse.getAggregations().get("name_term"); 380 List<Bucket> buckets = terms.getBuckets(); 381 for (Bucket bucket : buckets) { 382 Sum sum = bucket.getAggregations().get("score_sum"); 383 System.out.println(bucket.getKey()+"----"+sum.getValue()); 384 } 385 } 386 /* 387 * 打印输出 388 ls----150.0 389 zs----150.0 390 */ 391 392 393 /** 394 * 指定分片查询(_shards),指定某个节点(_only_node)和某些节点(自定义的_only_nodes) 395 * @throws Exception 396 */ 397 @Test 398 public void test16() throws Exception { 399 SearchResponse searchResponse = transportClient.prepareSearch(index) 400 .setTypes(type) 401 .setQuery(QueryBuilders.matchAllQuery()) 402 //.setPreference("_shards:0,2") //.setPreference("_local") .setPreference("") 403 .setPreference("_only_node:8PoWbRVvQQ6NU283Bfd_7A,BJwexRvDTJ-VRx7Njs8uxA")//8PoWbRVvQQ6NU283Bfd_7A是一个节点的id 404 .get(); 405 406 SearchHits hits = searchResponse.getHits(); 407 long totalHits = hits.getTotalHits(); 408 System.out.println("总数:"+totalHits); 409 SearchHit[] hits2 = hits.getHits(); 410 for (SearchHit searchHit : hits2) { 411 System.out.println(searchHit.getSourceAsString()); 412 } 413 } 414 415 }
1 @Test 2 public void test16() throws Exception { 3 SearchResponse searchResponse = transportClient.prepareSearch("crxy*") 4 .setTypes(type) 5 .setQuery(QueryBuilders.matchAllQuery()) 6 //查询指定分片的数据可以使用下面这两种方案,第一种直接指定分片id,第二种根据routing参数的值计算分片id 7 //.setPreference("_shards:0") 8 //.setRouting("hehe") 9 .get(); 10 11 SearchHits hits = searchResponse.getHits(); 12 long totalHits = hits.getTotalHits(); 13 System.out.println("总数:"+totalHits); 14 SearchHit[] hits2 = hits.getHits(); 15 for (SearchHit searchHit : hits2) { 16 System.out.println(searchHit.getSourceAsString()); 17 } 18 } 19 20 21 @Test 22 public void test17() throws Exception { 23 HashMap<String, Object> hashMap = new HashMap<String, Object>(); 24 hashMap.put("name", "hello world"); 25 hashMap.put("age", 20); 26 IndexResponse indexResponse = transportClient.prepareIndex(index, type, "7") 27 .setSource(hashMap) 28 .setRouting("heha")//指定一个路由参数,参数相同的数据会保存到同一个分片 29 .get(); 30 System.out.println(indexResponse.getVersion()); 31 } 32
本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6072478.html,如需转载请自行联系原作者