springboot 2.0集成elasticsearch 7.6.2(集群)(下)

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: springboot 2.0集成elasticsearch 7.6.2(集群)(下)

正文


五、ES工具类 (索引相关配置不懂的,请查看elasticsearch 7.6.2 - 索引管理)


/**
 * Elasticsearch工具类-用于操作ES
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/1/24 0024 下午 5:42
 */
@Slf4j
@Component
public class ElasticsearchUtil {
    private static final String PRIMARY_KEY_NAME = "id";
    @Value("${elasticsearch.synonym.path}")
    private String synonymPath;
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    /**
     * 批量同步数据到ES
     * @param indexName 索引名称
     * @param indexAlias 别名
     * @param jsonDataList 数据列表
     * @param clazz 类型
     * @return
     * @throws IOException
     */
    public boolean saveDataBatch(String indexName,String indexAlias,String jsonDataList,Class clazz) throws IOException {
        //判空
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        if (syncIndex(indexName, indexAlias, clazz)) {
            return false;
        }
        //3.批量操作Request
        BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        final BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulk.hasFailures()) {
            for (BulkItemResponse item : bulk.getItems()) {
                log.error("索引[{}],主键[{}]更新操作失败,状态为:[{}],错误信息:{}",indexName,item.getId(),item.status(),item.getFailureMessage());
            }
            return false;
        }
        // 记录索引新增与修改数量
        Integer createdCount = 0;
        Integer updatedCount = 0;
        for (BulkItemResponse item : bulk.getItems()) {
            if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
                createdCount++;
            } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())){
                updatedCount++;
            }
        }
        log.info("索引[{}]批量同步更新成功,共新增[{}]个,修改[{}]个",indexName,createdCount,updatedCount);
        return true;
    }
    /**
     * ES修改数据
     * @param indexName 索引名称
     * @param id 主键
     * @param paramJson 参数JSON
     * @return
     */
    public boolean updateData(String indexName,String id,String paramJson) {
        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
        //如果修改索引中不存在则进行新增
        updateRequest.docAsUpsert(true);
        //立即刷新数据
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        updateRequest.doc(paramJson,XContentType.JSON);
        try {
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("索引[{}],主键:【{}】操作结果:[{}]",indexName,id,updateResponse.getResult());
            if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
                //新增
                log.info("索引:【{}】,主键:【{}】新增成功",indexName,id);
                return true;
            } else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
                //修改
                log.info("索引:【{}】,主键:【{}】修改成功",indexName, id);
                return true;
            } else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
                //无变化
                log.info("索引:[{}],主键:[{}]无变化",indexName, id);
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
            log.error("索引:[{}],主键:【{}】,更新异常:[{}]",indexName, id,e);
            return false;
        }
        return false;
    }
    /**
     * 批量修改ES
     * @param indexName 索引名称
     * @param indexAlias 别名
     * @param jsonDataList 数据列表
     * @param clazz 类型
     * @return
     * @throws IOException
     */
    public boolean updateDataBatch(String indexName,String indexAlias, String jsonDataList,Class clazz) throws IOException {
        //1.创建索引
        boolean createIndexFlag = createIndex(indexName,indexAlias, clazz);
        if (!createIndexFlag) {
            return false;
        }
        return this.updateDataBatch(indexName,jsonDataList);
    }
    /**
     * 删除数据
     * @param indexName 索引名称
     * @param id 主键
     * @return
     */
    public boolean deleteData(String indexName,String id) {
        DeleteRequest deleteRequest = new DeleteRequest(indexName);
        deleteRequest.id(id);
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try {
            DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
                log.error("索引:【{}】,主键:【{}】删除失败",indexName, id);
                return false;
            } else {
                log.info("索引【{}】主键【{}】删除成功",indexName, id);
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
            log.error("删除索引【{}】出现异常[{}]",indexName,e);
            return false;
        }
    }
    /**
     * 批量删除ES
     * @param indexName 索引名称
     * @param ids id列表
     * @return
     */
    public boolean deleteDataBatch(String indexName,List<String> ids) {
        if (CollectionUtils.isEmpty(ids)) {
            return false;
        }
        BulkRequest bulkRequest = packBulkDeleteRequest(indexName, ids);
        try {
            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulk.hasFailures()) {
                for (BulkItemResponse item : bulk.getItems()) {
                    log.error("删除索引:[{}],主键:{}失败,信息:{}",indexName,item.getId(),item.getFailureMessage());
                }
                return false;
            }
            //记录索引新增与修改数量
            Integer deleteCount = 0;
            for (BulkItemResponse item : bulk.getItems()) {
                if (DocWriteResponse.Result.DELETED.equals(item.getResponse().getResult())) {
                    deleteCount++;
                }
            }
            log.info("批量删除索引[{}]成功,共删除[{}]个",indexName,deleteCount);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            log.error("删除索引:【{}】出现异常:{}",indexName,e);
            return false;
        }
    }
    /**
     * 组装删除操作
     * @param indexName 索引名称
     * @param ids id列表
     * @return
     */
    private BulkRequest packBulkDeleteRequest(String indexName, List<String> ids) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        ids.forEach(id -> {
            DeleteRequest deleteRequest = new DeleteRequest(indexName);
            deleteRequest.id(id);
            bulkRequest.add(deleteRequest);
        });
        return bulkRequest;
    }
    /**
     * 批量修改ES
     * @param indexName 索引名称
     * @param jsonDataList json数据列表
     * @return
     */
    public boolean updateDataBatch(String indexName, String jsonDataList) {
        //判空
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        BulkRequest bulkRequest = packBulkUpdateRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        try {
            //同步执行
            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulk.hasFailures()) {
                for (BulkItemResponse item : bulk.getItems()) {
                    log.error("索引【{}】,主键[{}]修改操作失败,状态为【{}】,错误信息:{}",indexName,item.status(),item.getFailureMessage());
                }
                return false;
            }
            //记录索引新增与修改数量
            Integer createCount = 0;
            Integer updateCount = 0;
            for (BulkItemResponse item : bulk.getItems()) {
                if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
                    createCount++;
                } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())) {
                    updateCount++;
                }
            }
            log.info("索引【{}】批量修改更新成功,共新增[{}]个,修改[{}]个",indexName,createCount,updateCount);
        } catch (IOException e) {
            e.printStackTrace();
            log.error("索引【{}】批量修改更新出现异常",indexName);
            return false;
        }
        return true;
    }
    /**
     * 组装bulkUpdate
     * @param indexName 索引名称
     * @param jsonDataList 数据列表
     * @return
     */
    private BulkRequest packBulkUpdateRequest(String indexName,String jsonDataList) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
        if (jsonArray.isEmpty()) {
            return bulkRequest;
        }
        jsonArray.forEach(o -> {
            Map<String, String> map = (Map<String, String>) o;
            UpdateRequest updateRequest = new UpdateRequest(indexName,map.get(PRIMARY_KEY_NAME));
            // 修改索引中不存在就新增
            updateRequest.docAsUpsert(true);
            updateRequest.doc(JSON.toJSONString(o),XContentType.JSON);
            bulkRequest.add(updateRequest);
        });
        return bulkRequest;
    }
    /**
     * 删除索引、新建索引
     * @param indexName 索引名称
     * @param indexAlias 别名
     * @param clazz 类型
     * @return
     * @throws IOException
     */
    private boolean syncIndex(String indexName, String indexAlias, Class clazz) throws IOException {
        //1.删除索引
        boolean deleteAllFlag = deleteIndex(indexName);
        if (!deleteAllFlag) {
            return true;
        }
        //2.创建索引
        boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
        if (!createIndexFlag) {
            return true;
        }
        return false;
    }
    /**
     * 根据主键查询ES
     * @param indexName 索引名称
     * @param id 主键
     * @return
     */
    public String getDataById(String indexName,String id) {
        //判断索引是否存在
        //1.判断索引是否存在
        boolean result = isIndexExists(indexName);
        if (!result) {
            return "";
        }
        GetRequest getRequest = new GetRequest(indexName, id);
        try {
            GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            String resultJson = getResponse.getSourceAsString();
            log.info("索引【{}】主键【{}】,查询结果:【{}】",indexName,id,resultJson);
            return resultJson;
        } catch (IOException e) {
            e.printStackTrace();
            log.error("索引【{}】主键[{}],查询异常:{}",indexName,id,e);
            return "";
        }
    }
    /**
     * 清空索引内容
     * @param indexName 索引名称
     * @return
     */
    public boolean deleteAll(String indexName) {
        //1.判断索引是否存在
        boolean result = isIndexExists(indexName);
        if (!result) {
            log.error("索引【{}】不存在,删除失败",indexName);
            return false;
        }
        DeleteRequest deleteRequest = new DeleteRequest(indexName);
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try {
            DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
                log.error("索引【{}】删除失败",indexName);
                return false;
            }
            log.info("索引【{}】删除成功",indexName);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            log.error("删除索引[{}],出现异常[{}]",indexName,e);
            return false;
        }
    }
    /**
     * 批量数据保存到ES-异步
     * @param indexName 索引名称
     * @param indexAlias 索引别名
     * @param jsonDataList 数据列表
     * @param clazz 类型
     * @return
     * @throws IOException
     */
    public boolean saveDataBatchSync(String indexName,String indexAlias,String jsonDataList,Class clazz) throws IOException {
        //判空
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        if (syncIndex(indexName, indexAlias, clazz)) {
            return false;
        }
        //3.批量操作Request
        BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        //4.异步执行
        ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                if (bulkItemResponses.hasFailures()) {
                    for (BulkItemResponse item : bulkItemResponses.getItems()) {
                        log.error("索引【{}】,主键【{}】更新失败,状态【{}】,错误信息:{}",indexName,item.getId(),
                                item.status(),item.getFailureMessage());
                    }
                }
            }
            //失败操作
            @Override
            public void onFailure(Exception e) {
                log.error("索引【{}】批量异步更新出现异常:{}",indexName,e);
            }
        };
        restHighLevelClient.bulkAsync(bulkRequest,RequestOptions.DEFAULT,listener);
        log.info("索引批量更新索引【{}】中",indexName);
        return true;
    }
    /**
     * 删除索引
     * @param indexName 索引名称
     * @return
     * @throws IOException
     */
    public boolean deleteIndex(String indexName) throws IOException {
        //1.判断索引是否存在
        boolean result = isIndexExists(indexName);
        if (!result) {
            log.error("索引【{}】不存在,删除失败",indexName);
            return false;
        }
        //2.删除操作Request
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
        if (!acknowledgedResponse.isAcknowledged()) {
            log.error("索引【{}】删除失败",indexName);
            return false;
        }
        log.info("索引【{}】删除成功",indexName);
        return true;
    }
    /**
     * 批量操作的Request
     * @param indexName 索引名称
     * @param jsonDataList json数据列表
     * @return
     */
    private BulkRequest packBulkIndexRequest(String indexName,String jsonDataList) {
        BulkRequest bulkRequest = new BulkRequest();
        //IMMEDIATE > 请求向es提交数据,立即进行数据刷新<实时性高,资源消耗高>
        //WAIT_UNTIL >  请求向es提交数据,等待数据完成刷新<实时性高,资源消耗低>
        //NONE > 默认策略<实时性低>
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
        if (jsonArray.isEmpty()) {
            return bulkRequest;
        }
        //循环数据封装bulkRequest
        jsonArray.forEach(obj ->{
            final Map<String, String> map = (Map<String, String>) obj;
            IndexRequest indexRequest = new IndexRequest(indexName);
            indexRequest.source(JSON.toJSONString(obj),XContentType.JSON);
            indexRequest.id(map.get(PRIMARY_KEY_NAME));
            bulkRequest.add(indexRequest);
        });
        return bulkRequest;
    }
    /**
     * 创建索引
     * @param indexName 索引名称
     * @param indexAlias 别名
     * @param clazz 类型
     * @return
     * @throws IOException
     */
    public boolean createIndex(String indexName,String indexAlias,Class clazz) throws IOException {
        //判断索引是否存在
        boolean result = isIndexExists(indexName);
        if (!result) {
            boolean createResult = createIndexAndCreateMapping(indexName,indexAlias, FieldMappingUtil.getFieldInfo(clazz));
            if (!createResult) {
                log.info("索引【{}】创建失败",indexName);
                return false;
            }
        }
        log.info("索引:[{}]创建成功",indexName);
        return true;
    }
    /**
     * 数据同步到ES
     * @param id 主键
     * @param indexName 索引名称
     * @param jsonData json数据
     * @param clazz 类型
     * @return
     */
    public boolean saveData(String id,String indexName,String indexAlias,String jsonData,Class clazz) throws IOException {
        //1.创建索引
        boolean createIndexFlag = createIndex(indexName,indexAlias, clazz);
        if (!createIndexFlag) {
            return false;
        }
        //2.创建操作Request
        IndexRequest indexRequest = new IndexRequest(indexName);
        //3.配置相关信息
        indexRequest.source(jsonData, XContentType.JSON);
        //IMMEDIATE > 立即刷新
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        indexRequest.id(id);
        IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        //4.判断索引是新增还是修改
        if (IndexResponse.Result.CREATED.equals(response.getResult())) {
            log.info("索引【{}】保存成功",indexName);
            return true;
        } else if (IndexResponse.Result.UPDATED.equals(response.getResult())) {
            log.info("索引【{}】修改成功",indexName);
            return true;
        }
        return false;
    }
    /**
     * 判断索引是否存在
     * @param indexName 索引名称
     * @return
     */
    public boolean isIndexExists(String indexName) {
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        }catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 创建索引设置相关配置信息
     * @param indexName 索引名称
     * @param indexAlias 索引别名
     * @param fieldMappingList 数据列表
     * @return
     * @throws IOException
     */
    private boolean createIndexAndCreateMapping(String indexName,String indexAlias, List<FieldMapping> fieldMappingList) throws IOException {
        //封装es索引的mapping
        XContentBuilder mapping = packEsMapping(fieldMappingList, null);
        mapping.endObject().endObject();
        mapping.close();
        //进行索引的创建
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        //配置分词器
        XContentBuilder settings = packSettingMapping();
        XContentBuilder aliases = packEsAliases(indexAlias);
        log.info("索引配置脚本:{}",settings);
        log.info("索引字段内容:{}",mapping);
        createIndexRequest.settings(settings);
        createIndexRequest.mapping("_doc", mapping);
        createIndexRequest.aliases(aliases);
        //同步方式创建索引
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest,RequestOptions.DEFAULT);
        boolean acknowledged = createIndexResponse.isAcknowledged();
        if (acknowledged) {
            log.info("索引:{}创建成功", indexName);
            return true;
        } else {
            log.error("索引:{}创建失败", indexName);
            return false;
        }
    }
    /**
     * 配置ES别名
     * @author Kou Shenhai
     * @param alias 别名
     * @return
     * @throws IOException
     */
    private XContentBuilder packEsAliases(String alias) throws IOException{
        XContentBuilder aliases = XContentFactory.jsonBuilder().startObject()
                .startObject(alias).endObject();
        aliases.endObject();
        aliases.close();
        return aliases;
    }
    /**
     * 配置Mapping
     * @param fieldMappingList 组装的实体类信息
     * @param mapping
     * @return
     * @throws IOException
     */
    private XContentBuilder packEsMapping(List<FieldMapping> fieldMappingList,XContentBuilder mapping) throws IOException {
        if (mapping == null) {
            //如果对象是空,首次进入,设置开始结点
            mapping = XContentFactory.jsonBuilder().startObject()
                    .field("dynamic",true)
                    .startObject("properties");
        }
        //循环实体对象的类型集合封装ES的Mapping
        for (FieldMapping fieldMapping : fieldMappingList) {
            String field = fieldMapping.getField();
            String dataType = fieldMapping.getType();
            Integer participle = fieldMapping.getParticiple();
            //设置分词规则
            if (Constant.NOT_ANALYZED.equals(participle)) {
                if (FieldTypeEnum.DATE.getValue().equals(dataType)) {
                    mapping.startObject(field)
                            .field("type", dataType)
                            .field("format","yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                            .endObject();
                } else {
                    mapping.startObject(field)
                            .field("type", dataType)
                            .endObject();
                }
            } else if (Constant.IK_INDEX.equals(participle)) {
                mapping.startObject(field)
                        .field("type",dataType)
                        .field("eager_global_ordinals",true)
                        //fielddata=true 用来解决text字段不能进行聚合操作
                        .field("fielddata",true)
                        .field("boost",100.0)
                        .field("analyzer","ik-index-synonym")
                        .field("search_analyzer","ik-search-synonym")
                        .startObject("fields").startObject("pinyin")
                        .field("term_vector", "with_positions_offsets")
                        .field("analyzer","ik-search-pinyin")
                        .field("type",dataType)
                        .field("boost",100.0)
                        .endObject().endObject()
                        .endObject();
            }
        }
        return mapping;
    }
    /**
     * 配置Settings
     * @return
     * @throws IOException
     */
    private XContentBuilder packSettingMapping() throws IOException {
        XContentBuilder setting = XContentFactory.jsonBuilder().startObject()
                .startObject("index")
                .field("number_of_shards",5)
                .field("number_of_replicas",1)
                .field("refresh_interval","120s")
                .endObject()
                .startObject("analysis");
        //ik分词 同义词 拼音
        setting.startObject("analyzer")
                .startObject("ik-search-pinyin")
                .field("type","custom")
                .field("tokenizer","ik_smart")
                .field("char_filter",new String[] {"html_strip"})
                .field("filter", new String[]{"laokou-pinyin","word_delimiter","lowercase", "asciifolding"})
                .endObject();
        setting.startObject("ik-index-synonym")
                .field("type","custom")
                .field("tokenizer","ik_max_word")
                .field("char_filter",new String[] {"html_strip"})
                .field("filter", new String[]{"laokou-remote-synonym"})
                .endObject();
        setting.startObject("ik-search-synonym")
                .field("type","custom")
                .field("tokenizer","ik_smart")
                .field("char_filter",new String[] {"html_strip"})
                .field("filter", new String[]{"laokou-remote-synonym"})
                .endObject();
        setting.endObject();
        //设置拼音分词器 同义词分词
        setting.startObject("filter")
                .startObject("laokou-pinyin")
                .field("type", "pinyin")
                .field("keep_first_letter", false)
                .field("keep_separate_first_letter", false)
                .field("keep_full_pinyin", true)
                .field("keep_original", false)
                .field("keep_joined_full_pinyin",true)
                .field("limit_first_letter_length", 16)
                .field("lowercase", true)
                .field("remove_duplicated_term", true)
                .endObject()
                .startObject("laokou-remote-synonym")
                .field("type","dynamic_synonym")
                .field("synonyms_path", synonymPath)
                .field("interval",120)
                .field("dynamic_reload",true)
                .endObject()
                .endObject();
        setting.endObject().endObject();
        setting.close();
        return setting;
    }
}


