Elastic实战:通过spring data elasticsearch实现索引的CRUD;实现mysql全量/增量同步到ES

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: elasticsearch官方的java客户端有tranport client,rest high level client,但进行索引的增删改查的操作不够简便。因此我们引入spring data elasticsearch来实现索引的CRUD

0. 引言

elasticsearch官方的java客户端有tranport client,rest high level client,但进行索引的增删改查的操作不够简便。因此我们引入spring data elasticsearch来实现索引的CRUD

1. 版本对应关系

在引入spring data之前要先了解版本之间的对应关系,这个我们可以在spring data 官方文档中查询到
在这里插入图片描述
这里我的es用的7.14.0版本,所以需要引入spring data elasticsearch4.3.x版本的依赖

<dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
            <version>4.3.0</version>
</dependency>

需要注意的是,springboot也整合了spring data

<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

2. 实现CRUD

1、连接客户端配置,两种方式
(1)配置文件

spring: 
  elasticsearch:
    rest:
      uris: http://localhost:9200 # 多个地址用逗号隔开
      username: elastic # es开启了security的需要添加用户名和账户
      password: elastic # es开启了security的需要添加用户名和账户

(2)配置类,官方推荐的方式

import org.springframework.beans.factory.annotation.Autowired;@Configuration
static class Config {

    @Bean
    RestHighLevelClient client() {
        HttpHeaders headers = new HttpHeaders();
        headers.setBasicAuth("elastic","elastic");
        
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo("localhost:9200")
            .withDefaultHeaders(headers) 
            .build();

        return RestClients.create(clientConfiguration).rest();
    }
}

2、创建实体类

/**
 * @author whx
 * @date 2022/1/6
 */
@Data
@Document(indexName = "user")
@Setting( 
    replicas = 0
)
@NoArgsConstructor
public class UserES implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * 用户ID
     */
    @Id
    private Long id;
    /**
     * 用户编码
     */
    @Field(type=FieldType.Keyword)
    private String code;
    /**
     * 用户平台
     */
    @Field(type=FieldType.Long)
    private Long userType;
    /**
     * 账号
     */
    @Field(type=FieldType.Text)
    private String account;
    /**
     * 昵称
     */
    @Field(type=FieldType.Text)
    private String name;
    /**
     * 真名
     */
    @Field(type=FieldType.Text)
    private String realName;
    /**
     * 邮箱
     */
    @Field(type=FieldType.Text)
    private String email;
    /**
     * 手机
     */
    @Field(type=FieldType.Keyword)
    private String phone;
    /**
     * 生日
     */
    @Field(type=FieldType.Date)
    private Date birthday;
    /**
     * 性别
     */
    @Field(type=FieldType.Integer)
    private Integer sex;
    /**
     * 角色ID
     */
    @Field(type=FieldType.Long)
    private List<Long> roleIds;
    /**
     * 所在直系部门ID
     */
    @Field(type=FieldType.Keyword)
    private List<String> deptIds;
    /**
     * 岗位ID
     */
    @Field(type=FieldType.Long)
    private List<Long> postIds;
    /**
     * 所有父级部门ID
     */
    @Field(type=FieldType.Long)
    private List<String> parentDeptIds;
    /**
     * 平台类型(微信用户专用)
     */
    @Field(type = FieldType.Keyword)
    private String clientId;
    /**
     * 第三方平台Id(微信用户专用)
     */
    @Field(type= FieldType.Keyword)
    private String thirdPlatformUserId;
    /**
     * PC绑定用户ID
     */
    @Field(type=FieldType.Long)
    private String tenantUserId;
    /**
     * 用户来源:0 pc 1 wx
     */
    @Field(type=FieldType.Integer)
    private Integer userSource;
    /**
     * 租户
     */
    @Field(type=FieldType.Keyword)
    private String tenantId;
    /**
     * 创建人
     */
    @Field(type=FieldType.Long)
    private Long createUser;
    /**
     * 创建部门
     */
    @Field(type=FieldType.Keyword)
    private String createDept;
    /**
     * 创建时间
     */
    @Field(type=FieldType.Date)
    private Date createTime;
 
}

