SpringBoot集成Elasticsearch7.4 实战(1)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Sprinboot集成Elasticsearch完成对索引的基本操作

elasticsearch

2. SpringBoot集成

开发工具,这里选择的是IDEA 2021.1.2,构建 Gradle 工程等一堆通用操作,不清楚的自行百度 或者 参看 90分钟玩转Gradle

2.1. 依赖配置

我这边选择 spring-boot-starter-data-elasticsearch 方式来集成 spring-boot 中集成的版本号与实际安装版本号的差异,尽量选择一致的版本,否则在集成过程中,会有莫名的问题。读者在选择的时候多加留意。


api("org.springframework.boot:spring-boot-starter-data-elasticsearch")

我在此基础上封装一层 persistence-elasticsearch,更贴近一般项目使用。

  • 中央仓库下载

中央仓库

  • 阿里云的仓库下载

20211227173753

2.2. 核心操作类

为了规范索引管理,这里将所有的操作都封装成一个基类,实现对索引的增删改查。同时还集成了对数据的单个以及批量的插入以及删除。避免针对每个索引都自己写一套实现,杜绝代码的冗余,同时这样的集成对代码的结构本身也是低侵入性。

  • AbstractElasticIndexManger

public abstract class AbstractElasticIndexManger {

    protected ElasticsearchRestTemplate elasticsearchRestTemplate;

    protected RestHighLevelClient restHighLevelClient;

    @Autowired
    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    @Autowired
    public void setElasticsearchRestTemplate(ElasticsearchRestTemplate elasticsearchRestTemplate) {
        this.elasticsearchRestTemplate = elasticsearchRestTemplate;
    }