问题思考比如说,我有几条记录,文章记录,聊天记录,订单记录,它们是不同的索引,需要单独建立索引,怎么根据不同的数据类型来创建不同的索引?你会怎么做?


六、索引管理工具类


/**
 * 索引管理
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/10/31 0031 上午 10:11
 */
public class FieldUtil {
    public static final String MESSAGE_INDEX = "message";
    private static final Map<String,Class<?>> classMap = new HashMap<>(16);
    static {
        classMap.put(FieldUtil.MESSAGE_INDEX, MessageIndex.class);
    }
    public static Class<?> getClazz(final String indexName) {
        return classMap.getOrDefault(indexName,Object.class);
    }
}


七、测试es


/**
 * Elasticsearch API 服务
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/2/8 0008 下午 6:33
 */
@RestController
@RequestMapping("/api")
@Api(tags = "Elasticsearch API 服务")
public class ElasticsearchController {
    @Autowired
    private ElasticsearchUtil elasticsearchUtil;
    @PostMapping("/sync")
    @ApiOperation("同步数据到ES")
    @CrossOrigin
    public void syncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String id = model.getId();
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonData = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveData(id,indexName,indexAlias,jsonData,clazz);
    }
    @PostMapping("/batchSync")
    @ApiOperation("批量数据保存到ES-异步")
    @CrossOrigin
    public void batchSyncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveDataBatchSync(indexName,indexAlias,jsonDataList,clazz);
    }
    @PostMapping("/batch")
    @ApiOperation("批量同步数据到ES")
    @CrossOrigin
    public void saveBatchIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveDataBatch(indexName,indexAlias,jsonDataList,clazz);
    }
    @GetMapping("/get")
    @ApiOperation("根据主键获取ES")
    @CrossOrigin
    @ApiImplicitParams({
            @ApiImplicitParam(name = "indexName",value = "索引名称",required = true,paramType = "query",dataType = "String"),
            @ApiImplicitParam(name = "id",value = "主键",required = true,paramType = "query",dataType = "String")
    })
    public HttpResultUtil<String> getDataById(@RequestParam("indexName")String indexName,@RequestParam("id")String id) {
        return new HttpResultUtil<String>().ok(elasticsearchUtil.getDataById(indexName,id));
    }
    @PutMapping("/batch")
    @ApiOperation("批量修改ES")
    @CrossOrigin
    public void updateDataBatch(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.updateDataBatch(indexName,indexAlias,jsonDataList,clazz);
    }
    @PutMapping("/sync")
    @ApiOperation("同步修改ES")
    @CrossOrigin
    public void updateData(@RequestBody final ElasticsearchModel model) {
        String id = model.getId();
        String indexName = model.getIndexName();
        String paramJson = model.getData();
        elasticsearchUtil.updateData(indexName,id,paramJson);
    }
    @DeleteMapping("/batch")
    @ApiOperation("批量删除ES")
    @CrossOrigin
    public void deleteDataBatch(@RequestParam("indexName")String indexName,@RequestParam("ids")List<String> ids) {
        elasticsearchUtil.deleteDataBatch(indexName,ids);
    }
    @DeleteMapping("/sync")
    @ApiOperation("同步删除ES")
    @CrossOrigin
    public void deleteData(@RequestParam("indexName")String indexName,@RequestParam("id")String id) {
        elasticsearchUtil.deleteData(indexName,id);
    }
    @DeleteMapping("/all")
    @ApiOperation("清空ES")
    @CrossOrigin
    public void deleteAll(@RequestParam("indexName")String indexName) {
        elasticsearchUtil.deleteAll(indexName);
    }
}