因为我这里还需要将mysql数据同步到es中,所以还需要在UserES类中创建转换方法。这里的实体类转换大家可根据具体自己的需求来书写,以下仅供参考

    public static UserES build(User user){
        UserES userES = Objects.requireNonNull(BeanUtil.copy(user, UserES.class));
        userES.userSource = 0;
        if(!StringUtils.isEmpty(user.getRoleId())){
            userES.roleIds = java.util.Arrays.stream(user.getRoleId().split(",")).map(Long::parseLong).collect(Collectors.toList());
        }
        if(!StringUtils.isEmpty(user.getPostId())){
            userES.postIds = java.util.Arrays.stream(user.getPostId().split(",")).map(Long::parseLong).collect(Collectors.toList());
        }
        if(!StringUtils.isEmpty(user.getDeptId())){
            userES.deptIds = java.util.Arrays.stream(user.getDeptId().split(",")).collect(Collectors.toList());
        }
        return userES;
    }

    public static UserES build(UserWxmini user){
        UserES userES = Objects.requireNonNull(BeanUtil.copy(user, UserES.class));
        userES.userSource = 1;
        userES.name = user.getNickName();
        return userES;
    }

    public static List<UserES> buildList(List<User> list){
        return list.stream().map(UserES::build).collect(Collectors.toList());
    }

    public static List<UserES> buildUserWxList(List<UserWxmini> list){
        return list.stream().map(UserES::build).collect(Collectors.toList());
    }

3、创建repository接口,可以看到只需要继承ElasticsearchCrudRepository接口即可

/**
 * 用户ES客户端
 * @author whx
 * @date 2022/1/6
 */
public interface UserRepositoryElastic extends ElasticsearchCrudRepository<UserES,Long> {

}

4、ElasticsearchCrudRepository接口已经自带了常用的CRUD方法,我们可以直接拿来用
在serviceImpl类中引入UserRepositoryElastic

5、ElasticsearchCrudRepository接口常用的CRUD方法

deleteById(id);
findById(id);
findAll();
findAllById(ids);
save(new UserES());
existsById(id);
count();

