docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

9.2 修改记录信息测试

同时,我们可以再测试修改,我们将用户头像路径进行修改,看看 es 是否同步了新的数据:

UPDATE `user` SET icon='https:///langlang' WHERE id=1001

查看 es 信息,使用 apipost 发送请求:【GET】http://192.168.65.133:9200/es_demo_collect/_search

// 以下是该请求需要携带的json数据,表示查询es_demo_collect索引中的全部文档数据
{
    "query": {
        "match_all": {}
    }
}

10.实战开发-后端代码

以下只展示我认为比较与本文相关的比较重要的文件,完整源码的获取链接我会放在文章的最后。

10.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fox</groupId>
    <artifactId>elasticsearch-canal-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.7</version>
        </dependency>
    </dependencies>
</project>

10.2 application.yml配置

server:
  # 服务端口
  port: 9999
elasticsearch:
  # es访问ip
  hostname: 192.168.65.133
  # es访问port
  port: 9200
  blog:
    # 访问索引
    index: es_demo_collect
    # 搜索返回字段
    source_fields: userId,title,username,userIcon,introduce,createTime,updateTime

10.3 ElasticsearchConfig.java配置类

package com.fox.es.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 17:51
 */
@Configuration
public class ElasticsearchConfig {
    @Value("${elasticsearch.hostname}")
    private String hostname;
    @Value("${elasticsearch.port}")
    private Integer port;
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, port, "http")
        );
        return new RestHighLevelClient(builder);
    }
}

10.4 ⭐测试是否连接 es 成功

package com.fox.es.controller;
import com.fox.es.entity.Result;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 18:33
 */
@RestController
public class TestController {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    /**
     * 用于测试是否连接 es 成功
     *
     * @return 返回 es 的基本信息,等价于访问:http://127.0.0.1:9200
     * @throws IOException 异常信息
     */
    @GetMapping("/getEsInfo")
    public Result getEsInfo() throws IOException {
        MainResponse info = restHighLevelClient.info(RequestOptions.DEFAULT);
        return Result.ok(info);
    }
}

浏览器访问:http://localhost:9999/getEsInfo

10.5 ⭐搜索服务

10.5.1 controller层

package com.fox.es_canal.controller;
import com.fox.es_canal.constant.BlogConstants;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:16
 */
@RestController
@RequestMapping("/blog")
public class BlogController {
    @Resource
    private BlogService blogService;
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo   页码
     * @return 数据列表,按照相关性从高到低进行排序
     */
    @GetMapping("/list")
    public Result list(@RequestParam("keyWords") String keyWords,
                       @RequestParam("pageNo") Integer pageNo) {
        // BlogConstants是我写的一个常量类,里面定义了一个变量 SEARCH_PAGE_NUM = 15
        return blogService.list(keyWords, pageNo, BlogConstants.SEARCH_PAGE_NUM);
    }
}

10.5.2 service接口层

package com.fox.es_canal.service;
import com.fox.es_canal.entity.Result;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
public interface BlogService {
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo 页码
     * @param pageSize 每页大小
     * @return 数据列表,按照相关性从高到低进行排序
     */
    Result list(String keyWords, int pageNo, int pageSize);
}

10.5.3 service实现层

