docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

9.2 修改记录信息测试

同时,我们可以再测试修改,我们将用户头像路径进行修改,看看 es 是否同步了新的数据:

UPDATE `user` SET icon='https:///langlang' WHERE id=1001

查看 es 信息,使用 apipost 发送请求:【GET】http://192.168.65.133:9200/es_demo_collect/_search

// 以下是该请求需要携带的json数据,表示查询es_demo_collect索引中的全部文档数据
{
    "query": {
        "match_all": {}
    }
}

10.实战开发-后端代码

以下只展示我认为比较与本文相关的比较重要的文件,完整源码的获取链接我会放在文章的最后。

10.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fox</groupId>
    <artifactId>elasticsearch-canal-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.7</version>
        </dependency>
    </dependencies>
</project>

10.2 application.yml配置

server:
  # 服务端口
  port: 9999
elasticsearch:
  # es访问ip
  hostname: 192.168.65.133
  # es访问port
  port: 9200
  blog:
    # 访问索引
    index: es_demo_collect
    # 搜索返回字段
    source_fields: userId,title,username,userIcon,introduce,createTime,updateTime

10.3 ElasticsearchConfig.java配置类

package com.fox.es.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 17:51
 */
@Configuration
public class ElasticsearchConfig {
    @Value("${elasticsearch.hostname}")
    private String hostname;
    @Value("${elasticsearch.port}")
    private Integer port;
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, port, "http")
        );
        return new RestHighLevelClient(builder);
    }
}

10.4 ⭐测试是否连接 es 成功

package com.fox.es.controller;
import com.fox.es.entity.Result;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 18:33
 */
@RestController
public class TestController {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    /**
     * 用于测试是否连接 es 成功
     *
     * @return 返回 es 的基本信息,等价于访问:http://127.0.0.1:9200
     * @throws IOException 异常信息
     */
    @GetMapping("/getEsInfo")
    public Result getEsInfo() throws IOException {
        MainResponse info = restHighLevelClient.info(RequestOptions.DEFAULT);
        return Result.ok(info);
    }
}

浏览器访问:http://localhost:9999/getEsInfo

10.5 ⭐搜索服务

10.5.1 controller层

package com.fox.es_canal.controller;
import com.fox.es_canal.constant.BlogConstants;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:16
 */
@RestController
@RequestMapping("/blog")
public class BlogController {
    @Resource
    private BlogService blogService;
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo   页码
     * @return 数据列表,按照相关性从高到低进行排序
     */
    @GetMapping("/list")
    public Result list(@RequestParam("keyWords") String keyWords,
                       @RequestParam("pageNo") Integer pageNo) {
        // BlogConstants是我写的一个常量类,里面定义了一个变量 SEARCH_PAGE_NUM = 15
        return blogService.list(keyWords, pageNo, BlogConstants.SEARCH_PAGE_NUM);
    }
}

10.5.2 service接口层

package com.fox.es_canal.service;
import com.fox.es_canal.entity.Result;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
public interface BlogService {
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo 页码
     * @param pageSize 每页大小
     * @return 数据列表,按照相关性从高到低进行排序
     */
    Result list(String keyWords, int pageNo, int pageSize);
}

10.5.3 service实现层

