Elastic实战:通过spring data elasticsearch实现索引的CRUD;实现mysql全量/增量同步到ES

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: elasticsearch官方的java客户端有tranport client,rest high level client,但进行索引的增删改查的操作不够简便。因此我们引入spring data elasticsearch来实现索引的CRUD

0. 引言

elasticsearch官方的java客户端有tranport client,rest high level client,但进行索引的增删改查的操作不够简便。因此我们引入spring data elasticsearch来实现索引的CRUD

1. 版本对应关系

在引入spring data之前要先了解版本之间的对应关系,这个我们可以在spring data 官方文档中查询到
在这里插入图片描述
这里我的es用的7.14.0版本,所以需要引入spring data elasticsearch4.3.x版本的依赖

<dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
            <version>4.3.0</version>
</dependency>

需要注意的是,springboot也整合了spring data

<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

2. 实现CRUD

1、连接客户端配置,两种方式
(1)配置文件

spring: 
  elasticsearch:
    rest:
      uris: http://localhost:9200 # 多个地址用逗号隔开
      username: elastic # es开启了security的需要添加用户名和账户
      password: elastic # es开启了security的需要添加用户名和账户

(2)配置类,官方推荐的方式

import org.springframework.beans.factory.annotation.Autowired;@Configuration
static class Config {

    @Bean
    RestHighLevelClient client() {
        HttpHeaders headers = new HttpHeaders();
        headers.setBasicAuth("elastic","elastic");
        
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo("localhost:9200")
            .withDefaultHeaders(headers) 
            .build();

        return RestClients.create(clientConfiguration).rest();
    }
}

2、创建实体类

/**
 * @author whx
 * @date 2022/1/6
 */
@Data
@Document(indexName = "user")
@Setting( 
    replicas = 0
)
@NoArgsConstructor
public class UserES implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * 用户ID
     */
    @Id
    private Long id;
    /**
     * 用户编码
     */
    @Field(type=FieldType.Keyword)
    private String code;
    /**
     * 用户平台
     */
    @Field(type=FieldType.Long)
    private Long userType;
    /**
     * 账号
     */
    @Field(type=FieldType.Text)
    private String account;
    /**
     * 昵称
     */
    @Field(type=FieldType.Text)
    private String name;
    /**
     * 真名
     */
    @Field(type=FieldType.Text)
    private String realName;
    /**
     * 邮箱
     */
    @Field(type=FieldType.Text)
    private String email;
    /**
     * 手机
     */
    @Field(type=FieldType.Keyword)
    private String phone;
    /**
     * 生日
     */
    @Field(type=FieldType.Date)
    private Date birthday;
    /**
     * 性别
     */
    @Field(type=FieldType.Integer)
    private Integer sex;
    /**
     * 角色ID
     */
    @Field(type=FieldType.Long)
    private List<Long> roleIds;
    /**
     * 所在直系部门ID
     */
    @Field(type=FieldType.Keyword)
    private List<String> deptIds;
    /**
     * 岗位ID
     */
    @Field(type=FieldType.Long)
    private List<Long> postIds;
    /**
     * 所有父级部门ID
     */
    @Field(type=FieldType.Long)
    private List<String> parentDeptIds;
    /**
     * 平台类型(微信用户专用)
     */
    @Field(type = FieldType.Keyword)
    private String clientId;
    /**
     * 第三方平台Id(微信用户专用)
     */
    @Field(type= FieldType.Keyword)
    private String thirdPlatformUserId;
    /**
     * PC绑定用户ID
     */
    @Field(type=FieldType.Long)
    private String tenantUserId;
    /**
     * 用户来源:0 pc 1 wx
     */
    @Field(type=FieldType.Integer)
    private Integer userSource;
    /**
     * 租户
     */
    @Field(type=FieldType.Keyword)
    private String tenantId;
    /**
     * 创建人
     */
    @Field(type=FieldType.Long)
    private Long createUser;
    /**
     * 创建部门
     */
    @Field(type=FieldType.Keyword)
    private String createDept;
    /**
     * 创建时间
     */
    @Field(type=FieldType.Date)
    private Date createTime;
 
}

