SpringBoot集成Elasticsearch7.4 实战(1)

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: Sprinboot集成Elasticsearch完成对索引的基本操作

elasticsearch

2. SpringBoot集成

开发工具,这里选择的是IDEA 2021.1.2,构建 Gradle 工程等一堆通用操作,不清楚的自行百度 或者 参看 90分钟玩转Gradle

2.1. 依赖配置

我这边选择 spring-boot-starter-data-elasticsearch 方式来集成 spring-boot 中集成的版本号与实际安装版本号的差异,尽量选择一致的版本,否则在集成过程中,会有莫名的问题。读者在选择的时候多加留意。


api("org.springframework.boot:spring-boot-starter-data-elasticsearch")

我在此基础上封装一层 persistence-elasticsearch,更贴近一般项目使用。

  • 中央仓库下载

中央仓库

  • 阿里云的仓库下载

20211227173753

2.2. 核心操作类

为了规范索引管理,这里将所有的操作都封装成一个基类,实现对索引的增删改查。同时还集成了对数据的单个以及批量的插入以及删除。避免针对每个索引都自己写一套实现,杜绝代码的冗余,同时这样的集成对代码的结构本身也是低侵入性。

  • AbstractElasticIndexManger

public abstract class AbstractElasticIndexManger {

    protected ElasticsearchRestTemplate elasticsearchRestTemplate;

    protected RestHighLevelClient restHighLevelClient;

    @Autowired
    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    @Autowired
    public void setElasticsearchRestTemplate(ElasticsearchRestTemplate elasticsearchRestTemplate) {
        this.elasticsearchRestTemplate = elasticsearchRestTemplate;
    }

