背景信息
es查询,在不指定 size 大小的情况下,默认查询 10条数据,比如执行如下查询命令
GET crm_meiqia_conversation/_search
如果你需要查询更多数据的话,你就可以通过指定 size 大小来查询更多数据,比如执行如下命令
GET crm_meiqia_conversation/_search { "size":20 }
这个时候就有个疑问,如果有一些特殊的场景,想要一次性查询指定条件下的所有数据改如何操作呢,下面就来基于 Java 实现查询指定条件下的所有数据操作。
Java 实现查询 Elasticsearch 全部数据
实现后效果
首先来看一下基于 Java 实现查询指定条件下的 es 所有数据的展示效果
实现前效果
而默认情况下不设置 size 大小的 es 查询,默认查询 10条数据,就像这样的效果
Java 代码实现
下面开始讲如何通过 Java 实现查询 es 全部数据,首先来看一下默认查询 es 10条数据的代码
public AjaxResult getMeiqiaUidList(MeiqiaConversation meiqiaConversation) { BoolQueryBuilder query = QueryBuilders.boolQuery(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); //会话id Long convId = meiqiaConversation.getConvId(); if (convId != null) { boolQuery.filter(QueryBuilders.termQuery("convId",convId)); } //会话日期 String convStartDate = (String) meiqiaConversation.getParams().get("convStartDate"); String convEndDate = (String) meiqiaConversation.getParams().get("convEndDate"); if (StringUtils.isNotEmpty(convStartDate)) { Date date = DateUtils.stringToDate(convStartDate, DateUtils.SDF_YMDHMS); boolQuery.filter(QueryBuilders.rangeQuery("convStartDate").gte(date.getTime())); } if (StringUtil.isNotEmptyString(convEndDate)) { Date date = DateUtils.stringToDate(convEndDate, DateUtils.SDF_YMDHMS); boolQuery.filter(QueryBuilders.rangeQuery("convEndDate").lte(date.getTime())); } //会话日期 Date convStartDate2 = meiqiaConversation.getConvStartDate(); Date convEndDate2 = meiqiaConversation.getConvEndDate(); if (Objects.nonNull(convStartDate2)) { boolQuery.filter(QueryBuilders.rangeQuery("convStartDate").gte(convStartDate2.getTime())); } if (Objects.nonNull(convEndDate2)) { boolQuery.filter(QueryBuilders.rangeQuery("convEndDate").lte(convEndDate2.getTime())); } //学号 String uid = (String) meiqiaConversation.getParams().get("uid"); if (StringUtils.isNotEmpty(uid)) { if (uid.contains("#")) { String replace = uid.replace("#", ""); boolQuery.filter(QueryBuilders.termQuery("clientInfo.name",replace)); }else { boolQuery.filter(QueryBuilders.termQuery("clientInfo.uid",uid)); } } //客服工号 String agentId = (String) meiqiaConversation.getParams().get("agentId"); if (StringUtils.isNotEmpty(agentId)) { boolQuery.filter(QueryBuilders.termQuery("agentId",agentId)); } // 会话内容 String content = (String) meiqiaConversation.getParams().get("content"); if (StringUtils.isNotEmpty(content)) { boolQuery.filter(QueryBuilders.matchPhrasePrefixQuery("convContent.content",content)); } query.must(boolQuery); // 初始化搜索请求构建器,用于构造搜索请求 SearchRequestBuilder searchRequest = client.prepareSearch(indexProperties.getMeiqiaConversationIndex()) // 设置搜索的类型 .setTypes(indexProperties.getMeiqiaConversationType()) // 设置查询条件 .setQuery(query); // 使用SearchRequest获取搜索响应 SearchResponse searchResponse = searchRequest.get(); // 初始化存储所有搜索结果的列表 List<EsMeiqiaConversation> rows = new ArrayList<>(); // 格式化搜索响应中的数据,并添加到rows列表中 List<EsMeiqiaConversation> list1 = formatMeiqiaDto(searchResponse); rows.addAll(list1); //记录返回的uid name List<MeiqiaConversation> list = new ArrayList<>(); if (CollectionUtils.isNotEmpty(rows)) { //获取 uid name Map<String, List<EsMeiqiaConversation>> collect = rows.stream().collect(Collectors.groupingBy(EsMeiqiaConversation::getClientUid, Collectors.toList())); Set<String> uids = collect.keySet(); for (String u : uids) { MeiqiaConversation conv = new MeiqiaConversation(); conv.setUid(u); //同一个uid 对应同一个 name List<EsMeiqiaConversation> esconv = collect.get(u); String name = esconv.get(0).getClientName(); conv.setName(name); list.add(conv); } } return AjaxResult.success(list); }
那么如何实现 一次查询满足条件的全部 es 数据呢,可以通过 scroll 实现,改造后的代码如下
public AjaxResult getMeiqiaUidList(MeiqiaConversation meiqiaConversation) { BoolQueryBuilder query = QueryBuilders.boolQuery(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); //会话id Long convId = meiqiaConversation.getConvId(); if (convId != null) { boolQuery.filter(QueryBuilders.termQuery("convId",convId)); } //会话日期 String convStartDate = (String) meiqiaConversation.getParams().get("convStartDate"); String convEndDate = (String) meiqiaConversation.getParams().get("convEndDate"); if (StringUtils.isNotEmpty(convStartDate)) { Date date = DateUtils.stringToDate(convStartDate, DateUtils.SDF_YMDHMS); boolQuery.filter(QueryBuilders.rangeQuery("convStartDate").gte(date.getTime())); } if (StringUtil.isNotEmptyString(convEndDate)) { Date date = DateUtils.stringToDate(convEndDate, DateUtils.SDF_YMDHMS); boolQuery.filter(QueryBuilders.rangeQuery("convEndDate").lte(date.getTime())); } //会话日期 Date convStartDate2 = meiqiaConversation.getConvStartDate(); Date convEndDate2 = meiqiaConversation.getConvEndDate(); if (Objects.nonNull(convStartDate2)) { boolQuery.filter(QueryBuilders.rangeQuery("convStartDate").gte(convStartDate2.getTime())); } if (Objects.nonNull(convEndDate2)) { boolQuery.filter(QueryBuilders.rangeQuery("convEndDate").lte(convEndDate2.getTime())); } //学号 String uid = (String) meiqiaConversation.getParams().get("uid"); if (StringUtils.isNotEmpty(uid)) { if (uid.contains("#")) { String replace = uid.replace("#", ""); boolQuery.filter(QueryBuilders.termQuery("clientInfo.name",replace)); }else { boolQuery.filter(QueryBuilders.termQuery("clientInfo.uid",uid)); } } //客服工号 String agentId = (String) meiqiaConversation.getParams().get("agentId"); if (StringUtils.isNotEmpty(agentId)) { boolQuery.filter(QueryBuilders.termQuery("agentId",agentId)); } // 会话内容 String content = (String) meiqiaConversation.getParams().get("content"); if (StringUtils.isNotEmpty(content)) { boolQuery.filter(QueryBuilders.matchPhrasePrefixQuery("convContent.content",content)); } query.must(boolQuery); // 初始化搜索请求构建器,用于构造搜索请求 SearchRequestBuilder searchRequest = client.prepareSearch(indexProperties.getMeiqiaConversationIndex()) // 设置搜索的类型 .setTypes(indexProperties.getMeiqiaConversationType()) // 设置查询条件 .setQuery(query) // 设置返回结果的数量为100 .setSize(100) // 设置滚动查询的时间间隔为1分钟 .setScroll(TimeValue.timeValueMinutes(1)); // 使用SearchRequest获取搜索响应 SearchResponse searchResponse = searchRequest.get(); // 初始化存储所有搜索结果的列表 List<EsMeiqiaConversation> rows = new ArrayList<>(); // 格式化搜索响应中的数据,并添加到rows列表中 List<EsMeiqiaConversation> list1 = formatMeiqiaDto(searchResponse); rows.addAll(list1); // 使用Scroll方式遍历所有搜索结果 do { // 准备下一次Scroll搜索,设置滚动时间为1分钟 // 将scorllId循环传递 获取全部数据 searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).execute().actionGet(); // 格式化新一批搜索结果,并添加到rows列表中 List<EsMeiqiaConversation> list = formatMeiqiaDto(searchResponse); if (CollectionUtils.isNotEmpty(list)) { rows.addAll(list); } // 当搜索结果为空时,结束循环 // 当searchHits的数组为空的时候结束循环,至此数据全部读取完毕 } while (searchResponse.getHits().getHits().length != 0); // 创建一个ClearScrollRequest实例,用于清除滚动查询的会话。 ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); // 将上一次查询返回的滚动ID添加到请求中,以便清除这个特定的会话。 // 这是必要的,因为ClearScrollRequest需要至少一个滚动ID才能执行清除操作。 clearScrollRequest.addScrollId(searchResponse.getScrollId()); // 发送ClearScroll请求并获取操作的结果。 // 这一步是必需的,因为它实际执行了清除滚动会话的操作,并允许我们处理结果或任何异常。 client.clearScroll(clearScrollRequest).actionGet(); //记录返回的uid name List<MeiqiaConversation> list = new ArrayList<>(); if (CollectionUtils.isNotEmpty(rows)) { //获取 uid name Map<String, List<EsMeiqiaConversation>> collect = rows.stream().collect(Collectors.groupingBy(EsMeiqiaConversation::getClientUid, Collectors.toList())); Set<String> uids = collect.keySet(); for (String u : uids) { MeiqiaConversation conv = new MeiqiaConversation(); conv.setUid(u); //同一个uid 对应同一个 name List<EsMeiqiaConversation> esconv = collect.get(u); String name = esconv.get(0).getClientName(); conv.setName(name); list.add(conv); } } return AjaxResult.success(list); }
核心代码是增加了滚动查询数据的操作
最后是清除滚动会话的操作
到这里关于 Java 实现 es 查询指定条件下的全部数据操作就结束了,整个操作过程比较容易理解,增加了 es 滚动查询 scroll 操作来实现查询 es 全部数据。
写在最后
以上是实现 es 查询指定条件下的全部数据的代码方法,大家需要借鉴的话,只需要补充 滚动查询部分即可,希望对大家有帮助。