Springboot整合Elasticsearch、搭建Logstash同步数据

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 个人学习笔记——ES

整合Springboot

通过虚拟机搭建ES,这里使用的版本是6.4.3,引入相应依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    # 使用2.2.2是为了对应ES的版本
    <version>2.2.2.RELEASE</version>
</dependency>

引入依赖之后可以自行查看

spring:
  data:
    elasticsearch:
      # 在es中配置的名称
      cluster-name: es
      # 如果是集群,用,分隔
      cluster-nodes: 192.168.1.7:9300

测试实体类

package com.csea.entity;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;

/**
 * @author Csea
 * @title
 */
@Data
@Document(indexName = "merchant", type = "doc")
public class Merchant {

    @Id
    private Long merchantId;
    // store = true 表示这是要存储的字段
    @Field(store = true)
    private String name;

    @Field(store = true)
    private String mob;

    @Field(store = true)
    private String address;

    @Field(store = true)
    private String descr;
}

创建索引(文档)

当索引不存在时候,会先创建索引,并将数据插入。

    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;
    @Test
    public void createIndex() {
        Merchant merchant = new Merchant();
        merchant.setMerchantId(1001L);
        merchant.setName("Csea-杂货铺1");
        merchant.setAddress("大道8888号");
        merchant.setDescr("好吃的不得了");
        merchant.setMob("13899999999");
        IndexQuery indexQuery = new IndexQueryBuilder().withObject(merchant).build();
        elasticsearchTemplate.index(indexQuery);
    }

执行完之后可以看到merchant索引已经创建,并且文档也新增了。

更新文档

    @Test
    public void updateMerchantDoc() {
        Map<String, Object> sourceMap = new HashMap<>();
        sourceMap.put("name", "Csea-杂货铺-update");
        sourceMap.put("adress", "光明大道8888号");
        sourceMap.put("descr", "好吃你就多吃点!~");
        IndexRequest indexRequest = new IndexRequest();
        indexRequest.source(sourceMap);

        UpdateQuery query = new UpdateQueryBuilder().withClass(Merchant.class)
                .withId("1001")
                .withIndexRequest(indexRequest)
                .build();
        elasticsearchTemplate.update(query);
    }

查询文档

    @Test
    public void queryMerchantDoc() {
        GetQuery query = new GetQuery();
        query.setId("1001");
        Merchant merchant = elasticsearchTemplate.queryForObject(query, Merchant.class);
        log.info("查询到的数据是={}", merchant);
    }
com.test.ESTest : 查询到的数据是=Merchant(merchantId=1001, name=Csea-杂货铺-update, mob=13899999999, address=大道8888号, descr=好吃你就多吃点!~)

删除文档数据

    @Test
    public void delMerchantDoc() {
        elasticsearchTemplate.delete(Merchant.class, "1001");
    }

分页文档查询

    @Test
    public void searchPageMerchantDoc() {

        Pageable pageable = PageRequest.of(0, 20);

        SearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.matchQuery("description", "God Engineer "))
                .withPageable(pageable)
                .build();
        AggregatedPage<Merchant> merchants = elasticsearchTemplate.queryForPage(query, Merchant.class);
        List<Merchant> content = merchants.getContent();
        for (Merchant merchant : content) {
            log.info("查询到的文档为:{}", merchant);
        }

    }

高亮分页查询

    @Test
    public void searchHightMerchantDoc() {

        String preTag = "<font color='red'>";
        String postTg = "</font>";

        Pageable pageable = PageRequest.of(0, 20);
        // 排序
        SortBuilder sortBuilder = new FieldSortBuilder("name")
                .order(SortOrder.DESC);


        SearchQuery query = new NativeSearchQueryBuilder()
                // 查询的字段
                .withQuery(QueryBuilders.matchQuery("mob", "88"))
                .withHighlightFields(new HighlightBuilder.Field("mob")
                        .preTags(preTag)
                        .postTags(postTg))
                .withSort(sortBuilder)
                .withPageable(pageable)
                .build();
        AggregatedPage<Merchant> merchants = elasticsearchTemplate.queryForPage(query, Merchant.class,
                new SearchResultMapper() {
                    @Override
                    public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
                        SearchHits hits = response.getHits();

                        List<Merchant> stuList = new ArrayList<>();
                        for (SearchHit hit : hits) {
                            HighlightField highlightField = hit.getHighlightFields().get("mob");
                            String mob = highlightField.getFragments()[0].toString();

                            Object merchantId = hit.getSourceAsMap().get("merchantId");
                            String name = (String) hit.getSourceAsMap().get("name");
                            String descr = (String) hit.getSourceAsMap().get("descr");
                            String address = (String) hit.getSourceAsMap().get("address");

                            Merchant merchantHL = new Merchant();
                            merchantHL.setDescr(descr);
                            merchantHL.setMerchantId(Long.valueOf(merchantId.toString()));
                            merchantHL.setName(name);
                            merchantHL.setMob(mob);
                            merchantHL.setAddress(address);
                            stuList.add(merchantHL);

                        }

                        if (!stuList.isEmpty()) {
                            return new AggregatedPageImpl<>((List<T>) stuList);
                        }

                        return null;
                    }
                });
        log.info("分页总数:{}", merchants.getTotalPages());
        List<Merchant> content = merchants.getContent();
        for (Merchant merchant : content) {
            log.info("分页文档数据:{}", merchant);
        }

    }