    /**
     * 设置分片 和 副本
     * 副本作用主要为了保证数据安全
     *
     * @param request 请求
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 19:27
     */
    protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
        request.settings(Settings.builder().put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas));
    }

    /**
     * 查询匹配条件的数据量,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param builder    BoolQueryBuilder类型查询实例
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected long count(BoolQueryBuilder builder, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        return elasticsearchRestTemplate.count(nativeSearchQueryBuilder.build(), IndexCoordinates.of(indexNames));
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param builder    BoolQueryBuilder类型查询实例
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits search(BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        Pageable pageable = PageRequest.of(1, 20);
        nativeSearchQueryBuilder.withPageable(pageable);
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param page       当前页
     * @param size       每页大小
     * @param builder    BoolQueryBuilder类型查询实例
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return SearchHits 命中结果的数据集
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        Pageable pageable = PageRequest.of(page, size);
        nativeSearchQueryBuilder.withPageable(pageable);
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    protected DeleteByQueryRequest builderDeleteRequest(QueryBuilder builder, String... indexNames) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setBatchSize(0X5F5E0FF);
        request.setConflicts("proceed");
        return request;
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param params     Map形式的 字段名 和 字段内容 组成的条件
     * @param builder    BoolQueryBuilder类型查询实例
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected BulkByScrollResponse update(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        UpdateByQueryRequest request = buildUpdateByQueryReq(params, builder, indexNames);
        try {
            return restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 构建更新 QueryRequest
     *
     * @param params     参数
     * @param builder    布尔构建
     * @param indexNames 索引
     * @return UpdateByQueryRequest
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-15:50
     **/
    protected UpdateByQueryRequest buildUpdateByQueryReq(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        Script script = buildScriptType(params);
        UpdateByQueryRequest request = new UpdateByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setScript(script);
        request.setConflicts("proceed");
        request.setRefresh(true);
        request.setTimeout(TimeValue.timeValueMinutes(3));
        return request;
    }

    /**
     * 以 K-V键值对 方式构建条件 Script
     *
     * @param params Map形式的 字段名 和 字段内容 组成的条件
     * @return Script
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-13:19
     **/
    protected Script buildScriptType(Map<String, Object> params) {
        Set<String> keys = params.keySet();
        StringBuffer idOrCodeStb = new StringBuffer();
        for (String key : keys) {
            idOrCodeStb.append("ctx._source.").append(key).append("=params.").append(key).append(";");
        }
        ScriptType type = ScriptType.INLINE;
        return new Script(type, Script.DEFAULT_SCRIPT_LANG, idOrCodeStb.toString(), params);
    }

    /**
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void setBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        mustBuilders(builder, bool);
        mustNotBuilders(builder, bool);
        shouldBuilders(builder, bool);
        filterBuilders(builder, bool);
    }

    /**
     * 构建满足 必须 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> must = bool.getMust();
        if (must.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : must) {
            builder.must(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 非必须 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustNotBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> mustNot = bool.getMustNot();
        if (mustNot.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : mustNot) {
            builder.mustNot(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 可选 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void shouldBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> should = bool.getShould();
        if (should.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : should) {
            builder.should(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 必须 条件 的方法,推荐使用
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void filterBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> filter = bool.getFilter();
        if (filter.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : filter) {
            builder.filter(getQueryBuilder(cds));
        }
    }

    public QueryBuilder getQueryBuilder(AtomicCondition cds) {
        QueryBuilder queryBuilder;
        Tuple tuple = cds.getTuple();
        switch (cds.getStatus()) {
            case (Constants.SUFFIX_QUERY):
                queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.MULTI_CHARACTER + tuple.v1());
                break;
            case (Constants.SUFFIX_SINGLE_QUERY):
                queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.SINGLE_CHARACTER + tuple.v1());
                break;
            case (Constants.RANGE_QUERY):
                queryBuilder = QueryBuilders.rangeQuery(cds.getField()).from(tuple.v1()).to(tuple.v2());
                break;
            case (Constants.PREFIX_QUERY):
                queryBuilder = QueryBuilders.prefixQuery(cds.getField(), tuple.v1().toString());
                break;
            case (Constants.REG_QUERY):
                queryBuilder = QueryBuilders.regexpQuery(cds.getField(), tuple.v1().toString());
                break;
            default:
                queryBuilder = QueryBuilders.termQuery(cds.getField(), tuple.v1().toString());
                break;
        }
        return queryBuilder;
    }
}

  • ElasticIndexManger

public class ElasticIndexManger extends AbstractElasticIndexManger {


    /**
     * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping) {
        createIndex(indexName, mapping, 0, 1);
    }


    /**
     * 指定索引结构创建索引
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @param replicas  副本的数量
     * @param shards    分片数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping, int replicas, int shards) {
        try {
            if (!this.existIndex(indexName)) {
                log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                return;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            buildSetting(request, replicas, shards);
            request.mapping(mapping, XContentType.JSON);
            CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            if (!res.isAcknowledged()) {
                throw new RuntimeException("初始化失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
    }

    /**
     * 获取所有索引,默认为所有索引
     *
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List getAllIndex() {
        return getAllIndex(Constants.MULTI_CHARACTER);
    }

    /**
     * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
     *
     * @param inPattern 正则表达式
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List<String> getAllIndex(String inPattern) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
        try {
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
            String[] indices = getIndexResponse.getIndices();
            return Arrays.asList(indices);
        } catch (IOException e) {
            log.error("获取索引失败 {} 已经存在", e.getMessage());
        } catch (ElasticsearchStatusException e) {
            log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
        }
        return Collections.EMPTY_LIST;
    }

    /**
     * 制定配置项的判断索引是否存在,注意与 isExistsIndex 区别
     * <ul>
     *     <li>1、可以指定 用本地检索 还是用 主动节点方式检索</li>
     *     <li>2、是否适应被人读的方式</li>
     *     <li>3、返回默认设置</li>
     * </ul>
     *
     * @param indexName index名
     * @return boolean
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:27
     */
    public boolean existIndex(String indexName) throws IOException {
        GetIndexRequest request = new GetIndexRequest(indexName);
        //TRUE-返回本地信息检索状态,FALSE-还是从主节点检索状态
        request.local(false);
        //是否适应被人可读的格式返回
        request.humanReadable(true);
        //是否为每个索引返回所有默认设置
        request.includeDefaults(false);
        //控制如何解决不可用的索引以及如何扩展通配符表达式,忽略不可用索引的索引选项,仅将通配符扩展为开放索引,并且不允许从通配符表达式解析任何索引
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * 单纯断某个索引是否存在
     *
     * @param indexName index名
     * @return boolean 存在为True,不存在则 False
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:27
     */
    public boolean isIndexExists(String indexName) throws Exception {
        return restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
    }

    /**
     * 批量插入数据,通过 {@link List} 的对象集合进行插入,此处对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
     *
     * @param indexName index
     * @param list      列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:26
     */
    public void batch(String indexName, List<? extends BasePo> list) throws IOException {
        int sleep = 15;
        BulkRequest request = new BulkRequest();
        list.forEach(item -> request.add(new IndexRequest(indexName)
                .id(item.getId().toString())
                .source(JSON.toJSONString(item), XContentType.JSON)));
        try {
            BulkResponse bulkResponse = bulk(request);
            log.error("[Verification BulkResponse bulk 操作结果] {}, 文件大小 {} ", bulkResponse.status(), list.size());
            if (bulkResponse.hasFailures()) {
                log.error(bulkResponse.buildFailureMessage());
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        log.error(bulkItemResponse.getFailureMessage());
                    }
                }
                log.error("批量操作失败,重新再提交一次,间隔时间{}, 文件大小 {} ", sleep, list.size());
                TimeUnit.SECONDS.sleep(sleep);
                bulkResponse = bulk(request);
                if (bulkResponse.hasFailures()) {
                    log.error("再次提交失败,需要写入MQ , 文件大小 {} ", list.size());
                }
            }
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * bulk 方式批量提交
     *
     * @param request {@link BulkRequest} 请求
     * @return BulkResponse
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/24-15:50
     **/
    private BulkResponse bulk(BulkRequest request) throws IOException {
        return restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
    }

    /**
     * <p>
     * 批量插入数据,通过 {@link List} 的对象集合进行插入,提交前,判断 该索引是否存在不存在则直接创建 该索引
     * 并对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
     * </p>
     *
     * @param indexName index
     * @param list      列表
     * @param created   当索引不存在,则创建索引,默认为 true,即索引不存在,创建该索引,此时 mapping 应该不为空
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:26
     */
    public void batch(List<? extends BasePo> list, String indexName, boolean created, String mapping) throws Exception {
        try {
            if (!isIndexExists(indexName)) {
                log.error("[Index does not exist] Rebuilding index. IndexName ={}", indexName);
                if (created && StringUtils.isNotBlank(mapping)) {
                    createIndex(indexName, mapping);
                } else {
                    log.error("[Index does not exist , No index creation] IndexName ={}", indexName);
                    return;
                }
            }
            batch(indexName, list);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 批量删除,根据索引名称,删除索引下数据
     *
     * @param indexName index
     * @param idList    待删除列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:14
     */
    public <T> void deleteBatch(String indexName, Collection<T> idList) {
        BulkRequest request = new BulkRequest();
        idList.forEach(item -> request.add(new DeleteRequest(indexName, item.toString())));
        try {
            restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 根据索引名称,和 {@link SearchSourceBuilder} 条件,以及返回对象实体类,返回列表
     *
     * @param indexName index
     * @param builder   查询参数
     * @param clazz     结果类对象
     * @return java.util.List<T>
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:14
     */
    public <T> List<T> search(String indexName, SearchSourceBuilder builder, Class<T> clazz) {
        List res = Collections.EMPTY_LIST;
        try {
            SearchRequest request = new SearchRequest(indexName);
            request.source(builder);
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            SearchHit[] hits = response.getHits().getHits();
            res = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                res.add(JSON.parseObject(hit.getSourceAsString(), clazz));
            }
        } catch (IOException e) {
            log.error("[ElasticSearch] connect err ,err-msg {}", e.getMessage());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return res;
    }

    /**
     * 删除 index,以及索引下数据
     *
     * @param indexName 索引名字
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteIndex(String indexName) {
        try {
            restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除索引下数据,但是不删除索引结构
     *
     * @param builder    条件构建模式
     * @param indexNames 索引名称列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteByQuery(QueryBuilder builder, String... indexNames) {
        try {
            DeleteByQueryRequest request = builderDeleteRequest(builder, indexNames);
            BulkByScrollResponse response = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 不推荐使用,原因为不够灵活,获取该索引下可以匹配的数量,支持 模糊查询和精确查询,
     * 用法 在 方法 <b>field</b> 的处理上。
     * <ul>
     *     <li>模糊匹配模式:字段</li>
     *     <li>精确匹配模式:字段.类型</li>
     * </ul>
     *
     * @param indexName 文档索引名
     * @param field     字段
     * @param text      内容
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2018/07/20-20:47
     **/
    @Deprecated
    public long countMatchPhrasePrefixQuery(String indexName, String field, String text) {
        CountRequest countRequest = new CountRequest(indexName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(matchPhrasePrefixQuery(field, text));
        countRequest.source(searchSourceBuilder);
        CountResponse countResponse = null;
        try {
            countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return countResponse == null ? 0L : countResponse.getCount();
    }


    /**
     * 按照字段 内容进行精确匹配,返回匹配的数量
     *
     * @param field      字段名
     * @param content    内容
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:49
     **/
    public long exactCondition(String field, String content, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.termQuery(field, content));
        return count(builder, indexNames);
    }


    /**
     * 按照字段的前缀内容进行匹配,返回匹配的数量
     *
     * @param field      字段名
     * @param prefix     前缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:49
     **/
    public long prefix(String field, String prefix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        return count(builder, indexNames);
    }


    /**
     * 按照字段对 内容进行后缀匹配,返回匹配的数量
     *
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:56
     **/
    public long suffix(String field, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }


    /**
     * 字段的前缀和后缀都必须满足条件
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixAndSuffix(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀和后缀都满足一个条件按即可
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixOrSuffix(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.should(QueryBuilders.prefixQuery(field, prefix));
        builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀必须满足,而 后缀则不要求 不一定满足
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixMustSuffixShould(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀选择性满足,而 后缀则一定要满足
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixShouldSuffixMust(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.should(QueryBuilders.prefixQuery(field, prefix));
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }


    /**
     * 查询总数
     *
     * @param indexNames 索引文档名称,可以是多个
     * @return long 匹配的数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/29-21:11
     **/
    public long total(String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        return count(builder, indexNames);
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param params     Map形式的 字段名 和 字段内容 组成的条件
     * @param bool      复合条件封装
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    public BulkByScrollResponse update(Map<String, Object> params, BoolCondition bool, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        setBuilders(builder,bool);
        return update(params,builder,indexNames);
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param page    当前页
     * @param size    每页大小
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return SearchHits 命中结果的数据集
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolCondition bool,Class<? extends BasePo> clazz, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        setBuilders(builder,bool);
        return searchPage(page,size,builder, clazz, indexNames);
    }

}

3. 项目代码

通过以上的集成,我们看到完成在项目中对 elasticsearch 的集成,同时也用基类,将所有可能的操作都封装起来。下来我们通过对基类的讲解,来逐个说明!

3.1. 索引管理

由于在ElasticIndexManger类定义了所有方法,直接调用即可。

3.1.1. 创建索引

我们在创建索引过程中需要先判断是否有这个索引,否则不允许创建,由于我案例采用的是手动指定 indexNameSettings ,大家看的过程中要特别注意下,而且还有一点 indexName 必须是小写,如果是大写在创建过程中会有错误

官方索引创建说明

索引名大写

。详细的代码实现见如下:


 /**
     * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping) {
        createIndex(indexName, mapping, 0, 1);
    }


    /**
     * 指定索引结构创建索引
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @param replicas  副本的数量
     * @param shards    分片数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping, int replicas, int shards) {
        try {
            if (!this.existIndex(indexName)) {
                log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                return;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            buildSetting(request, replicas, shards);
            request.mapping(mapping, XContentType.JSON);
            CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            if (!res.isAcknowledged()) {
                throw new RuntimeException("初始化失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
    }

创建索引需要设置分片,这里采用Settings.Builder方式,当然也可以 JSON 自定义方式,本文篇幅有限,不做演示。

index.number_of_shards:分片数

number_of_replicas:副本数

    /**
     * 设置分片 和 副本
     * 副本作用主要为了保证数据安全
     *
     * @param request 请求
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 19:27
     */
    protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
        request.settings(Settings.builder().put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas));
    }

[elastic@localhost elastic]$ curl -H "Content-Type: application/json" -X GET "http://localhost:9200/_cat/indices?v"

health status index        uuid                   pri rep docs.count docs.deleted store.size pri.store.size

yellow open   twitter      scSSD1SfRCio4F77Hh8aqQ   3   2          2            0      8.3kb          8.3kb

yellow open   idx_location _BJ_pOv0SkS4tv-EC3xDig   3   2          1            0        4kb            4kb

yellow open   wongs        uT13XiyjSW-VOS3GCqao8w   3   2          1            0      3.4kb          3.4kb

yellow open   idx_locat    Kr3wGU7JT_OUrRJkyFSGDw   3   2          3            0     13.2kb         13.2kb

yellow open   idx_copy_to  HouC9s6LSjiwrJtDicgY3Q   3   2          1            0        4kb            4kb
  

说明创建成功,这里总是通过命令行来验证,有点繁琐,既然我都有WEB服务,为什么不直接通过HTTP验证了?

3.1.2. 查看索引

查看索引这个操作支持模糊操作,即以通配符 * 作为一个或者多个字符匹配,这个操作在实际应用非常好用,将来有机会说到 Index 设计过程中就显得尤为重要。


    /**
     * 获取所有索引,默认为所有索引
     *
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List getAllIndex() {
        return getAllIndex(Constants.MULTI_CHARACTER);
    }

    /**
     * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
     *
     * @param inPattern 正则表达式
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List<String> getAllIndex(String inPattern) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
        try {
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
            String[] indices = getIndexResponse.getIndices();
            return Arrays.asList(indices);
        } catch (IOException e) {
            log.error("获取索引失败 {} 已经存在", e.getMessage());
        } catch (ElasticsearchStatusException e) {
            log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
        }
        return Collections.EMPTY_LIST;
    }

3.1.3. 删除索引

删除的逻辑就比较简单,这里就不多说。


    /**
     * 删除 index,以及索引下数据
     *
     * @param indexName 索引名字
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteIndex(String indexName) {
        try {
            restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

3.2. 引用依赖

构建一个工程,我这里依然用 Gralde 工程作为样例说明, Maven 项目类似。

3.2.1. 依赖管理


implementation("io.github.rothschil:persistence-elasticsearch:1.2.3.RELEASE")

3.2.2. 依赖注入

在工程项目中直接使用 ElasticIndexManger 作为实例注入进来,后面我们可以直接使用它提供的各种方法。样例中我是定义一个精确查询作为说明,TermQueryBuilder("sysCode","crm") 中参数分别代表匹配条件的列名和列的值; 在索引列名中我这里用的是 通配符,即可以在多个索引之间查询; AccLog.class 这是我自定义的类,用以接收查询出来的结果进行实例化映射。


@Component
public class LogIndexManager{

    private ElasticIndexManger elasticIndexManger;

    @Autowired
    public void setElasticIndexManger(ElasticIndexManger elasticIndexManger) {
        this.elasticIndexManger = elasticIndexManger;
    }

    public List<AccLog> query(){
        QueryBuilder queryBuilder = new TermQueryBuilder("sysCode","crm");
        SearchSourceBuilder sb = new SearchSourceBuilder();
        sb.query(queryBuilder);
        return elasticIndexManger.search("hnqymh_hpg*",sb,AccLog.class);
    }
}

4. 源码

Gitee演示源码,记得给Star

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
目录
相关文章
|
15天前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
183 12
|
18天前
|
人工智能 自然语言处理 API
快速集成GPT-4o:下一代多模态AI实战指南
快速集成GPT-4o:下一代多模态AI实战指南
213 101
|
22天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
254 1
存储 JSON Java
238 0
|
2月前
|
人工智能 自然语言处理 分布式计算
AI 驱动传统 Java 应用集成的关键技术与实战应用指南
本文探讨了如何将AI技术与传统Java应用集成,助力企业实现数字化转型。内容涵盖DJL、Deeplearning4j等主流AI框架选择,技术融合方案,模型部署策略,以及智能客服、财务审核、设备诊断等实战应用案例,全面解析Java系统如何通过AI实现智能化升级与效率提升。
225 0
|
4月前
|
缓存 监控 安全
通义大模型与现有企业系统集成实战《CRM案例分析与安全最佳实践》
本文档详细介绍了基于通义大模型的CRM系统集成架构设计与优化实践。涵盖混合部署架构演进(新增向量缓存、双通道同步)、性能基准测试对比、客户意图分析模块、商机预测系统等核心功能实现。同时,深入探讨了安全防护体系、三级缓存架构、请求批处理优化及故障处理机制,并展示了实时客户画像生成和动态提示词工程。通过实施,显著提升客服响应速度(425%)、商机识别准确率(37%)及客户满意度(15%)。最后,规划了技术演进路线图,从单点集成迈向自主优化阶段,推动业务效率与价值持续增长。
159 7
|
5月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
514 4
|
5月前
|
人工智能 安全 Shell
Jupyter MCP服务器部署实战:AI模型与Python环境无缝集成教程
Jupyter MCP服务器基于模型上下文协议(MCP),实现大型语言模型与Jupyter环境的无缝集成。它通过标准化接口,让AI模型安全访问和操作Jupyter核心组件,如内核、文件系统和终端。本文深入解析其技术架构、功能特性及部署方法。MCP服务器解决了传统AI模型缺乏实时上下文感知的问题,支持代码执行、变量状态获取、文件管理等功能,提升编程效率。同时,严格的权限控制确保了安全性。作为智能化交互工具,Jupyter MCP为动态计算环境与AI模型之间搭建了高效桥梁。
338 2
Jupyter MCP服务器部署实战:AI模型与Python环境无缝集成教程
|
5月前
|
JSON JavaScript API
MCP 实战:用配置与真实代码玩转 GitHub 集成
MCP 实战:用配置与真实代码玩转 GitHub 集成
1238 4
|
6月前
|
人工智能 自然语言处理 运维
让搜索引擎“更懂你”:AI × Elasticsearch MCP Server 开源实战
本文介绍基于Model Context Protocol (MCP)标准的Elasticsearch MCP Server,它为AI助手(如Claude、Cursor等)提供与Elasticsearch数据源交互的能力。文章涵盖MCP概念、Elasticsearch MCP Server的功能特性及实际应用场景,例如数据探索、开发辅助。通过自然语言处理,用户无需掌握复杂查询语法即可操作Elasticsearch,显著降低使用门槛并提升效率。项目开源地址:&lt;https://github.com/awesimon/elasticsearch-mcp&gt;,欢迎体验与反馈。
1543 1