【设计模式】【第六章】【查询ElasticSearch 大量数据场景】【迭代器模式】

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: • 创建design-demo项目• 创建EsController• 创建EsService• 创建EsServiceimpl• 创建EsQueryProcessor• 创建EsSqlQuery• 创建EsSqlResult

文章目录



创建design-demo项目


项目代码:https://gitee.com/java_wxid/java_wxid/tree/master/demo/design-demo


项目结构如下(示例):


805554aefe9c4d1aba6c254f6932bd55.png


创建EsController


代码如下(示例):


package com.example.designdemo.controller;
import com.example.designdemo.service.EsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class EsController {
    @Autowired
    private EsService esService;
    @PostMapping("es")
    public Boolean query(@RequestParam String query, Long fetchSize) {
        return esService.query(query, fetchSize);
    }
}


创建EsService


代码如下(示例):


package com.example.designdemo.service;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
public interface EsService {
    Boolean query(String query, Long fetchSize);
}


创建EsServiceimpl


代码如下(示例):


package com.example.designdemo.service.impl;
import com.example.designdemo.esquery.EsQueryProcessor;
import com.example.designdemo.service.EsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.stream.Stream;
@Service
public class EsServiceimpl implements EsService {
    @Autowired
    private EsQueryProcessor esQueryProcessor;
    public Boolean query(String query, Long fetchSize) {
        Stream<Map<String, Object>> mapStream = esQueryProcessor
                .scrollEsStream(query, fetchSize);
        mapStream.forEach(x -> System.out.println(x));
        return true;
    }
}


创建EsQueryProcessor


代码如下(示例):


package com.example.designdemo.esquery;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * @Author: zhiwei Liao
 * @Date: 2022/9/29 21:29
 * @Description:
 */
@Component
public class EsQueryProcessor {
    //1. 我们要用stream 返回 为了节省内存
    public Stream<Map<String, Object>> scrollEsStream(String query, Long fetchSize) {
        return StreamSupport.stream(Spliterators
                .spliteratorUnknownSize(new ScrollIterator(query, fetchSize), 0), false);
    }
    //2. 我们要 迭代器
    private class ScrollIterator implements Iterator<Map<String, Object>> {
        private String scrollId;
        private List<String> columns;
        Iterator<Map<String, Object>> iterator;
        RestTemplate restTemplate = new RestTemplate(); // 真是项目中使用resttemplate的时候
        //一定是进行过我们的 bean 配置注入的。这里边直接用new关键字是为了访问我们的es 接口。
        //构造函数进行第一次查询,并且初始化我们后续需要使用的 columns 和 iterator 和 scroll
        public ScrollIterator(String query, Long fetchSize) {
            EsSqlResult esSqlResult = restTemplate.postForObject("http://localhost:9200/_sql?format=json",
                    new EsSqlQuery(query, fetchSize), EsSqlResult.class);//第一次访问的结果出来了
            this.scrollId = esSqlResult.getCursor();
            this.columns = esSqlResult.getColumns()
                    .stream().map(x->x.get("name"))
                    .collect(Collectors.toList());
            this.iterator = convert(columns, esSqlResult).iterator();
        }
        // hasNext 根据 是否 scrollId 为null进行后续的 第二次,第三次,,,的访问,直到 scrollId 为null
        @Override
        public boolean hasNext() {
            return iterator.hasNext() || scrollNext();
        }
        private boolean scrollNext() {
            if(iterator == null || this.scrollId == null) {
                return false;
            }
            EsSqlResult esSqlResult = restTemplate.postForObject("http://localhost:9200/_sql?format=json",
                    new EsSqlQuery(this.scrollId), EsSqlResult.class);//第二次访问的结果出来了
            this.scrollId = esSqlResult.getCursor();
            this.iterator = convert(columns, esSqlResult).iterator();
            return iterator.hasNext();
        }
        @Override
        public Map<String, Object> next() {
            return iterator.next();
        }
    }
    //3. 返回结果传统一点 List<map>
    private List<Map<String, Object>> convert(List<String> columns, EsSqlResult esSqlResult) {
        List<Map<String, Object>> results = new ArrayList<>();
        for(List<Object> row : esSqlResult.getRows()) {
            Map<String, Object> map = new HashMap<>();
            for(int i = 0; i < columns.size(); i++) {
                map.put(columns.get(i), row.get(i));
            }
            results.add(map);
        }
        return results;
    }
}


创建EsSqlQuery


代码如下(示例):


package com.example.designdemo.esquery;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
 * @Author: zhiwei Liao
 * @Date: 2022/9/29 21:29
 * @Description:
 */
