实战 | 使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 前面我们介绍了Spring Boot 整合 Elasticsearch 实现数据查询检索的功能,在实际项目中,我们的数据一般存储在数据库中,而且随着业务的发送,数据也会随时变化。那么如何保证数据库中的数据与Elasticsearch存储的索引数据保持一致呢? 最原始的方案就是:当数据发生增删改操作时同步更新Elasticsearch。但是这样的设计耦合太高。接下来我们介绍一种非常简单的数据同步方式:Logstash 数据同步。

前面我们介绍了Spring Boot 整合 Elasticsearch 实现数据查询检索的功能,在实际项目中,我们的数据一般存储在数据库中,而且随着业务的发送,数据也会随时变化。


那么如何保证数据库中的数据与Elasticsearch存储的索引数据保持一致呢? 最原始的方案就是:当数据发生增删改操作时同步更新Elasticsearch。但是这样的设计耦合太高。接下来我们介绍一种非常简单的数据同步方式:Logstash 数据同步。

一、Logstash简介

1.什么是Logstash

logstash是一个开源的服务器端数据处理工具。简单来说,就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。

Logstash常用于日志系统中做日志采集设备,最常用于ELK中作为日志收集器使用。


2.Logstash的架构原理

Logstash的基本流程架构:input=》  filter =》 output 。

  • input(输入):采集各种样式,大小和来源数据,从各个服务器中收集数据。常用的有:jdbc、file、syslog、redis等。
  • filter(过滤器)负责数据处理与转换。主要是将event通过output发出之前对其实现某些处理功能。
  • output(输出):将我们过滤出的数据保存到那些数据库和相关存储中,。

image.png



3.Logstash如何与Elasticsearch数据同步

实际项目中,我们不可能通过手动添加的方式将数据插入索引库,所以需要借助第三方工具,将数据库的数据同步到索引库。此时,Logstash出现了,它可以将不同数据库的数据同步到Elasticsearch中。保证数据库与Elasticsearch的数据保持一致。

image.png


目前支持数据库与ES数据同步的插件有很多,个人认为Logstash是众多同步mysql数据到es的插件中,最稳定并且最容易配置的一个。


二、安装Logstash

Logstash的使用方法也很简单,下面讲解一下,Logstash是如何使用的。需要说明的是:这里以windows 环境为例,演示Logstash的安装和配置。

1.下载Logstash

首先,下载对应版本的Logstash包,可以通过上面提供下载elasticsearch的地址进行下载,完成后解压。

image.png

上面是Logstash解压后的目录,我们需要关注是bin目录中的执行文件和config中的配置文件。一般生产情况下,会使用Linux服务器,并且会将Logstash配置成自启动的服务。这里测试的话,直接启动。


2.配置Logstash

接下来,配置Logstash。需要我们编写配置文件,根据官网和网上提供的配置文件,将其进行修改。

第一步:在Logstash根目录下创建mysql文件夹,添加mysql.conf配置文件,配置Logstash需要的相应信息,具体配置如下:

