Elasticsearch 版本
elasticsearch-5.4.3.jar
指定 ip地址创建client
private TransportClient client = null; /** *指定 ip地址创建client */ @Before public void init() throws Exception { //设置集群名称 Settings settings = Settings.builder() .put("cluster.name", "my-es") //自动感知的功能(可以通过当前指定的节点获取所有es节点的信息) .put("client.transport.sniff", true) .build(); //创建client client = new PreBuiltTransportClient(settings).addTransportAddresses( // Java对应的API操作的端口都是9300,记住是9300 new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300), new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300), new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300)); }
添加数据
/** *添加数据 */ @Test public void testCreate() throws IOException { // index可以理解为数据库;type理解为数据表;id相当于数据库表中记录的主键,是唯一的。 IndexResponse response = client.prepareIndex("gamelog", "users", "1") .setSource( jsonBuilder() .startObject() // field理解为列 .field("username", "老赵") .field("gender", "male") .field("birthday", new Date()) .field("fv", 9999) .field("message", "trying out Elasticsearch") .endObject() ).get(); }
查找一条
/**
 * Fetches document gamelog/users/1 by id and prints its JSON source.
 */
@Test
public void testGet() throws IOException {
    String source = client.prepareGet("gamelog", "users", "1").get().getSourceAsString();
    System.out.println(source);
}
查找多条
/**
 * Fetches several documents, possibly from different indices, in one multi-get request.
 * Prints the source of every document that exists.
 *
 * <p>A failed item (for example, one whose index does not exist) carries no GetResponse, so
 * {@code getResponse()} returns null for it. Failures are therefore checked and reported
 * before the response is used.
 */
@Test
public void testMultiGet() throws IOException {
    MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
            .add("gamelog", "users", "1")
            .add("gamelog", "users", "2", "3")
            .add("news", "fulltext", "1")
            .get();
    for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
        if (itemResponse.isFailed()) {
            // getResponse() is null here; report the failure instead of dereferencing it
            System.err.println("multi-get failed for " + itemResponse.getIndex() + "/"
                    + itemResponse.getType() + "/" + itemResponse.getId() + ": "
                    + itemResponse.getFailure().getMessage());
            continue;
        }
        GetResponse response = itemResponse.getResponse();
        if (response.isExists()) {
            System.out.println(response.getSourceAsString());
        }
    }
}
数据更新
/**
 * Partially updates document gamelog/users/2 by setting its "fv" field to 999.9.
 */
@Test
public void testUpdate() throws Exception {
    UpdateRequest partialUpdate = new UpdateRequest("gamelog", "users", "2")
            .doc(jsonBuilder().startObject().field("fv", 999.9).endObject());
    client.update(partialUpdate).get();
}
数据删除-指定ID
/**
 * Deletes document gamelog/users/2 by id and prints the delete response.
 */
@Test
public void testDelete() {
    System.out.println(client.prepareDelete("gamelog", "users", "2").get());
}
数据删除--指定任意某个字段
/** * 数据删除--指定任意某个字段 */ @Test public void testDeleteByQuery() { BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) //指定查询条件 .filter(QueryBuilders.matchQuery("username", "老段")) //指定索引名称 .source("gamelog") .get(); long deleted = response.getDeleted(); System.out.println(deleted); }
异步删除
/**
 * Runs a delete-by-query asynchronously. It removes "gamelog" documents whose "gender"
 * matches "male", and the listener prints the result or the failure.
 *
 * <p>The test thread waits on a latch, up to the same 10-second budget as before, so the
 * test finishes as soon as the listener has run instead of always sleeping the full time.
 * An interrupt while waiting is preserved rather than swallowed.
 */
@Test
public void testDeleteByQueryAsync() {
    final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("gender", "male"))
            .source("gamelog")
            .execute(new ActionListener<BulkByScrollResponse>() {
                @Override
                public void onResponse(BulkByScrollResponse response) {
                    try {
                        long deleted = response.getDeleted();
                        System.out.println("数据删除了");
                        System.out.println(deleted);
                    } finally {
                        // release the waiting test thread even if printing fails
                        done.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        e.printStackTrace();
                    } finally {
                        done.countDown();
                    }
                }
            });
    System.out.println("异步删除");
    try {
        if (!done.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
            System.err.println("async delete-by-query did not complete within 10 seconds");
        }
    } catch (InterruptedException e) {
        // restore the interrupt flag so callers can still observe it
        Thread.currentThread().interrupt();
    }
}
范围查询
/** * 范围查询 */ @Test public void testRange() { QueryBuilder qb = rangeQuery("fv") // [88.99, 10000) .from(88.99) .to(10000) .includeLower(true) .includeUpper(false); SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get(); System.out.println(response); }
聚合查询
先添加一些数据
/**
 * Sample data for the aggregation examples, loaded with curl:
 *
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/1' -d '{ "name": "curry", "age": 29, "salary": 3500,"team": "war", "position": "pg"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/2' -d '{ "name": "thompson", "age": 26, "salary": 2000,"team": "war", "position": "pg"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/3' -d '{ "name": "irving", "age": 25, "salary": 2000,"team": "cav", "position": "pg"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/4' -d '{ "name": "green", "age": 26, "salary": 2000,"team": "war", "position": "pf"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/5' -d '{ "name": "james", "age": 33, "salary": 4000,"team": "cav", "position": "sf"}'
 *
 * <p>This test indexes the last player (id 5, "james") through the Java API, with the same
 * values as the curl command above. Ids must be unique per player: indexing under id "1"
 * would overwrite curry's document and change the results of the aggregation examples.
 */
@Test
public void testAddPlayer() throws IOException {
    IndexResponse response = client.prepareIndex("player_info", "player", "5")
            .setSource(
                    jsonBuilder()
                            .startObject()
                            .field("name", "james")
                            .field("age", 33)
                            .field("salary", 4000)
                            .field("team", "cav")
                            .field("position", "sf")
                            .endObject()
            ).get();
}
group by/count
例如要计算每个球队的球员数,如果使用SQL语句,应表达如下:
select team, count(*) as player_count from player group by team;
ES的java api:
@Test public void testAgg1() { //指定索引和type SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //按team分组然后聚合,但是并没有指定聚合函数 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team"); //添加聚合器 builder.addAggregation(teamAgg); //触发 SearchResponse response = builder.execute().actionGet(); //System.out.println(response); //将返回的结果放入到一个map中 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); // Set<String> keys = aggMap.keySet(); // // for (String key: keys) { // System.out.println(key); // } // //取出聚合属性 StringTerms terms = (StringTerms) aggMap.get("player_count"); // //依次迭代出分组聚合数据 // for (Terms.Bucket bucket : terms.getBuckets()) { // //分组的名字 // String team = (String) bucket.getKey(); // //count,分组后一个组有多少数据 // long count = bucket.getDocCount(); // System.out.println(team + " " + count); // } Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator(); while (teamBucketIt .hasNext()) { Terms.Bucket bucket = teamBucketIt.next(); String team = (String) bucket.getKey(); long count = bucket.getDocCount(); System.out.println(team + " " + count); } }
group by多个field
例如要计算每个球队每个位置的球员数,如果使用SQL语句,应表达如下:
select team, position, count(*) as pos_count from player group by team, position;
ES的java api:
/** * group by多个field * 例如要计算每个球队每个位置的球员数,如果使用SQL语句 * select team, position, count(*) as pos_count from player group by team, position; */ @Test public void testAgg2() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //指定别名和分组的字段 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position"); //添加两个聚合构建器 builder.addAggregation(teamAgg.subAggregation(posAgg)); //执行查询 SearchResponse response = builder.execute().actionGet(); //将查询结果放入map中 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); //根据属性名到map中查找 StringTerms teams = (StringTerms) aggMap.get("team_name"); //循环查找结果 for (Terms.Bucket teamBucket : teams.getBuckets()) { //先按球队进行分组 String team = (String) teamBucket.getKey(); Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); StringTerms positions = (StringTerms) subAggMap.get("pos_count"); //因为一个球队有很多位置,那么还要依次拿出位置信息 for (Terms.Bucket posBucket : positions.getBuckets()) { //拿到位置的名字 String pos = (String) posBucket.getKey(); //拿出该位置的数量 long docCount = posBucket.getDocCount(); //打印球队,位置,人数 System.out.println(team + " " + pos + " " + docCount); } } }
max/min/sum/avg
例如要计算每个球队中球员年龄的最大值(最小值、总和、平均值的写法同理,只需把 max 换成 min/sum/avg),如果使用SQL语句,应表达如下:
select team, max(age) as max_age from player group by team;
ES的java api:
/** * select team, max(age) as max_age from player group by team; */ @Test public void testAgg3() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //指定安球队进行分组 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); //指定分组求最大值 MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age"); //分组后求最大值 builder.addAggregation(teamAgg.subAggregation(maxAgg)); //查询 SearchResponse response = builder.execute().actionGet(); Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); //根据team属性,获取map中的内容 StringTerms teams = (StringTerms) aggMap.get("team_name"); for (Terms.Bucket teamBucket : teams.getBuckets()) { //分组的属性名 String team = (String) teamBucket.getKey(); //在将聚合后取最大值的内容取出来放到map中 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); //取分组后的最大值 InternalMax ages = (InternalMax)subAggMap.get("max_age"); double max = ages.getValue(); System.out.println(team + " " + max); } }
对多个field求max/min/sum/avg
例如要计算每个球队球员的平均年龄,同时又要计算总年薪,如果使用SQL语句,应表达如下:
select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;
ES的java api:
/** * select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team; */ @Test public void testAgg4() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //指定分组字段 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team"); //指定聚合函数是求平均数据 AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age"); //指定另外一个聚合函数是求和 SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); //分组的聚合器关联了两个聚合函数 builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg)); SearchResponse response = builder.execute().actionGet(); Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); //按分组的名字取出数据 StringTerms teams = (StringTerms) aggMap.get("team_name"); for (Terms.Bucket teamBucket : teams.getBuckets()) { //获取球队名字 String team = (String) teamBucket.getKey(); Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); //根据别名取出平均年龄 InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age"); //根据别名取出薪水总和 InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary"); double avgAgeValue = avgAge.getValue(); double totalSalaryValue = totalSalary.getValue(); System.out.println(team + " " + avgAgeValue + " " + totalSalaryValue); } }
聚合后对Aggregation结果排序
例如要计算每个球队总年薪,并按照总年薪倒序排列,如果使用SQL语句,应表达如下:
select team, sum(salary) as total_salary from player group by team order by total_salary desc;
ES的java api:
/** * select team, sum(salary) as total_salary from player group by team order by total_salary desc; */ @Test public void testAgg5() { SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player"); //按team进行分组,然后指定排序规则 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true)); SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); builder.addAggregation(termsAgg.subAggregation(sumAgg)); SearchResponse response = builder.execute().actionGet(); Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); StringTerms teams = (StringTerms) aggMap.get("team_name"); for (Terms.Bucket teamBucket : teams.getBuckets()) { String team = (String) teamBucket.getKey(); Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary"); double totalSalaryValue = totalSalary.getValue(); System.out.println(team + " " + totalSalaryValue); } }
需要特别注意的是,排序是在 terms 聚合(TermsAggregationBuilder)上指定的。Terms.Order.aggregation 的第一个参数是用于排序的子聚合的名字,必须与子聚合名称完全一致(不能多出空格等字符),否则请求会失败。第二个参数是 boolean 型,true 表示升序,false 表示降序。
- Aggregation结果条数的问题
默认情况下,terms 聚合只返回文档数最多的 10 个分组(bucket)。如果想返回更多分组,需要在构建 TermsAggregationBuilder(2.x 版本中为 TermsBuilder)时指定 size:
// ES 5.x: TermsAggregationBuilder replaces the 2.x TermsBuilder; a terms aggregation must name
// the field to group on, and size raises the number of returned buckets above the default
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team").field("team").size(15);
- Aggregation结果的解析/输出
得到response后:
// Reads a terms aggregation named "team_name" that carries "avg_age" and "total_salary"
// sub-aggregations.
Map<String, Aggregation> aggMap = response.getAggregations().asMap();
StringTerms teamAgg = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket bucket : teamAgg.getBuckets()) {
    // team name: getKey() returns Object in 5.x, so read the key as a string
    String team = bucket.getKeyAsString();
    // number of documents in this bucket
    long count = bucket.getDocCount();
    // all sub-aggregations of this bucket, keyed by aggregation name
    Map<String, Aggregation> subAggMap = bucket.getAggregations().asMap();
    // reading an avg value
    double avgAge = ((InternalAvg) subAggMap.get("avg_age")).getValue();
    // reading a sum value
    double totalSalary = ((InternalSum) subAggMap.get("total_salary")).getValue();
    // ...
    // max/min are read the same way (InternalMax / InternalMin)
}
总结
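综上,聚合操作主要是调用 SearchRequestBuilder 的 addAggregation 方法,通常传入一个 TermsAggregationBuilder(2.x 中为 TermsBuilder)。子聚合通过调用它的 subAggregation 方法添加,常见的子聚合有 TermsAggregationBuilder、SumAggregationBuilder、AvgAggregationBuilder、MaxAggregationBuilder、MinAggregationBuilder 等。在 2.x 中它们分别为 TermsBuilder、SumBuilder、AvgBuilder、MaxBuilder、MinBuilder。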
从实现上来讲,SearchRequestBuilder在内部保持了一个私有的 SearchSourceBuilder实例, SearchSourceBuilder内部包含一个List<AbstractAggregationBuilder>,每次调用addAggregation时会调用 SearchSourceBuilder实例,添加一个AggregationBuilder。
同样的,TermsBuilder 也在内部保持了一个 List<AbstractAggregationBuilder>,调用 subAggregation 方法(继承自父类)时会向其中添加一个子 AggregationBuilder。有兴趣的读者也可以阅读源码的实现。
参考来源:https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/index.html
参考来源:https://elasticsearch.cn/article/102