package com.fox.es_canal.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fox.es_canal.dto.BlogSimpleInfoDTO;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
@Slf4j
@Service
public class BlogServiceImpl implements BlogService {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    @Value("${elasticsearch.blog.index}")
    private String blogIndexStore;
    @Value("${elasticsearch.blog.source_fields}")
    private String blogFields;
    public Result list(String keyWords, int pageNo, int pageSize) {
        // 1.设置索引 - blog
        SearchRequest searchRequest = new SearchRequest(blogIndexStore);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 2.source源字段过虑
        String[] sourceFieldsArray = blogFields.split(",");
        searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
        // 3.关键字
        if (StringUtils.hasText(keyWords)) {
            // 哪些字段匹配关键字
            MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keyWords, "title", "tags", "username", "introduce", "content");
            // 设置匹配占比(表示最少匹配的子句个数,例如有五个可选子句,最少的匹配个数为5*70%=3.5.向下取整为3,这就表示五个子句最少要匹配其中三个才能查到)
            multiMatchQueryBuilder.minimumShouldMatch("70%");
            // 提升字段的Boost值
            multiMatchQueryBuilder.field("title", 15);
            multiMatchQueryBuilder.field("tags", 10);
            multiMatchQueryBuilder.field("introduce", 7);
            multiMatchQueryBuilder.field("content", 3);
            multiMatchQueryBuilder.field("username", 3);
            boolQueryBuilder.must(multiMatchQueryBuilder);
        }
        // 4.分页
        int start = (pageNo - 1) * pageSize;
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(pageSize);
        // 布尔查询
        searchSourceBuilder.query(boolQueryBuilder);
        // 6.高亮设置
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.preTags("<font color='red'>");
        highlightBuilder.postTags("</font>");
        // 设置高亮字段
        ArrayList<HighlightBuilder.Field> fields = new ArrayList<>();
        fields.add(new HighlightBuilder.Field("title"));
        fields.add(new HighlightBuilder.Field("introduce"));
        fields.add(new HighlightBuilder.Field("username"));
        highlightBuilder.fields().addAll(fields);
        searchSourceBuilder.highlighter(highlightBuilder);
        // 请求搜索
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("博客搜索异常:{}", e.getMessage());
            return Result.error(e.getMessage());
        }
        // 结果集处理
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        // 记录总数
        long totalHitsCount = hits.getTotalHits().value;
        // 数据列表
        List<BlogSimpleInfoDTO> list = new ArrayList<>();
        for (SearchHit hit : searchHits) {
            JSONObject jsonObject = JSONObject.parseObject(hit.getSourceAsString());
            BlogSimpleInfoDTO blog = new BlogSimpleInfoDTO();
            blog.setId(Integer.parseInt(hit.getId()));
            blog.setUsername(jsonObject.getString("username"));
            blog.setTitle(jsonObject.getString("title"));
            blog.setUserId(Long.parseLong(jsonObject.getString("userId")));
            blog.setUserIcon(jsonObject.getString("userIcon"));
            blog.setIntroduce(jsonObject.getString("introduce"));
            blog.setCreateTime(LocalDateTime.parse(jsonObject.getString("createTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            blog.setUpdateTime(LocalDateTime.parse(jsonObject.getString("updateTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            // 取出高亮字段内容
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            if (highlightFields != null) {
                blog.setTitle(parseHighlightStr(blog.getTitle(), highlightFields.get("title")));
                blog.setIntroduce(parseHighlightStr(blog.getIntroduce(), highlightFields.get("introduce")));
                blog.setUsername(parseHighlightStr(blog.getUsername(), highlightFields.get("username")));
            }
            list.add(blog);
        }
        // 封装信息返回前端
        HashMap<String, Object> resultMap = new HashMap<>(4);
        // 页码
        resultMap.put("pageNo", pageNo);
        // 每页记录数量
        resultMap.put("pageSize", pageSize);
        // 总记录数
        resultMap.put("total", totalHitsCount);
        // 该页信息
        resultMap.put("items", list);
        return Result.ok(resultMap);
    }
    public String parseHighlightStr(String text, HighlightField field) {
        if (field != null) {
            Text[] fragments = field.getFragments();
            StringBuilder stringBuilder = new StringBuilder();
            for (Text str : fragments) {
                stringBuilder.append(str.string());
            }
            return stringBuilder.toString();
        } else {
            return text;
        }
    }
}

10.5.4 效果测试

这里我们使用 apipost7 或浏览器 进行测试:

11.源码获取

Java源码地址:Mr-Write/SpringbootDemo: 各种demo案例 (github.com)

对应的是 elasticsearch-canal-demo 包模块。

12.其它说明

当我们在Java中写出往MySQL数据库添加、删除、修改博客记录的操作接口时,会同时通过 Canal 同步到es中,因为 canal 同步的本质还是去读 MySQL的 binlog 日志。由于比较简单,在这里就不做演示了。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
16天前
|
存储 SQL 关系型数据库
mysql 的ReLog和BinLog区别
MySQL中的重做日志和二进制日志是确保数据库稳定性和可靠性的关键组件。重做日志主要用于事务的持久性和原子性,通过记录数据页的物理修改信息来恢复未提交的事务;而二进制日志记录SQL语句的逻辑变化,支持数据复制、恢复和审计。两者在写入时机、存储方式及配置参数等方面存在显著差异。
|
3天前
|
关系型数据库 MySQL 数据库
docker高级篇(大厂进阶):安装mysql主从复制
docker高级篇(大厂进阶):安装mysql主从复制
47 24
|
15天前
|
SQL 存储 缓存
MySQL进阶突击系列(02)一条更新SQL执行过程 | 讲透undoLog、redoLog、binLog日志三宝
本文详细介绍了MySQL中update SQL执行过程涉及的undoLog、redoLog和binLog三种日志的作用及其工作原理,包括它们如何确保数据的一致性和完整性,以及在事务提交过程中各自的角色。同时,文章还探讨了这些日志在故障恢复中的重要性,强调了合理配置相关参数对于提高系统稳定性的必要性。
|
1月前
|
关系型数据库 MySQL 数据库
【赵渝强老师】MySQL的binlog日志文件
MySQL的binlog日志记录了所有对数据库的更改操作(不包括SELECT和SHOW),主要用于主从复制和数据恢复。binlog有三种模式,可通过设置binlog_format参数选择。示例展示了如何启用binlog、设置格式、查看日志文件及记录的信息。
|
1月前
|
监控 前端开发 Java
【技术开发】接口管理平台要用什么技术栈?推荐:Java+Vue3+Docker+MySQL
该文档介绍了基于Java后端和Vue3前端构建的管理系统的技术栈及功能模块,涵盖管理后台的访问、登录、首页概览、API接口管理、接口权限设置、接口监控、计费管理、账号管理、应用管理、数据库配置、站点配置及管理员个人设置等内容,并提供了访问地址及操作指南。
|
1月前
|
存储 SQL 关系型数据库
mysql 的ReLog和BinLog区别
MySQL中的重做日志(Redo Log)和二进制日志(Binary Log)是两种重要的日志系统。重做日志主要用于保证事务的持久性和原子性,通过记录数据页的物理修改信息来恢复未提交的事务更改。二进制日志则记录了数据库的所有逻辑变化操作,用于数据的复制、恢复和审计。两者在写入时机、存储方式、配置参数和使用范围上有所不同,共同确保了数据库的稳定性和可靠性。
|
1月前
|
关系型数据库 MySQL Docker
docker环境下mysql镜像启动后权限更改问题的解决
在Docker环境下运行MySQL容器时,权限问题是一个常见的困扰。通过正确设置目录和文件的权限,可以确保MySQL容器顺利启动并正常运行。本文提供了多种解决方案,包括在主机上设置正确的权限、使用Dockerfile和Docker Compose进行配置、在容器启动后手动更改权限以及使用 `init`脚本自动更改权限。根据实际情况选择合适的方法,可以有效解决MySQL容器启动后的权限问题。希望本文对您在Docker环境下运行MySQL容器有所帮助。
155 1
|
2月前
|
关系型数据库 MySQL Linux
Docker安装Mysql5.7,解决无法访问DockerHub问题
当 Docker Hub 无法访问时,可以通过配置国内镜像加速来解决应用安装失败和镜像拉取超时的问题。本文介绍了如何在 CentOS 上一键配置国内镜像加速,并成功拉取 MySQL 5.7 镜像。
580 2
Docker安装Mysql5.7,解决无法访问DockerHub问题
|
2月前
|
弹性计算 关系型数据库 MySQL
Docker安装MySQL
这篇文章详细介绍了如何使用Docker安装MySQL数据库服务,包括拉取镜像、配置数据卷以及启动容器的步骤。
405 0
Docker安装MySQL
|
3月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
下一篇
DataWorks