@JsonIgnoreProperties
public class EsSqlQuery {
    private String query;
    private Long fetchSize;
    private String cursor;
    public EsSqlQuery(String cursor) {
        this.cursor = cursor;
    }
    public EsSqlQuery(String query, Long fetchSize) {
        this.query = query;
        this.fetchSize = fetchSize;
    }
    public String getQuery() {
        return query;
    }
    public void setQuery(String query) {
        this.query = query;
    }
    public Long getFetchSize() {
        return fetchSize;
    }
    public void setFetchSize(Long fetchSize) {
        this.fetchSize = fetchSize;
    }
    public String getCursor() {
        return cursor;
    }
    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}


创建EsSqlResult


代码如下(示例):


package com.example.designdemo.esquery;
import java.util.List;
import java.util.Map;
/**
 * @Author: zhiwei Liao
 * @Date: 2022/9/29 21:29
 * @Description:
 */
public class EsSqlResult {
    private List<Map<String, String>> columns;
    private List<List<Object>> rows;
    private String cursor;
    public List<Map<String, String>> getColumns() {
        return columns;
    }
    public void setColumns(List<Map<String, String>> columns) {
        this.columns = columns;
    }
    public List<List<Object>> getRows() {
        return rows;
    }
    public void setRows(List<List<Object>> rows) {
        this.rows = rows;
    }
    public String getCursor() {
        return cursor;
    }
    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
29天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
21 0
|
1月前
|
消息中间件 存储 关系型数据库
【微服务】mysql + elasticsearch数据双写设计与实现
【微服务】mysql + elasticsearch数据双写设计与实现
68 2
|
1月前
|
监控 安全 Linux
【Elasticsearch专栏 14】深入探索:Elasticsearch使用Logstash的日期过滤器删除旧数据
使用Logstash的日期过滤器可以有效删除Elasticsearch中的旧数据,释放存储空间并提高集群性能。通过配置Logstash,可以指定索引模式、筛选时间戳早于特定阈值的文档,并在输出阶段删除这些旧数据。执行配置时,需确保Logstash与Elasticsearch连接正常,并监控日志以确保操作安全。定期执行此操作可确保旧数据不会过多积累。总之,Logstash的日期过滤器提供了一种简单而高效的方法,帮助管理和优化Elasticsearch中的数据。
|
1月前
|
存储 搜索推荐 Java
|
2月前
|
存储 固态存储 Java
Elasticsearch中查询性能优化
Elasticsearch中查询性能优化
193 0
|
3月前
Elasticsearch之RestClient查询文档
Elasticsearch之RestClient查询文档
139 1
|
1月前
|
监控 Java 测试技术
【Elasticsearch专栏 13】深入探索:Elasticsearch使用Curator工具删除Elasticsearch中的历史数据
使用Curator工具可以有效管理Elasticsearch中的旧数据,通过编写YAML配置文件定义删除操作。配置中指定了基于索引名称前缀和年龄的过滤器,确保仅删除符合条件的旧索引。执行删除操作时,Curator会应用过滤器识别目标索引,并向Elasticsearch发送删除请求。通过设置选项,如忽略空列表和超时时间,可以确保操作的灵活性和稳定性。使用Curator不仅释放了存储空间,还提高了查询性能,是维护Elasticsearch健康的重要工具
|
1月前
|
JSON 监控 数据管理
【Elasticsearch专栏 12】深入探索:Elasticsearch使用索引生命周期管理(ILM)自动化删除旧数据
Elasticsearch的ILM功能允许用户定义策略,自动管理索引从创建到删除的生命周期。用户可以设置策略,根据索引年龄或大小自动删除旧数据,节省存储空间。通过应用ILM策略于索引模板,新索引将遵循预定义的生命周期。用户还可以监控ILM状态,确保策略按预期执行。使用ILM,用户可以高效地管理数据,确保旧数据及时删除,同时保持数据完整性和安全性。
|
2月前
|
存储 缓存 自然语言处理
【Elasticsearch专栏 05】深入探索:Elasticsearch在处理非结构化数据时,倒排索引有何优势
在处理非结构化数据时,倒排索引的优势在于其高效的查询性能,能够迅速匹配文本中的关键词,实现全文搜索。此外,倒排索引支持复杂的查询操作,可扩展性强,且通过压缩技术优化存储空间。这些特点使倒排索引成为处理非结构化数据的理想选择。
|
1天前
|
设计模式 Go
[设计模式 Go实现] 行为型~迭代器模式
[设计模式 Go实现] 行为型~迭代器模式