因为我这里还需要将mysql数据同步到es中,所以还需要在UserES类中创建转换方法。这里的实体类转换大家可根据具体自己的需求来书写,以下仅供参考

    public static UserES build(User user){
        UserES userES = Objects.requireNonNull(BeanUtil.copy(user, UserES.class));
        userES.userSource = 0;
        if(!StringUtils.isEmpty(user.getRoleId())){
            userES.roleIds = java.util.Arrays.stream(user.getRoleId().split(",")).map(Long::parseLong).collect(Collectors.toList());
        }
        if(!StringUtils.isEmpty(user.getPostId())){
            userES.postIds = java.util.Arrays.stream(user.getPostId().split(",")).map(Long::parseLong).collect(Collectors.toList());
        }
        if(!StringUtils.isEmpty(user.getDeptId())){
            userES.deptIds = java.util.Arrays.stream(user.getDeptId().split(",")).collect(Collectors.toList());
        }
        return userES;
    }

    public static UserES build(UserWxmini user){
        UserES userES = Objects.requireNonNull(BeanUtil.copy(user, UserES.class));
        userES.userSource = 1;
        userES.name = user.getNickName();
        return userES;
    }

    public static List<UserES> buildList(List<User> list){
        return list.stream().map(UserES::build).collect(Collectors.toList());
    }

    public static List<UserES> buildUserWxList(List<UserWxmini> list){
        return list.stream().map(UserES::build).collect(Collectors.toList());
    }

3、创建repository接口,可以看到只需要继承ElasticsearchCrudRepository接口即可

/**
 * 用户ES客户端
 * @author whx
 * @date 2022/1/6
 */
public interface UserRepositoryElastic extends ElasticsearchCrudRepository<UserES,Long> {

}

4、ElasticsearchCrudRepository接口已经自带了常用的CRUD方法,我们可以直接拿来用
在serviceImpl类中引入UserRepositoryElastic

5、ElasticsearchCrudRepository接口常用的CRUD方法

deleteById(id);
findById(id);
findAll();
findAllById(ids);
save(new UserES());
existsById(id);
count();

3. 如何自定义方法

3.1 通过spring data自带的语法来自动生成衍生方法

如:根据名称来查询

public interface UserRepositoryElastic extends ElasticsearchCrudRepository<UserES,Long> {
     
    Page<UserES> findByName(String name,Pageable page);
}

支持的语法有
在这里插入图片描述

3.2 通过@Query自定义查询

query中的就是查询的DSL语句,?0表示第一个参数

@Query("{"bool" : {"must" : {"field" : {"name" : "?0"}}}}")
Page<EsProduct> findByName(String name,Pageable pageable);

3.3 聚合与其他操作

spring data elasticsearch本身也集合了TransportClient和HighLevelRestClient。所以对于复杂的聚合查询和其他操作时,仍然可以使用原生的client来实现
示例,通过HighLevelRestClient来实现聚合

@Autowired
 private ElasticsearchTemplate elasticsearchTemplate;
 
 //聚合
    public Map<String, Integer> polymerizationQuery() {
        String aggName = "popularBrand";
        NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
        //聚合
        queryBuilder.addAggregation(AggregationBuilders.terms("popularBrand").field("brand"));
        //查询并返回带聚合结果
        AggregatedPage<Item> result = elasticsearchTemplate.queryForPage(queryBuilder.build(), Item.class);
        //解析聚合
        Aggregations aggregations = result.getAggregations();
        //获取指定名称的聚合
        StringTerms terms = aggregations.get(aggName);
        //获取桶
        List<StringTerms.Bucket> buckets = terms.getBuckets();
        //遍历打印
        Map<String, Integer> map = new HashMap<>();
        for (StringTerms.Bucket bucket : buckets) {
            map.put(bucket.getKeyAsString(), (int) bucket.getDocCount());
            System.out.println("key = " + bucket.getKeyAsString());
            System.out.println("DocCount = " + bucket.getDocCount());
        }
        return map;
    }

更多操作可以查看ES官方文档

针对复杂的操作更多的还是需要自己去实操才能熟练,如果有复杂DSL语句如果在java中是实现的问题,也可以留言告诉我,一起探讨。

4 mysql数据到es中

4.1 mysql全量同步至es中

全量同步应该只需要调用一次,后续的更新通过增量同步来实现

@Service
@AllArgsConstructor
public class UserServiceImpl extends BaseServiceImpl<UserMapper, User> implements IUserService { 
    private final UserRepositoryElastic userRepositoryElastic;

    @Override
    @TenantIgnore
    public R transferFromMysqlToEs(){
        // 查询所有用户数据 (这里是直接调用的mybatis-plus框架自带的selectList方法)
        List<User> users = baseMapper.selectList(Wrappers.lambdaQuery());
        // 将所有用户数据同步到es中
        userRepositoryElastic.saveAll(UserES.buildList(users));
        return R.success("操作成功");
    }
}