删除索引

    @Test
    public void deleteDelMerchantIndex() {
        elasticsearchTemplate.deleteIndex(Merchant.class);
    }

小结

不建议使用ElasticsearchTemplate对索引进行管理(创建索引、更新映射、删除索引),因为就像使用Mysql,不会通过java代码去改变表结构,ES也是如此,更多是用来进行CRUD操作。

Logstash

概念

  1. 作为数据采集的工具;
  2. 以id或update_time作为同步边界;

以id同步:初次同步的时候会将所有数据同步过来,之后logstash的定时任务回去检查,比如上次同步到id为2000的数据,那么这次就同步2000之后的数据,使用id作为同步有很大的弊端,就只只能新增数据,无法更新。
以update_time同步:初次同步就将所有的数据同步过来,之后如果有新增或更新的操作,那么就把以上一次更新时间为界,之后的数据全部做一次新增或更新。

  1. 使用logstash-input-jdbd插件同步;
  2. 使用logstash需要跟ES的版本保持一致,比如两者都要是6.4.3版本;
  3. 同步数据时,需要事先创建索引。

安装配置

logstash下载地址
上传并解压,再上传mysql的驱动jar包,以及jdk(logstash需要jdk)

cd进入logstash目录之后,创建文件夹

mkdir sync

进入sync目录,创建配置文件

# 先创建同步配置文件
vim logstash-db-sync.conf
# 拷贝数据库驱动到该路径下
cp /usr/local/mysql-connector-java-8.0.13.jar .

修改logstash-db-sync.conf

input {
    jdbc {
        # 设置Mysql/MariaDB 数据库url以及数据库名称
        jdbc_connection_string => "jdbc:mysql://192.168.1.6:3306/xxg?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
        # 数据库用户名密码
        jdbc_user => "root"
        jdbc_password => "123456"
        # 数据库驱动所在位置,可以是绝对路径或相对路径
        #jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.49.jar"
        jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-8.0.13.jar"
        # 驱动类名
        #jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        # 开启分页
        jdbc_paging_enabled => "true"
        # 分页每页数量,可以自定义
        jdbc_page_size => "10000"
        # 执行的sql文件路径
        statement_filepath => "/usr/local/logstash-6.4.3/sync/product.sql"
        # 设置定时任务间隔
        schedule => "* * * * *"
        # 索引类型
        type => "_doc"
        # 是否开启记录上次追踪的结构,也就是上次更新的实际,这个会记录到 last_run_metadata_path 的文件
        use_column_value => true
        # 记录上一次追踪的结果值
        last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time"
        # 如果 use_column_value 为true,配置本参数,追踪的colume名,可以是自增id或者时间
        tracking_column => "update_time"
        # tracking_column 对应字段的类型
        tracking_column_type => "timestamp"
        # 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有
        clean_run => false
        # 数据库字段名称大写转小写
        lowercase_column_names => false
    }
}

output {
    elasticsearch {
        # es地址
        hosts => ["192.168.1.7:9200"]
        # 同步索引名
        index => "product"
        # 设置_docID和数据相同,按照执行sql中的字段来
        document_id => "%{product_id}"
    }
    # 日志输出
    stdout {
        codec => json_lines
    }
}

添加执行同步查询的sql脚本

vim product.sql
SELECT
    * 
FROM
    jh_product 
WHERE
    is_delete = 0 
    AND update_time >= :sql_last_value

:sql_last_value是logstash中要赋值的地方,可以理解为占位符

启动logstash,先cd到目录下的bin目录

./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf

启动之后就可以看到数据同步过去,在ES的索引中查看数据