package com.fox.es_canal.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fox.es_canal.dto.BlogSimpleInfoDTO;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
@Slf4j
@Service
public class BlogServiceImpl implements BlogService {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    @Value("${elasticsearch.blog.index}")
    private String blogIndexStore;
    @Value("${elasticsearch.blog.source_fields}")
    private String blogFields;
    public Result list(String keyWords, int pageNo, int pageSize) {
        // 1.设置索引 - blog
        SearchRequest searchRequest = new SearchRequest(blogIndexStore);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 2.source源字段过虑
        String[] sourceFieldsArray = blogFields.split(",");
        searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
        // 3.关键字
        if (StringUtils.hasText(keyWords)) {
            // 哪些字段匹配关键字
            MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keyWords, "title", "tags", "username", "introduce", "content");
            // 设置匹配占比(表示最少匹配的子句个数,例如有五个可选子句,最少的匹配个数为5*70%=3.5.向下取整为3,这就表示五个子句最少要匹配其中三个才能查到)
            multiMatchQueryBuilder.minimumShouldMatch("70%");
            // 提升字段的Boost值
            multiMatchQueryBuilder.field("title", 15);
            multiMatchQueryBuilder.field("tags", 10);
            multiMatchQueryBuilder.field("introduce", 7);
            multiMatchQueryBuilder.field("content", 3);
            multiMatchQueryBuilder.field("username", 3);
            boolQueryBuilder.must(multiMatchQueryBuilder);
        }
        // 4.分页
        int start = (pageNo - 1) * pageSize;
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(pageSize);
        // 布尔查询
        searchSourceBuilder.query(boolQueryBuilder);
        // 6.高亮设置
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.preTags("<font color='red'>");
        highlightBuilder.postTags("</font>");
        // 设置高亮字段
        ArrayList<HighlightBuilder.Field> fields = new ArrayList<>();
        fields.add(new HighlightBuilder.Field("title"));
        fields.add(new HighlightBuilder.Field("introduce"));
        fields.add(new HighlightBuilder.Field("username"));
        highlightBuilder.fields().addAll(fields);
        searchSourceBuilder.highlighter(highlightBuilder);
        // 请求搜索
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("博客搜索异常:{}", e.getMessage());
            return Result.error(e.getMessage());
        }
        // 结果集处理
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        // 记录总数
        long totalHitsCount = hits.getTotalHits().value;
        // 数据列表
        List<BlogSimpleInfoDTO> list = new ArrayList<>();
        for (SearchHit hit : searchHits) {
            JSONObject jsonObject = JSONObject.parseObject(hit.getSourceAsString());
            BlogSimpleInfoDTO blog = new BlogSimpleInfoDTO();
            blog.setId(Integer.parseInt(hit.getId()));
            blog.setUsername(jsonObject.getString("username"));
            blog.setTitle(jsonObject.getString("title"));
            blog.setUserId(Long.parseLong(jsonObject.getString("userId")));
            blog.setUserIcon(jsonObject.getString("userIcon"));
            blog.setIntroduce(jsonObject.getString("introduce"));
            blog.setCreateTime(LocalDateTime.parse(jsonObject.getString("createTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            blog.setUpdateTime(LocalDateTime.parse(jsonObject.getString("updateTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            // 取出高亮字段内容
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            if (highlightFields != null) {
                blog.setTitle(parseHighlightStr(blog.getTitle(), highlightFields.get("title")));
                blog.setIntroduce(parseHighlightStr(blog.getIntroduce(), highlightFields.get("introduce")));
                blog.setUsername(parseHighlightStr(blog.getUsername(), highlightFields.get("username")));
            }
            list.add(blog);
        }
        // 封装信息返回前端
        HashMap<String, Object> resultMap = new HashMap<>(4);
        // 页码
        resultMap.put("pageNo", pageNo);
        // 每页记录数量
        resultMap.put("pageSize", pageSize);
        // 总记录数
        resultMap.put("total", totalHitsCount);
        // 该页信息
        resultMap.put("items", list);
        return Result.ok(resultMap);
    }
    public String parseHighlightStr(String text, HighlightField field) {
        if (field != null) {
            Text[] fragments = field.getFragments();
            StringBuilder stringBuilder = new StringBuilder();
            for (Text str : fragments) {
                stringBuilder.append(str.string());
            }
            return stringBuilder.toString();
        } else {
            return text;
        }
    }
}

10.5.4 效果测试

这里我们使用 apipost7 或浏览器 进行测试:

11.源码获取

Java源码地址:Mr-Write/SpringbootDemo: 各种demo案例 (github.com)

对应的是 elasticsearch-canal-demo 包模块。

12.其它说明

当我们在Java中写出往MySQL数据库添加、删除、修改博客记录的操作接口时,会同时通过 Canal 同步到es中,因为 canal 同步的本质还是去读 MySQL的 binlog 日志。由于比较简单,在这里就不做演示了。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
3天前
|
SQL 存储 关系型数据库
MySQL Cluster集群安装及使用
MySQL Cluster集群安装及使用
|
18天前
|
关系型数据库 MySQL 数据库
mysql卸载、下载、安装(window版本)
mysql卸载、下载、安装(window版本)
|
29天前
|
存储 自然语言处理 关系型数据库
ElasticSearch索引 和MySQL索引那个更高效实用那个更合适
ElasticSearch索引 和MySQL索引那个更高效实用那个更合适
38 0
|
7天前
|
关系型数据库 MySQL 数据库
《MySQL 简易速速上手小册》第1章:MySQL 基础和安装(2024 最新版)
《MySQL 简易速速上手小册》第1章:MySQL 基础和安装(2024 最新版)
33 4
|
1天前
|
关系型数据库 MySQL 数据安全/隐私保护
安装mysql和远程连接
安装mysql和远程连接
8 0
|
1天前
|
关系型数据库 MySQL Java
Linux 安装 JDK、MySQL、Tomcat(图文并茂)
Linux 安装 JDK、MySQL、Tomcat(图文并茂)
14 2
|
2天前
|
NoSQL 关系型数据库 MySQL
[AIGC] 对比MySQL全文索引,RedisSearch,和Elasticsearch的详细区别
[AIGC] 对比MySQL全文索引,RedisSearch,和Elasticsearch的详细区别
|
3天前
|
关系型数据库 MySQL Windows
windows安装MySQL5.7教程
windows安装MySQL5.7教程
14 0
|
10天前
|
关系型数据库 MySQL Linux
Linux联网安装MySQL Server
Linux联网安装MySQL Server
22 0
|
11天前
|
关系型数据库 MySQL Linux
centos7安装mysql-带网盘安装包
centos7安装mysql-带网盘安装包
61 2