4.2 mysql增量同步到es中

使用spring data elasticsearch的增量同步,就是通过在原有的操作代码中插入针对es的操作,比如新增修改用户信息时,同步修改es中的数据

public R<Boolean> submit(User user){ 
        boolean res = this.saveOrUpdate(user);
        if(res){
            userRepositoryElastic.save(UserES.build(user));
        }
        return R.data(res);
}

删除时同步删除es中的数据

public R remove(List<Long> ids){
        baseMapper.deleteBatchIds(ids);
        ids.forEach(userRepositoryElastic::deleteById);
        return R.success("删除成功");
}

4.3 优缺点

优点:通过spring data elasticsearch来同步数据,因为是基于代码实现,所以可以实现比较复杂的转换逻辑,无需部署第三方插件。

缺点:代码入侵性强,如果需要同步的业务数据种类较多,那么就需要大量修改源码,工作量大。且会增加原始方法的耗时。

5. mysql同步到ES的其他同步方案

5.1 通过canal实现mysql同步到ES

安装:通过canal实现mysql同步到ES
优点:基于bin log来实现,保障性能,无代码入侵。且可以通过自定义代码来实现数据转换。
缺点:需要安装和维护canal。有一致性要求的数据,需要做好canal集群的高可用。未开启binlog之前的历史数据无法实现全量同步,这一点可以通过logstash来补足

5.2 通过logstash-input-jdbc实现mysql同步到ES

安装:通过logstash-input-jdbc实现mysql同步到ES
优点:ELK体系下logstash的来实现,如果本身在使用ELK则无较大的部署成本,支持全量增量同步,无需开启bin log
缺点:需要安装和维护logstash,性能不如canal

5.3 推荐方案

使用logstash-input-jdbc来实现全量同步,canal来实现增量同步。

如果想用canal来实现全量+增量同步,那么可以将未开启binlog之前的数据重新导出再导入一遍,以此生成binlog,从而实现全量同

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
25天前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
140 0
|
24天前
|
存储 关系型数据库 MySQL
阿里面试:为什么要索引?什么是MySQL索引?底层结构是什么?
尼恩是一位资深架构师,他在自己的读者交流群中分享了关于MySQL索引的重要知识点。索引是帮助MySQL高效获取数据的数据结构,主要作用包括显著提升查询速度、降低磁盘I/O次数、优化排序与分组操作以及提升复杂查询的性能。MySQL支持多种索引类型,如主键索引、唯一索引、普通索引、全文索引和空间数据索引。索引的底层数据结构主要是B+树,它能够有效支持范围查询和顺序遍历,同时保持高效的插入、删除和查找性能。尼恩还强调了索引的优缺点,并提供了多个面试题及其解答,帮助读者在面试中脱颖而出。相关资料可在公众号【技术自由圈】获取。
|
15天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
77 1
|
21天前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
48 5
|
24天前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
94 6
|
25天前
|
存储 关系型数据库 MySQL
如何在MySQL中进行索引的创建和管理?
【10月更文挑战第16天】如何在MySQL中进行索引的创建和管理?
54 1
|
27天前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
69 2
|
1月前
|
架构师 关系型数据库 MySQL
MySQL最左前缀优化原则:深入解析与实战应用
【10月更文挑战第12天】在数据库架构设计与优化中,索引的使用是提升查询性能的关键手段之一。其中,MySQL的最左前缀优化原则(Leftmost Prefix Principle)是复合索引(Composite Index)应用中的核心策略。作为资深架构师,深入理解并掌握这一原则,对于平衡数据库性能与维护成本至关重要。本文将详细解读最左前缀优化原则的功能特点、业务场景、优缺点、底层原理,并通过Java示例展示其实现方式。
68 1
|
16天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第26天】数据库作为现代应用系统的核心组件,其性能优化至关重要。本文主要探讨MySQL的索引策略与查询性能调优。通过合理创建索引(如B-Tree、复合索引)和优化查询语句(如使用EXPLAIN、优化分页查询),可以显著提升数据库的响应速度和稳定性。实践中还需定期审查慢查询日志,持续优化性能。
47 0
|
27天前
|
监控 关系型数据库 MySQL
mysql8索引优化
综上所述,深入理解和有效实施这些索引优化策略,是解锁MySQL 8.0数据库高性能查询的关键。
28 0

推荐镜像

更多