6、当启动出现如下报错时,可以参考这篇博客解决:
[Elastic: IllegalStateException: availableProcessors is already set to [8], rejecting [8]](https://blog.csdn.net/qq_24950043/article/details/122364272)

3. 如何自定义方法

3.1 通过spring data自带的语法来自动生成衍生方法

如:根据名称来查询

public interface UserRepositoryElastic extends ElasticsearchCrudRepository<UserES,Long> {
     
    Page<UserES> findByName(String name,Pageable page);
}

支持的语法有
在这里插入图片描述

3.2 通过@Query自定义查询

query中的就是查询的DSL语句,?0表示第一个参数

@Query("{"bool" : {"must" : {"field" : {"name" : "?0"}}}}")
Page<EsProduct> findByName(String name,Pageable pageable);

3.3 聚合与其他操作

spring data elasticsearch本身也集合了TransportClient和HighLevelRestClient。所以对于复杂的聚合查询和其他操作时,仍然可以使用原生的client来实现
示例,通过HighLevelRestClient来实现聚合

@Autowired
 private ElasticsearchTemplate elasticsearchTemplate;
 
 //聚合
    public Map<String, Integer> polymerizationQuery() {
        String aggName = "popularBrand";
        NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
        //聚合
        queryBuilder.addAggregation(AggregationBuilders.terms("popularBrand").field("brand"));
        //查询并返回带聚合结果
        AggregatedPage<Item> result = elasticsearchTemplate.queryForPage(queryBuilder.build(), Item.class);
        //解析聚合
        Aggregations aggregations = result.getAggregations();
        //获取指定名称的聚合
        StringTerms terms = aggregations.get(aggName);
        //获取桶
        List<StringTerms.Bucket> buckets = terms.getBuckets();
        //遍历打印
        Map<String, Integer> map = new HashMap<>();
        for (StringTerms.Bucket bucket : buckets) {
            map.put(bucket.getKeyAsString(), (int) bucket.getDocCount());
            System.out.println("key = " + bucket.getKeyAsString());
            System.out.println("DocCount = " + bucket.getDocCount());
        }
        return map;
    }

更多操作可以查看ES官方文档

针对复杂的操作更多的还是需要自己去实操才能熟练,如果有复杂DSL语句如果在java中是实现的问题,也可以留言告诉我,一起探讨。

4 mysql数据到es中

4.1 mysql全量同步至es中

全量同步应该只需要调用一次,后续的更新通过增量同步来实现

@Service
@AllArgsConstructor
public class UserServiceImpl extends BaseServiceImpl<UserMapper, User> implements IUserService { 
    private final UserRepositoryElastic userRepositoryElastic;

    @Override
    @TenantIgnore
    public R transferFromMysqlToEs(){
        // 查询所有用户数据 (这里是直接调用的mybatis-plus框架自带的selectList方法)
        List<User> users = baseMapper.selectList(Wrappers.lambdaQuery());
        // 将所有用户数据同步到es中
        userRepositoryElastic.saveAll(UserES.buildList(users));
        return R.success("操作成功");
    }
}

4.2 mysql增量同步到es中

使用spring data elasticsearch的增量同步,就是通过在原有的操作代码中插入针对es的操作,比如新增修改用户信息时,同步修改es中的数据

public R<Boolean> submit(User user){ 
        boolean res = this.saveOrUpdate(user);
        if(res){
            userRepositoryElastic.save(UserES.build(user));
        }
        return R.data(res);
}

删除时同步删除es中的数据

public R remove(List<Long> ids){
        baseMapper.deleteBatchIds(ids);
        ids.forEach(userRepositoryElastic::deleteById);
        return R.success("删除成功");
}

4.3 优缺点

优点:通过spring data elasticsearch来同步数据,因为是基于代码实现,所以可以实现比较复杂的转换逻辑,无需部署第三方插件。

缺点:代码入侵性强,如果需要同步的业务数据种类较多,那么就需要大量修改源码,工作量大。且会增加原始方法的耗时。

5. mysql同步到ES的其他同步方案

5.1 通过canal实现mysql同步到ES

安装:通过canal实现mysql同步到ES
优点:基于bin log来实现,保障性能,无代码入侵。且可以通过自定义代码来实现数据转换。
缺点:需要安装和维护canal。有一致性要求的数据,需要做好canal集群的高可用。未开启binlog之前的历史数据无法实现全量同步,这一点可以通过logstash来补足

5.2 通过logstash-input-jdbc实现mysql同步到ES

安装:通过logstash-input-jdbc实现mysql同步到ES
优点:ELK体系下logstash的来实现,如果本身在使用ELK则无较大的部署成本,支持全量增量同步,无需开启bin log
缺点:需要安装和维护logstash,性能不如canal

5.3 推荐方案

使用logstash-input-jdbc来实现全量同步,canal来实现增量同步。

如果想用canal来实现全量+增量同步,那么可以将未开启binlog之前的数据重新导出再导入一遍,以此生成binlog,从而实现全量同

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
4天前
|
XML Java 测试技术
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
这篇文章介绍了Spring5框架的三个新特性:支持@Nullable注解以明确方法返回、参数和属性值可以为空;引入函数式风格的GenericApplicationContext进行对象注册和管理;以及如何整合JUnit5进行单元测试,同时讨论了JUnit4与JUnit5的整合方法,并提出了关于配置文件加载的疑问。
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
|
1天前
|
SQL XML Java
Spring5入门到实战------12、使用JdbcTemplate操作数据库(增删改查)。具体代码+讲解 【上篇】
这篇文章是Spring5框架的实战教程,详细讲解了如何使用JdbcTemplate进行数据库的增删改查操作,包括在项目中引入依赖、配置数据库连接池、创建实体类、定义DAO接口及其实现,并提供了具体的代码示例和测试结果,最后还提供了完整的XML配置文件和测试代码。
Spring5入门到实战------12、使用JdbcTemplate操作数据库(增删改查)。具体代码+讲解 【上篇】
|
1天前
|
NoSQL Java Redis
Redis6入门到实战------ 八、Redis与Spring Boot整合
这篇文章详细介绍了如何在Spring Boot项目中整合Redis,包括在`pom.xml`中添加依赖、配置`application.properties`文件、创建配置类以及编写测试类来验证Redis的连接和基本操作。
Redis6入门到实战------ 八、Redis与Spring Boot整合
|
4天前
|
SQL 数据库
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
这篇文章是Spring5框架的实战教程,深入讲解了如何使用JdbcTemplate进行数据库的批量操作,包括批量添加、批量修改和批量删除的具体代码实现和测试过程,并通过完整的项目案例展示了如何在实际开发中应用这些技术。
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
|
4天前
|
XML Java Maven
Spring5入门到实战------16、Spring5新功能 --整合日志框架(Log4j2)
这篇文章是Spring5框架的入门到实战教程,介绍了Spring5的新功能——整合日志框架Log4j2,包括Spring5对日志框架的通用封装、如何在项目中引入Log4j2、编写Log4j2的XML配置文件,并通过测试类展示了如何使用Log4j2进行日志记录。
Spring5入门到实战------16、Spring5新功能 --整合日志框架(Log4j2)
|
4天前
|
XML Java 数据库
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
这篇文章是Spring5框架的实战教程,详细介绍了事务的概念、ACID特性、事务操作的场景,并通过实际的银行转账示例,演示了Spring框架中声明式事务管理的实现,包括使用注解和XML配置两种方式,以及如何配置事务参数来控制事务的行为。
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
|
4天前
|
XML 数据库 数据格式
Spring5入门到实战------14、完全注解开发形式 ----JdbcTemplate操作数据库(增删改查、批量增删改)。具体代码+讲解 【终结篇】
这篇文章是Spring5框架的实战教程的终结篇,介绍了如何使用注解而非XML配置文件来实现JdbcTemplate的数据库操作,包括增删改查和批量操作,通过创建配置类来注入数据库连接池和JdbcTemplate对象,并展示了完全注解开发形式的项目结构和代码实现。
Spring5入门到实战------14、完全注解开发形式 ----JdbcTemplate操作数据库(增删改查、批量增删改)。具体代码+讲解 【终结篇】
|
4天前
|
SQL XML Java
Spring5入门到实战------12、使用JdbcTemplate操作数据库(增删改查)。具体代码+讲解 【上篇】
这篇文章是Spring5框架的实战教程,详细讲解了如何使用JdbcTemplate进行数据库的增删改查操作,包括在项目中引入依赖、配置数据库连接池、创建实体类、定义DAO接口及其实现,并提供了具体的代码示例和测试结果,最后还提供了完整的XML配置文件和测试代码。
Spring5入门到实战------12、使用JdbcTemplate操作数据库(增删改查)。具体代码+讲解 【上篇】
|
1天前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
28 0
|
1天前
|
数据可视化 Docker 容器
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
这篇文章提供了通过Docker安装Elasticsearch和Kibana的详细过程和图解,包括下载镜像、创建和启动容器、处理可能遇到的启动失败情况(如权限不足和配置文件错误)、测试Elasticsearch和Kibana的连接,以及解决空间不足的问题。文章还特别指出了配置文件中空格的重要性以及环境变量中字母大小写的问题。
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】

推荐镜像

更多