大功告成


补充:可根据自己的业务进行数据分区


111.png






相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
打赏
0
0
0
0
1
分享
相关文章
|
1月前
|
SpringBoot2.3.1集成Knife4j接口文档
SpringBoot2.3.1集成Knife4j接口文档
134 44
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
92 6
SpringBoot集成Ehcache缓存使用指南
以上是SpringBoot集成Ehcache缓存的基本操作指南,帮助你在实际项目中轻松实现缓存功能。当然,Ehcache还有诸多高级特性,通过学习和实践,你可以更好地发挥它的威力。
128 20
【Azure Application Insights】为Spring Boot应用集成Application Insight SDK
本文以Java Spring Boot项目为例,详细说明如何集成Azure Application Insights SDK以收集和展示日志。内容包括三步配置:1) 在`pom.xml`中添加依赖项`applicationinsights-runtime-attach`和`applicationinsights-core`;2) 在main函数中调用`ApplicationInsights.attach()`;3) 配置`applicationinsights.json`文件。同时提供问题排查建议及自定义日志方法示例,帮助用户顺利集成并使用Application Insights服务。
|
3月前
|
CentOS环境搭建Elasticsearch集群
至此,您已成功在CentOS环境下搭建了Elasticsearch集群。通过以上介绍和步骤,相信您对部署Elasticsearch集群有了充分的了解。最后祝您在使用Elasticsearch集群的过程中顺利开展工作!
166 22
|
4月前
|
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
151 0
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
151 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——发布/订阅消息的生产和消费
本文详细讲解了Spring Boot中ActiveMQ的发布/订阅消息机制,包括消息生产和消费的具体实现方式。生产端通过`sendMessage`方法发送订阅消息,消费端则需配置`application.yml`或自定义工厂以支持topic消息监听。为解决点对点与发布/订阅消息兼容问题,可通过设置`containerFactory`实现两者共存。最后,文章还提供了测试方法及总结,帮助读者掌握ActiveMQ在异步消息处理中的应用。
137 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ集成
本文介绍了在 Spring Boot 中集成 ActiveMQ 的详细步骤。首先通过引入 `spring-boot-starter-activemq` 依赖并配置 `application.yml` 文件实现基本设置。接着,创建 Queue 和 Topic 消息类型,分别使用 `ActiveMQQueue` 和 `ActiveMQTopic` 类完成配置。随后,利用 `JmsMessagingTemplate` 实现消息发送功能,并通过 Controller 和监听器实现点对点消息的生产和消费。最后,通过浏览器访问测试接口验证消息传递的成功性。
121 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
本教程介绍ActiveMQ的安装与基本使用。首先从官网下载apache-activemq-5.15.3版本,解压后即可完成安装,非常便捷。启动时进入解压目录下的bin文件夹,根据系统选择win32或win64,运行activemq.bat启动服务。通过浏览器访问`http://127.0.0.1:8161/admin/`可进入管理界面,默认用户名密码为admin/admin。ActiveMQ支持两种消息模式:点对点(Queue)和发布/订阅(Topic)。前者确保每条消息仅被一个消费者消费,后者允许多个消费者同时接收相同消息。
107 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等