input {
    stdin {
    }
    jdbc {
      # mysql数据库连接
      jdbc_connection_string => "jdbc:mysql://localhost:3306/book_test?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
      # mysqly用户名和密码
      jdbc_user => "root"
      jdbc_password => "root"
      # 驱动配置
      jdbc_driver_library => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      #jdbc_paging_enabled => "true"
      #jdbc_page_size => "50000"
    jdbc_default_timezone => "Asia/Shanghai"
      # 执行指定的sql文件
      statement_filepath => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\bookquery.sql"
    use_column_value => true
    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
    lowercase_column_names => false
    # 需要记录的字段,用于增量同步,需是数据库字段
    tracking_column => updatetime
    # Value can be any of: numeric,timestamp,Default value is "numeric"
    tracking_column_type => timestamp
    # record_last_run上次数据存放位置;
    record_last_run => true
    #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
    last_run_metadata_path => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\logstash_default_last_time.log"
    # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
    clean_run => false
      # 设置监听 各字段含义 分 时 天 月  年 ,默认全部为*代表含义:每分钟都更新
      schedule => "* * * * *"
      # 索引类型
      type => "id"
    }
}
output {
    elasticsearch {
        #es服务器
        hosts => ["10.2.1.231:9200"]
        #ES索引名称
        index => "book"
        #自增ID
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}


第二步:将mysql-connector-java.jar 拷贝到前面配置的目录下。上面的mysql.conf配置的是:C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar。那么jar包拷贝到此目录下即可:

image.png

上面是mysql的驱动,如果是sqlserver数据库,下载SqlServer对应的驱动即可。放置的位置要与mysql.conf 配置文件中的jdbc_driver_library 地址保持一致。


第三步:创建sql目录,创建bookquery.sql文件用于保存需要执行的sql 脚本。示例代码如下:

select * from book where updatetime >= :sql_last_value
order by updatetime desc

这里使用的增量更新,所以使用:sql_last_value 记录上一次记录的最后时间。


3.启动Logstash

进入logstash的bin目录,执行如下命令:

logstash.bat -f C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql.conf

启动成功之后,Logstash就会自动定时将数据写入到Elasticsearch。如下图所示:

image.png

同步完成后,我们使用Postman查询Elasticsearch,验证索引是否都创建成功。在postman中,发送 Get 请求:http://10.2.1.231:9200/book/_search 。返回结果如下图所示:

image.png

可以看到,数据库中的数据已经通过Logstash同步至Elasticsearch。说明Logstash配置成功。


三、创建查询服务

数据同步完成后,接下来我们使用Spring Boot 构建Elasticsearch查询服务。首先创建Spring Boot项目并整合Elasticsearch,这个之前都已经介绍过,不清楚的朋友可以看我之前的文章。

接下来演示如何封装完整的数据查询服务。

1.数据实体

@Document( indexName = "book" , replicas = 0)
public class Book {
    @Id
    private Long id;
    @Field(analyzer = "ik_max_word",type = FieldType.Text)
    private String bookName;
    @Field(analyzer = "ik_max_word",type = FieldType.Text)
    private String author;
    private float price;
    private int page;
    @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    private Date createTime;
    @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    private Date updateTime;
    @Field(analyzer = "ik_max_word",type = FieldType.Text)
    private String category;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getBookName() {
        return bookName;
    }
    public void setBookName(String bookName) {
        this.bookName = bookName;
    }
    public String getAuthor() {
        return author;
    }
    public void setAuthor(String author) {
        this.author = author;
    }
    public float getPrice() {
        return price;
    }
    public void setPrice(float price) {
        this.price = price;
    }
    public int getPage() {
        return page;
    }
    public void setPage(int page) {
        this.page = page;
    }
    public String getCategory() {
        return category;
    }
    public void setCategory(String category) {
        this.category = category;
    }
    public Book(){
    }
    public Date getCreateTime() {
        return createTime;
    }
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
    public Date getUpdateTime() {
        return updateTime;
    }
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}


2.请求封装类

public class BookQuery {
    public String category;
    public String bookName;
    public String author;
    public int priceMin;
    public int priceMax;
    public int pageMin;
    public int pageMax;
    public String sort;
    public String sortType;
    public int page;
    public int limit;
}


3.创建Controller控制器

@RestController
public class ElasticSearchController {
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;
    /**
     * 查询信息
     * @param
     * @return
     */
    @PostMapping(value = "/book/query")
    public JSONResult query(@RequestBody BookQuery bookQuery){
        Query query= getQueryBuilder(bookQuery);
        SearchHits<Book> searchHits = elasticsearchRestTemplate.search(query, Book.class);
        List<SearchHit<Book>> result = searchHits.getSearchHits();
        return JSONResult.ok(result);
    }
    public Query getQueryBuilder(BookQuery query) {
               BoolQueryBuilder builder = boolQuery();
        // 匹配器 模糊查询部分,分析器使用ik (ik_max_word)
        List<QueryBuilder> must = builder.must();
        if (query.getBookName()!=null && !query.getBookName().isEmpty())
            must.add(wildcardQuery("bookName", "*" +query.getBookName()+ "*"));
        if (query.getCategory()!=null && !query.getCategory().isEmpty())
            must.add(wildcardQuery("category", "*" +query.getCategory()+ "*"));
        if (query.getAuthor()!=null && !query.getAuthor().isEmpty())
            must.add(wildcardQuery("author", "*" +query.getAuthor()+ "*"));
        // 筛选器 精确查询部分
        List<QueryBuilder> filter = builder.filter();
        // 范围查询
        if (query.getPriceMin()>0 && query.getPriceMax()>0) {
            RangeQueryBuilder price = rangeQuery("price").gte(query.getPriceMin()).lte(query.getPriceMax());
            filter.add(price);
        }
        // 范围查询
        if (query.getPageMin()>0 && query.getPageMax()>0) {
            RangeQueryBuilder page = rangeQuery("page").gte(query.getPageMin()).lte(query.getPageMax());
            filter.add(page);
        }
        // 分页
        PageRequest pageable = PageRequest.of(query.getPage() - 1, query.getLimit());
        // 排序
        SortBuilder sort = SortBuilders.fieldSort("price").order(SortOrder.DESC);
        //设置高亮效果
        String preTag = "<font color='#dd4b39'>";//google的色值
        String postTag = "</font>";
        HighlightBuilder.Field highlightFields = new HighlightBuilder.Field("category").preTags(preTag).postTags(postTag);
        Query searchQuery = new NativeSearchQueryBuilder()
                .withQuery(builder)
                .withHighlightFields(highlightFields)
                .withPageable(pageable)
                .withSort(sort)
                .build();
        return searchQuery;
    }
}


4.测试验证

启动项目,在Postman中,请求http://localhost:8080/book/query 接口查询书籍信息数据。查看接口返回情况。

image.png

我们看到接口成功返回数据。说明数据查询服务创建成功。



最后

以上,我们就把使用Spring Boot + Elasticsearch + Logstash 实现完整的数据查询检索服务介绍完了。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
4天前
|
安全 Java 测试技术
Spring Boot集成支付宝支付:概念与实战
【4月更文挑战第29天】在电子商务和在线业务应用中,集成有效且安全的支付解决方案是至关重要的。支付宝作为中国领先的支付服务提供商,其支付功能的集成可以显著提升用户体验。本篇博客将详细介绍如何在Spring Boot应用中集成支付宝支付功能,并提供一个实战示例。
19 2
|
4天前
|
Java 关系型数据库 数据库
Spring Boot多数据源及事务管理:概念与实战
【4月更文挑战第29天】在复杂的企业级应用中,经常需要访问和管理多个数据源。Spring Boot通过灵活的配置和强大的框架支持,可以轻松实现多数据源的整合及事务管理。本篇博客将探讨如何在Spring Boot中配置多数据源,并详细介绍事务管理的策略和实践。
24 3
|
2天前
|
Java 调度 Maven
Springboot实战篇--Springboot框架通过@Scheduled实现定时任务
Spring Boot的Scheduled定时任务无需额外Maven依赖,通过`@EnableScheduling`开启。任务调度有两种方式:fixedRate和fixedDelay,前者任务结束后立即按设定间隔执行,后者在任务完成后等待设定时间再执行。更灵活的是cron表达式,例如`0 0 3 * * ?`表示每天3点执行。实现定时任务时,需注意默认单线程执行可能导致的任务交错,可通过自定义线程池解决。
|
3天前
|
XML Java API
Spring Boot 整合 LiteFlow 规则引擎:概念与实战
【4月更文挑战第30天】在现代软件开发中,规则引擎允许我们以声明式的方式定义业务逻辑和决策路径。LiteFlow 是一个轻量级、易于使用的组件式规则引擎,它可以与 Spring Boot 应用无缝整合。本文将介绍如何在 Spring Boot 项目中引入 LiteFlow,实现灵活的业务流程管理。
15 0
|
3天前
|
开发框架 Java Maven
SpringBoot-Starter 概念与实战
【4月更文挑战第30天】Spring Boot 是一个基于 Spring Framework 的开发框架,旨在简化 Spring 应用程序的搭建和开发。Spring Boot 提供了大量的 Starter(启动器)来简化项目的依赖管理和配置,其中最为常见的是 SpringBoot-Starter。
15 1
|
4天前
|
安全 Java 测试技术
利用Java反射机制提高Spring Boot的代码质量:概念与实战
【4月更文挑战第29天】Java反射机制提供了一种强大的方法来在运行时检查或修改类和对象的行为。在Spring Boot应用中,合理利用反射可以提高代码的灵活性和可维护性。本篇博客将探讨Java反射的核心概念,并展示如何通过反射提高Spring Boot项目的代码质量。
20 0
|
4天前
|
监控 Java 测试技术
Spring Boot与事务钩子函数:概念与实战
【4月更文挑战第29天】在复杂的业务逻辑中,事务管理是确保数据一致性和完整性的关键。Spring Boot提供了强大的事务管理机制,其中事务钩子函数(Transaction Hooks)允许开发者在事务的不同阶段插入自定义逻辑。本篇博客将详细探讨事务钩子函数的概念及其在Spring Boot中的应用。
14 1
|
4天前
|
安全 Java 数据安全/隐私保护
Spring Boot优雅实现多租户架构:概念与实战
【4月更文挑战第29天】在多租户系统中,一个应用实例服务于多个租户,每个租户享有独立的数据视图,而应用的基础设施被共享。这样的架构不仅优化了资源使用,还能降低维护和运营成本。本文将详细介绍如何在Spring Boot中实现多租户架构,并提供具体的实战案例。
26 2
|
4天前
|
前端开发 Java 数据安全/隐私保护
Spring Boot使用拦截器:概念与实战
【4月更文挑战第29天】拦截器(Interceptors)在Spring Boot应用中常用于在请求处理的前后执行特定的代码,如日志记录、认证校验、权限控制等。本篇博客将详细介绍Spring Boot中拦截器的概念及其实战应用,帮助开发者理解和利用拦截器来增强应用的功能。
14 0
|
4天前
|
Java 调度 开发者
Spring Boot与定时任务:整合与实战
【4月更文挑战第29天】定时任务是现代应用中常见的需求,用于执行周期性的活动,如数据备份、报告生成等。Spring Boot通过集成Spring Task的功能,提供了一种简单有效的方式来调度和执行定时任务。
18 1