文章目录
创建design-demo项目
项目代码:https://gitee.com/java_wxid/java_wxid/tree/master/demo/design-demo
项目结构如下(示例):
创建EsController
代码如下(示例):
package com.example.designdemo.controller;

import com.example.designdemo.service.EsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that hands an incoming query over to {@link EsService}.
 */
@RestController
public class EsController {

    /** Service the request is delegated to; populated through field injection. */
    @Autowired
    private EsService esService;

    /**
     * Handles POST requests mapped to {@code "es"} by delegating to the service.
     *
     * @param query     the query text, bound with {@code @RequestParam}
     * @param fetchSize the page size passed through to the service unchanged
     * @return whatever {@link EsService#query(String, Long)} returns
     */
    @PostMapping("es")
    public Boolean query(@RequestParam String query, Long fetchSize) {
        final Boolean outcome = esService.query(query, fetchSize);
        return outcome;
    }
}
创建EsService
代码如下(示例):
package com.example.designdemo.service; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ public interface EsService { Boolean query(String query, Long fetchSize); }
创建EsServiceimpl
代码如下(示例):
package com.example.designdemo.service.impl; import com.example.designdemo.esquery.EsQueryProcessor; import com.example.designdemo.service.EsService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Map; import java.util.stream.Stream; @Service public class EsServiceimpl implements EsService { @Autowired private EsQueryProcessor esQueryProcessor; public Boolean query(String query, Long fetchSize) { Stream<Map<String, Object>> mapStream = esQueryProcessor .scrollEsStream(query, fetchSize); mapStream.forEach(x -> System.out.println(x)); return true; } }
创建EsQueryProcessor
代码如下(示例):
package com.example.designdemo.esquery; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; /** * @Author: zhiwei Liao * @Date: 2022/9/29 21:29 * @Description: */ @Component public class EsQueryProcessor { //1. 我们要用stream 返回 为了节省内存 public Stream<Map<String, Object>> scrollEsStream(String query, Long fetchSize) { return StreamSupport.stream(Spliterators .spliteratorUnknownSize(new ScrollIterator(query, fetchSize), 0), false); } //2. 我们要 迭代器 private class ScrollIterator implements Iterator<Map<String, Object>> { private String scrollId; private List<String> columns; Iterator<Map<String, Object>> iterator; RestTemplate restTemplate = new RestTemplate(); // 真是项目中使用resttemplate的时候 //一定是进行过我们的 bean 配置注入的。这里边直接用new关键字是为了访问我们的es 接口。 //构造函数进行第一次查询,并且初始化我们后续需要使用的 columns 和 iterator 和 scroll public ScrollIterator(String query, Long fetchSize) { EsSqlResult esSqlResult = restTemplate.postForObject("http://localhost:9200/_sql?format=json", new EsSqlQuery(query, fetchSize), EsSqlResult.class);//第一次访问的结果出来了 this.scrollId = esSqlResult.getCursor(); this.columns = esSqlResult.getColumns() .stream().map(x->x.get("name")) .collect(Collectors.toList()); this.iterator = convert(columns, esSqlResult).iterator(); } // hasNext 根据 是否 scrollId 为null进行后续的 第二次,第三次,,,的访问,直到 scrollId 为null @Override public boolean hasNext() { return iterator.hasNext() || scrollNext(); } private boolean scrollNext() { if(iterator == null || this.scrollId == null) { return false; } EsSqlResult esSqlResult = restTemplate.postForObject("http://localhost:9200/_sql?format=json", new EsSqlQuery(this.scrollId), EsSqlResult.class);//第二次访问的结果出来了 this.scrollId = esSqlResult.getCursor(); this.iterator = convert(columns, esSqlResult).iterator(); return iterator.hasNext(); } @Override public Map<String, Object> next() { return iterator.next(); } } //3. 
返回结果传统一点 List<map> private List<Map<String, Object>> convert(List<String> columns, EsSqlResult esSqlResult) { List<Map<String, Object>> results = new ArrayList<>(); for(List<Object> row : esSqlResult.getRows()) { Map<String, Object> map = new HashMap<>(); for(int i = 0; i < columns.size(); i++) { map.put(columns.get(i), row.get(i)); } results.add(map); } return results; } }
创建EsSqlQuery
代码如下(示例):
package com.example.designdemo.esquery; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; /** * @Author: zhiwei Liao * @Date: 2022/9/29 21:29 * @Description: */ @JsonIgnoreProperties public class EsSqlQuery { private String query; private Long fetchSize; private String cursor; public EsSqlQuery(String cursor) { this.cursor = cursor; } public EsSqlQuery(String query, Long fetchSize) { this.query = query; this.fetchSize = fetchSize; } public String getQuery() { return query; } public void setQuery(String query) { this.query = query; } public Long getFetchSize() { return fetchSize; } public void setFetchSize(Long fetchSize) { this.fetchSize = fetchSize; } public String getCursor() { return cursor; } public void setCursor(String cursor) { this.cursor = cursor; } }
创建EsSqlResult
代码如下(示例):
package com.example.designdemo.esquery; import java.util.List; import java.util.Map; /** * @Author: zhiwei Liao * @Date: 2022/9/29 21:29 * @Description: */ public class EsSqlResult { private List<Map<String, String>> columns; private List<List<Object>> rows; private String cursor; public List<Map<String, String>> getColumns() { return columns; } public void setColumns(List<Map<String, String>> columns) { this.columns = columns; } public List<List<Object>> getRows() { return rows; } public void setRows(List<List<Object>> rows) { this.rows = rows; } public String getCursor() { return cursor; } public void setCursor(String cursor) { this.cursor = cursor; } }