自定义模板

因为需要用到中文分词,所以需要自定义模板
Get:http://{IP}:{port}/_template/logstash
查看模板信息

将拿到的template复制出来进行自定义修改

{
    "order": 10,
    "version": 1,
    "index_patterns": ["*"],
    "settings": {
        "index": {
            "refresh_interval": "5s"
        }
    },
    "mappings": {
        "_default_": {
            "dynamic_templates": [
                {
                    "message_field": {
                        "path_match": "message",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false
                        }
                    }
                },
                {
                    "string_fields": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false,
                            "analyzer": "ik_max_word",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        }
                    }
                }
            ],
            "properties": {
                "@timestamp": {
                    "type": "date"
                },
                "@version": {
                    "type": "keyword"
                },
                "geoip": {
                    "dynamic": true,
                    "properties": {
                        "ip": {
                            "type": "ip"
                        },
                        "location": {
                            "type": "geo_point"
                        },
                        "latitude": {
                            "type": "half_float"
                        },
                        "longitude": {
                            "type": "half_float"
                        }
                    }
                }
            }
        }
    },
    "aliases": {}
}

创建完毕之后,在logstash目录的sync下创建新的json文件

vim logstash-ik.json

将自定义模板内容放入json文件中

然后修改logstash-db-sync.conf 配置,在output中添加如下配置,保存之后,重启logstash即可

output {
    elasticsearch {
        # 定义模板名称
        template_name => "myik"
        # 模板所在位置
        template => "/usr/local/logstash-6.4.3/sync/logstash-ik.json"
        # 重写模板
        template_overwrite => true
        #默认为true,false关闭logstash自动管理模板功能,如果自定义模板,则设置false
        manage_template => false
    }
}

中文分词不生效

如果出现中文分词"analyzer": "ik_max_word"不生效的情况可以先将manage_template改为true,然后启动logstash同步,同步之后,使用:
Get:http://{IP}:{port}/_template/{template名称}
来查看模板是否已经上传到es

同步之后将manage_template改为false,重新同步即可。
主要就是模板没有被上传到ES导致的。
另外可以修改模板中order的值,值越大,在 merge 规则的时候优先级越高。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
5天前
|
存储 JSON 数据格式
Elasticsearch 8.X 可以按照数组下标取数据吗?
Elasticsearch 8.X 可以按照数组下标取数据吗?
14 0
|
2天前
|
SQL Java 数据库
springboot用户创建的业务数据只能是同一组织能看的见
springboot用户创建的业务数据只能是同一组织能看的见
|
1天前
|
搜索推荐 Java 数据库
springboot集成ElasticSearch的具体操作(系统全文检索)
springboot集成ElasticSearch的具体操作(系统全文检索)
|
5天前
|
存储 数据处理 索引
Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程
Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程
30 6
|
5天前
|
JSON 测试技术 数据格式
Elasticsearch 8.X 如何生成 TB 级的测试数据 ?
Elasticsearch 8.X 如何生成 TB 级的测试数据 ?
13 0
|
5天前
|
监控 API 索引
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
10 0
|
5天前
|
NoSQL 关系型数据库 数据库
数据库同步 Elasticsearch 后数据不一致,怎么办?
数据库同步 Elasticsearch 后数据不一致,怎么办?
11 0
|
5天前
|
API 索引
Elasticsearch 8.X 如何基于用户指定 ID 顺序召回数据?
Elasticsearch 8.X 如何基于用户指定 ID 顺序召回数据?
11 0
|
10天前
|
JSON JavaScript Java
从前端Vue到后端Spring Boot:接收JSON数据的正确姿势
从前端Vue到后端Spring Boot:接收JSON数据的正确姿势
22 0
|
12天前
|
SQL Java 数据库连接
Springboot框架整合Spring JDBC操作数据
JDBC是Java数据库连接API,用于执行SQL并访问多种关系数据库。它包括一系列Java类和接口,用于建立数据库连接、创建数据库操作对象、定义SQL语句、执行操作并处理结果集。直接使用JDBC涉及七个步骤,包括加载驱动、建立连接、创建对象、定义SQL、执行操作、处理结果和关闭资源。Spring Boot的`spring-boot-starter-jdbc`简化了这些步骤,提供了一个在Spring生态中更便捷使用JDBC的封装。集成Spring JDBC需要添加相关依赖,配置数据库连接信息,并通过JdbcTemplate进行数据库操作,如插入、更新、删除和查询。

热门文章

最新文章