    /**
     * 设置分片 和 副本
     * 副本作用主要为了保证数据安全
     *
     * @param request 请求
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 19:27
     */
    protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
        request.settings(Settings.builder().put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas));
    }

    /**
     * 查询匹配条件的数据量,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param builder    BoolQueryBuilder类型查询实例
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected long count(BoolQueryBuilder builder, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        return elasticsearchRestTemplate.count(nativeSearchQueryBuilder.build(), IndexCoordinates.of(indexNames));
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param builder    BoolQueryBuilder类型查询实例
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits search(BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        Pageable pageable = PageRequest.of(1, 20);
        nativeSearchQueryBuilder.withPageable(pageable);
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param page       当前页
     * @param size       每页大小
     * @param builder    BoolQueryBuilder类型查询实例
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return SearchHits 命中结果的数据集
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        Pageable pageable = PageRequest.of(page, size);
        nativeSearchQueryBuilder.withPageable(pageable);
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    protected DeleteByQueryRequest builderDeleteRequest(QueryBuilder builder, String... indexNames) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setBatchSize(0X5F5E0FF);
        request.setConflicts("proceed");
        return request;
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param params     Map形式的 字段名 和 字段内容 组成的条件
     * @param builder    BoolQueryBuilder类型查询实例
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected BulkByScrollResponse update(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        UpdateByQueryRequest request = buildUpdateByQueryReq(params, builder, indexNames);
        try {
            return restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 构建更新 QueryRequest
     *
     * @param params     参数
     * @param builder    布尔构建
     * @param indexNames 索引
     * @return UpdateByQueryRequest
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-15:50
     **/
    protected UpdateByQueryRequest buildUpdateByQueryReq(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        Script script = buildScriptType(params);
        UpdateByQueryRequest request = new UpdateByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setScript(script);
        request.setConflicts("proceed");
        request.setRefresh(true);
        request.setTimeout(TimeValue.timeValueMinutes(3));
        return request;
    }

    /**
     * 以 K-V键值对 方式构建条件 Script
     *
     * @param params Map形式的 字段名 和 字段内容 组成的条件
     * @return Script
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-13:19
     **/
    protected Script buildScriptType(Map<String, Object> params) {
        Set<String> keys = params.keySet();
        StringBuffer idOrCodeStb = new StringBuffer();
        for (String key : keys) {
            idOrCodeStb.append("ctx._source.").append(key).append("=params.").append(key).append(";");
        }
        ScriptType type = ScriptType.INLINE;
        return new Script(type, Script.DEFAULT_SCRIPT_LANG, idOrCodeStb.toString(), params);
    }

    /**
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void setBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        mustBuilders(builder, bool);
        mustNotBuilders(builder, bool);
        shouldBuilders(builder, bool);
        filterBuilders(builder, bool);
    }

    /**
     * 构建满足 必须 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> must = bool.getMust();
        if (must.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : must) {
            builder.must(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 非必须 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustNotBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> mustNot = bool.getMustNot();
        if (mustNot.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : mustNot) {
            builder.mustNot(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 可选 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void shouldBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> should = bool.getShould();
        if (should.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : should) {
            builder.should(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 必须 条件 的方法,推荐使用
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void filterBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> filter = bool.getFilter();
        if (filter.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : filter) {
            builder.filter(getQueryBuilder(cds));
        }
    }

    public QueryBuilder getQueryBuilder(AtomicCondition cds) {
        QueryBuilder queryBuilder;
        Tuple tuple = cds.getTuple();
        switch (cds.getStatus()) {
            case (Constants.SUFFIX_QUERY):
                queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.MULTI_CHARACTER + tuple.v1());
                break;
            case (Constants.SUFFIX_SINGLE_QUERY):
                queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.SINGLE_CHARACTER + tuple.v1());
                break;
            case (Constants.RANGE_QUERY):
                queryBuilder = QueryBuilders.rangeQuery(cds.getField()).from(tuple.v1()).to(tuple.v2());
                break;
            case (Constants.PREFIX_QUERY):
                queryBuilder = QueryBuilders.prefixQuery(cds.getField(), tuple.v1().toString());
                break;
            case (Constants.REG_QUERY):
                queryBuilder = QueryBuilders.regexpQuery(cds.getField(), tuple.v1().toString());
                break;
            default:
                queryBuilder = QueryBuilders.termQuery(cds.getField(), tuple.v1().toString());
                break;
        }
        return queryBuilder;
    }
}

  • ElasticIndexManger

public class ElasticIndexManger extends AbstractElasticIndexManger {


    /**
     * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping) {
        createIndex(indexName, mapping, 0, 1);
    }


    /**
     * 指定索引结构创建索引
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @param replicas  副本的数量
     * @param shards    分片数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping, int replicas, int shards) {
        try {
            if (!this.existIndex(indexName)) {
                log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                return;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            buildSetting(request, replicas, shards);
            request.mapping(mapping, XContentType.JSON);
            CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            if (!res.isAcknowledged()) {
                throw new RuntimeException("初始化失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
    }

    /**
     * 获取所有索引,默认为所有索引
     *
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List getAllIndex() {
        return getAllIndex(Constants.MULTI_CHARACTER);
    }

    /**
     * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
     *
     * @param inPattern 正则表达式
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List<String> getAllIndex(String inPattern) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
        try {
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
            String[] indices = getIndexResponse.getIndices();
            return Arrays.asList(indices);
        } catch (IOException e) {
            log.error("获取索引失败 {} 已经存在", e.getMessage());
        } catch (ElasticsearchStatusException e) {
            log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
        }
        return Collections.EMPTY_LIST;
    }

    /**
     * 制定配置项的判断索引是否存在,注意与 isExistsIndex 区别
     * <ul>
     *     <li>1、可以指定 用本地检索 还是用 主动节点方式检索</li>
     *     <li>2、是否适应被人读的方式</li>
     *     <li>3、返回默认设置</li>
     * </ul>
     *
     * @param indexName index名
     * @return boolean
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:27
     */
    public boolean existIndex(String indexName) throws IOException {
        GetIndexRequest request = new GetIndexRequest(indexName);
        //TRUE-返回本地信息检索状态,FALSE-还是从主节点检索状态
        request.local(false);
        //是否适应被人可读的格式返回
        request.humanReadable(true);
        //是否为每个索引返回所有默认设置
        request.includeDefaults(false);
        //控制如何解决不可用的索引以及如何扩展通配符表达式,忽略不可用索引的索引选项,仅将通配符扩展为开放索引,并且不允许从通配符表达式解析任何索引
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * 单纯断某个索引是否存在
     *
     * @param indexName index名
     * @return boolean 存在为True,不存在则 False
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:27
     */
    public boolean isIndexExists(String indexName) throws Exception {
        return restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
    }

    /**
     * 批量插入数据,通过 {@link List} 的对象集合进行插入,此处对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
     *
     * @param indexName index
     * @param list      列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:26
     */
    public void batch(String indexName, List<? extends BasePo> list) throws IOException {
        int sleep = 15;
        BulkRequest request = new BulkRequest();
        list.forEach(item -> request.add(new IndexRequest(indexName)
                .id(item.getId().toString())
                .source(JSON.toJSONString(item), XContentType.JSON)));
        try {
            BulkResponse bulkResponse = bulk(request);
            log.error("[Verification BulkResponse bulk 操作结果] {}, 文件大小 {} ", bulkResponse.status(), list.size());
            if (bulkResponse.hasFailures()) {
                log.error(bulkResponse.buildFailureMessage());
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        log.error(bulkItemResponse.getFailureMessage());
                    }
                }
                log.error("批量操作失败,重新再提交一次,间隔时间{}, 文件大小 {} ", sleep, list.size());
                TimeUnit.SECONDS.sleep(sleep);
                bulkResponse = bulk(request);
                if (bulkResponse.hasFailures()) {
                    log.error("再次提交失败,需要写入MQ , 文件大小 {} ", list.size());
                }
            }
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * bulk 方式批量提交
     *
     * @param request {@link BulkRequest} 请求
     * @return BulkResponse
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/24-15:50
     **/
    private BulkResponse bulk(BulkRequest request) throws IOException {
        return restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
    }

    /**
     * <p>
     * 批量插入数据,通过 {@link List} 的对象集合进行插入,提交前,判断 该索引是否存在不存在则直接创建 该索引
     * 并对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
     * </p>
     *
     * @param indexName index
     * @param list      列表
     * @param created   当索引不存在,则创建索引,默认为 true,即索引不存在,创建该索引,此时 mapping 应该不为空
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:26
     */
    public void batch(List<? extends BasePo> list, String indexName, boolean created, String mapping) throws Exception {
        try {
            if (!isIndexExists(indexName)) {
                log.error("[Index does not exist] Rebuilding index. IndexName ={}", indexName);
                if (created && StringUtils.isNotBlank(mapping)) {
                    createIndex(indexName, mapping);
                } else {
                    log.error("[Index does not exist , No index creation] IndexName ={}", indexName);
                    return;
                }
            }
            batch(indexName, list);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 批量删除,根据索引名称,删除索引下数据
     *
     * @param indexName index
     * @param idList    待删除列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:14
     */
    public <T> void deleteBatch(String indexName, Collection<T> idList) {
        BulkRequest request = new BulkRequest();
        idList.forEach(item -> request.add(new DeleteRequest(indexName, item.toString())));
        try {
            restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 根据索引名称,和 {@link SearchSourceBuilder} 条件,以及返回对象实体类,返回列表
     *
     * @param indexName index
     * @param builder   查询参数
     * @param clazz     结果类对象
     * @return java.util.List<T>
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:14
     */
    public <T> List<T> search(String indexName, SearchSourceBuilder builder, Class<T> clazz) {
        List res = Collections.EMPTY_LIST;
        try {
            SearchRequest request = new SearchRequest(indexName);
            request.source(builder);
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            SearchHit[] hits = response.getHits().getHits();
            res = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                res.add(JSON.parseObject(hit.getSourceAsString(), clazz));
            }
        } catch (IOException e) {
            log.error("[ElasticSearch] connect err ,err-msg {}", e.getMessage());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return res;
    }

    /**
     * 删除 index,以及索引下数据
     *
     * @param indexName 索引名字
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteIndex(String indexName) {
        try {
            restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除索引下数据,但是不删除索引结构
     *
     * @param builder    条件构建模式
     * @param indexNames 索引名称列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteByQuery(QueryBuilder builder, String... indexNames) {
        try {
            DeleteByQueryRequest request = builderDeleteRequest(builder, indexNames);
            BulkByScrollResponse response = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 不推荐使用,原因为不够灵活,获取该索引下可以匹配的数量,支持 模糊查询和精确查询,
     * 用法 在 方法 <b>field</b> 的处理上。
     * <ul>
     *     <li>模糊匹配模式:字段</li>
     *     <li>精确匹配模式:字段.类型</li>
     * </ul>
     *
     * @param indexName 文档索引名
     * @param field     字段
     * @param text      内容
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2018/07/20-20:47
     **/
    @Deprecated
    public long countMatchPhrasePrefixQuery(String indexName, String field, String text) {
        CountRequest countRequest = new CountRequest(indexName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(matchPhrasePrefixQuery(field, text));
        countRequest.source(searchSourceBuilder);
        CountResponse countResponse = null;
        try {
            countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return countResponse == null ? 0L : countResponse.getCount();
    }


    /**
     * 按照字段 内容进行精确匹配,返回匹配的数量
     *
     * @param field      字段名
     * @param content    内容
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:49
     **/
    public long exactCondition(String field, String content, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.termQuery(field, content));
        return count(builder, indexNames);
    }


    /**
     * 按照字段的前缀内容进行匹配,返回匹配的数量
     *
     * @param field      字段名
     * @param prefix     前缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:49
     **/
    public long prefix(String field, String prefix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        return count(builder, indexNames);
    }


    /**
     * 按照字段对 内容进行后缀匹配,返回匹配的数量
     *
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:56
     **/
    public long suffix(String field, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }


    /**
     * 字段的前缀和后缀都必须满足条件
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixAndSuffix(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀和后缀都满足一个条件按即可
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixOrSuffix(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.should(QueryBuilders.prefixQuery(field, prefix));
        builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀必须满足,而 后缀则不要求 不一定满足
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixMustSuffixShould(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀选择性满足,而 后缀则一定要满足
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixShouldSuffixMust(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.should(QueryBuilders.prefixQuery(field, prefix));
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }


    /**
     * 查询总数
     *
     * @param indexNames 索引文档名称,可以是多个
     * @return long 匹配的数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/29-21:11
     **/
    public long total(String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        return count(builder, indexNames);
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param params     Map形式的 字段名 和 字段内容 组成的条件
     * @param bool      复合条件封装
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    public BulkByScrollResponse update(Map<String, Object> params, BoolCondition bool, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        setBuilders(builder,bool);
        return update(params,builder,indexNames);
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param page    当前页
     * @param size    每页大小
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return SearchHits 命中结果的数据集
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolCondition bool,Class<? extends BasePo> clazz, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        setBuilders(builder,bool);
        return searchPage(page,size,builder, clazz, indexNames);
    }

}

3. 项目代码

通过以上的集成,我们看到完成在项目中对 elasticsearch 的集成,同时也用基类,将所有可能的操作都封装起来。下来我们通过对基类的讲解,来逐个说明!

3.1. 索引管理

由于在ElasticIndexManger类定义了所有方法,直接调用即可。

3.1.1. 创建索引

我们在创建索引过程中需要先判断是否有这个索引,否则不允许创建,由于我案例采用的是手动指定 indexNameSettings ,大家看的过程中要特别注意下,而且还有一点 indexName 必须是小写,如果是大写在创建过程中会有错误

官方索引创建说明

索引名大写

。详细的代码实现见如下:


 /**
     * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping) {
        createIndex(indexName, mapping, 0, 1);
    }


    /**
     * 指定索引结构创建索引
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @param replicas  副本的数量
     * @param shards    分片数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping, int replicas, int shards) {
        try {
            if (!this.existIndex(indexName)) {
                log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                return;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            buildSetting(request, replicas, shards);
            request.mapping(mapping, XContentType.JSON);
            CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            if (!res.isAcknowledged()) {
                throw new RuntimeException("初始化失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
    }

创建索引需要设置分片,这里采用Settings.Builder方式,当然也可以 JSON 自定义方式,本文篇幅有限,不做演示。

index.number_of_shards:分片数

number_of_replicas:副本数

    /**
     * 设置分片 和 副本
     * 副本作用主要为了保证数据安全
     *
     * @param request 请求
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 19:27
     */
    protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
        request.settings(Settings.builder().put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas));
    }

[elastic@localhost elastic]$ curl -H "Content-Type: application/json" -X GET "http://localhost:9200/_cat/indices?v"

health status index        uuid                   pri rep docs.count docs.deleted store.size pri.store.size

yellow open   twitter      scSSD1SfRCio4F77Hh8aqQ   3   2          2            0      8.3kb          8.3kb

yellow open   idx_location _BJ_pOv0SkS4tv-EC3xDig   3   2          1            0        4kb            4kb

yellow open   wongs        uT13XiyjSW-VOS3GCqao8w   3   2          1            0      3.4kb          3.4kb

yellow open   idx_locat    Kr3wGU7JT_OUrRJkyFSGDw   3   2          3            0     13.2kb         13.2kb

yellow open   idx_copy_to  HouC9s6LSjiwrJtDicgY3Q   3   2          1            0        4kb            4kb
  

说明创建成功,这里总是通过命令行来验证,有点繁琐,既然我都有WEB服务,为什么不直接通过HTTP验证了?

3.1.2. 查看索引

查看索引这个操作支持模糊操作,即以通配符 * 作为一个或者多个字符匹配,这个操作在实际应用非常好用,将来有机会说到 Index 设计过程中就显得尤为重要。


    /**
     * 获取所有索引,默认为所有索引
     *
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List getAllIndex() {
        return getAllIndex(Constants.MULTI_CHARACTER);
    }

    /**
     * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
     *
     * @param inPattern 正则表达式
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List<String> getAllIndex(String inPattern) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
        try {
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
            String[] indices = getIndexResponse.getIndices();
            return Arrays.asList(indices);
        } catch (IOException e) {
            log.error("获取索引失败 {} 已经存在", e.getMessage());
        } catch (ElasticsearchStatusException e) {
            log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
        }
        return Collections.EMPTY_LIST;
    }

3.1.3. 删除索引

删除的逻辑就比较简单,这里就不多说。


    /**
     * 删除 index,以及索引下数据
     *
     * @param indexName 索引名字
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteIndex(String indexName) {
        try {
            restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

3.2. 引用依赖

构建一个工程,我这里依然用 Gralde 工程作为样例说明, Maven 项目类似。

3.2.1. 依赖管理


implementation("io.github.rothschil:persistence-elasticsearch:1.2.3.RELEASE")

3.2.2. 依赖注入

在工程项目中直接使用 ElasticIndexManger 作为实例注入进来,后面我们可以直接使用它提供的各种方法。样例中我是定义一个精确查询作为说明,TermQueryBuilder("sysCode","crm") 中参数分别代表匹配条件的列名和列的值; 在索引列名中我这里用的是 通配符,即可以在多个索引之间查询; AccLog.class 这是我自定义的类,用以接收查询出来的结果进行实例化映射。


@Component
public class LogIndexManager{

    private ElasticIndexManger elasticIndexManger;

    @Autowired
    public void setElasticIndexManger(ElasticIndexManger elasticIndexManger) {
        this.elasticIndexManger = elasticIndexManger;
    }

    public List<AccLog> query(){
        QueryBuilder queryBuilder = new TermQueryBuilder("sysCode","crm");
        SearchSourceBuilder sb = new SearchSourceBuilder();
        sb.query(queryBuilder);
        return elasticIndexManger.search("hnqymh_hpg*",sb,AccLog.class);
    }
}

4. 源码

Gitee演示源码,记得给Star

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
17天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
52 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
JSON Java 网络架构
elasticsearch学习四:使用springboot整合 rest 进行搭建elasticsearch服务
这篇文章介绍了如何使用Spring Boot整合REST方式来搭建和操作Elasticsearch服务。
122 4
elasticsearch学习四:使用springboot整合 rest 进行搭建elasticsearch服务
|
19天前
|
JSON Java API
springboot集成ElasticSearch使用completion实现补全功能
springboot集成ElasticSearch使用completion实现补全功能
23 1
|
24天前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
97 6
|
1月前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
174 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
1月前
|
自然语言处理 Java Maven
elasticsearch学习二:使用springboot整合TransportClient 进行搭建elasticsearch服务
这篇博客介绍了如何使用Spring Boot整合TransportClient搭建Elasticsearch服务,包括项目创建、Maven依赖、业务代码和测试示例。
95 0
elasticsearch学习二:使用springboot整合TransportClient 进行搭建elasticsearch服务
|
1月前
|
Dart Android开发
鸿蒙Flutter实战:03-鸿蒙Flutter开发中集成Webview
本文介绍了在OpenHarmony平台上集成WebView的两种方法:一是使用第三方库`flutter_inappwebview`,通过配置pubspec.lock文件实现;二是编写原生ArkTS代码,自定义PlatformView,涉及创建入口能力、注册视图工厂、处理方法调用及页面构建等步骤。
48 0
|
1月前
|
开发框架 监控 搜索推荐
GoFly快速开发框架集成ZincSearch全文搜索引擎 - Elasticsearch轻量级替代为ZincSearch全文搜索引擎
本文介绍了在项目开发中使用ZincSearch作为全文搜索引擎的优势,包括其轻量级、易于安装和使用、资源占用低等特点,以及如何在GoFly快速开发框架中集成和使用ZincSearch,提供了详细的开发文档和实例代码,帮助开发者高效地实现搜索功能。
118 0
|
1月前
|
自然语言处理 搜索推荐 Java
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(一)
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图
49 0
|
8天前
|
存储 安全 数据管理
如何在 Rocky Linux 8 上安装和配置 Elasticsearch
本文详细介绍了在 Rocky Linux 8 上安装和配置 Elasticsearch 的步骤,包括添加仓库、安装 Elasticsearch、配置文件修改、设置内存和文件描述符、启动和验证 Elasticsearch,以及常见问题的解决方法。通过这些步骤,你可以快速搭建起这个强大的分布式搜索和分析引擎。
21 5