docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中(三)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

9.2 修改记录信息测试

同时,我们可以再测试修改,我们将用户头像路径进行修改,看看 es 是否同步了新的数据:

UPDATE `user` SET icon='https:///langlang' WHERE id=1001

查看 es 信息,使用 apipost 发送请求:【GET】http://192.168.65.133:9200/es_demo_collect/_search

// 以下是该请求需要携带的json数据,表示查询es_demo_collect索引中的全部文档数据
{
    "query": {
        "match_all": {}
    }
}

10.实战开发-后端代码

以下只展示我认为比较与本文相关的比较重要的文件,完整源码的获取链接我会放在文章的最后。

10.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fox</groupId>
    <artifactId>elasticsearch-canal-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.7</version>
        </dependency>
    </dependencies>
</project>

10.2 application.yml配置

server:
  # 服务端口
  port: 9999
elasticsearch:
  # es访问ip
  hostname: 192.168.65.133
  # es访问port
  port: 9200
  blog:
    # 访问索引
    index: es_demo_collect
    # 搜索返回字段
    source_fields: userId,title,username,userIcon,introduce,createTime,updateTime

10.3 ElasticsearchConfig.java配置类

package com.fox.es.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 17:51
 */
@Configuration
public class ElasticsearchConfig {
    @Value("${elasticsearch.hostname}")
    private String hostname;
    @Value("${elasticsearch.port}")
    private Integer port;
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, port, "http")
        );
        return new RestHighLevelClient(builder);
    }
}

10.4 ⭐测试是否连接 es 成功

package com.fox.es.controller;
import com.fox.es.entity.Result;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 18:33
 */
@RestController
public class TestController {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    /**
     * 用于测试是否连接 es 成功
     *
     * @return 返回 es 的基本信息,等价于访问:http://127.0.0.1:9200
     * @throws IOException 异常信息
     */
    @GetMapping("/getEsInfo")
    public Result getEsInfo() throws IOException {
        MainResponse info = restHighLevelClient.info(RequestOptions.DEFAULT);
        return Result.ok(info);
    }
}

浏览器访问:http://localhost:9999/getEsInfo

10.5 ⭐搜索服务

10.5.1 controller层

package com.fox.es_canal.controller;
import com.fox.es_canal.constant.BlogConstants;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:16
 */
@RestController
@RequestMapping("/blog")
public class BlogController {
    @Resource
    private BlogService blogService;
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo   页码
     * @return 数据列表,按照相关性从高到低进行排序
     */
    @GetMapping("/list")
    public Result list(@RequestParam("keyWords") String keyWords,
                       @RequestParam("pageNo") Integer pageNo) {
        // BlogConstants是我写的一个常量类,里面定义了一个变量 SEARCH_PAGE_NUM = 15
        return blogService.list(keyWords, pageNo, BlogConstants.SEARCH_PAGE_NUM);
    }
}

10.5.2 service接口层

package com.fox.es_canal.service;
import com.fox.es_canal.entity.Result;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
public interface BlogService {
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo 页码
     * @param pageSize 每页大小
     * @return 数据列表,按照相关性从高到低进行排序
     */
    Result list(String keyWords, int pageNo, int pageSize);
}

10.5.3 service实现层

package com.fox.es_canal.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fox.es_canal.dto.BlogSimpleInfoDTO;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
@Slf4j
@Service
public class BlogServiceImpl implements BlogService {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    @Value("${elasticsearch.blog.index}")
    private String blogIndexStore;
    @Value("${elasticsearch.blog.source_fields}")
    private String blogFields;
    public Result list(String keyWords, int pageNo, int pageSize) {
        // 1.设置索引 - blog
        SearchRequest searchRequest = new SearchRequest(blogIndexStore);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 2.source源字段过虑
        String[] sourceFieldsArray = blogFields.split(",");
        searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
        // 3.关键字
        if (StringUtils.hasText(keyWords)) {
            // 哪些字段匹配关键字
            MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keyWords, "title", "tags", "username", "introduce", "content");
            // 设置匹配占比(表示最少匹配的子句个数,例如有五个可选子句,最少的匹配个数为5*70%=3.5.向下取整为3,这就表示五个子句最少要匹配其中三个才能查到)
            multiMatchQueryBuilder.minimumShouldMatch("70%");
            // 提升字段的Boost值
            multiMatchQueryBuilder.field("title", 15);
            multiMatchQueryBuilder.field("tags", 10);
            multiMatchQueryBuilder.field("introduce", 7);
            multiMatchQueryBuilder.field("content", 3);
            multiMatchQueryBuilder.field("username", 3);
            boolQueryBuilder.must(multiMatchQueryBuilder);
        }
        // 4.分页
        int start = (pageNo - 1) * pageSize;
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(pageSize);
        // 布尔查询
        searchSourceBuilder.query(boolQueryBuilder);
        // 6.高亮设置
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.preTags("<font color='red'>");
        highlightBuilder.postTags("</font>");
        // 设置高亮字段
        ArrayList<HighlightBuilder.Field> fields = new ArrayList<>();
        fields.add(new HighlightBuilder.Field("title"));
        fields.add(new HighlightBuilder.Field("introduce"));
        fields.add(new HighlightBuilder.Field("username"));
        highlightBuilder.fields().addAll(fields);
        searchSourceBuilder.highlighter(highlightBuilder);
        // 请求搜索
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("博客搜索异常:{}", e.getMessage());
            return Result.error(e.getMessage());
        }
        // 结果集处理
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        // 记录总数
        long totalHitsCount = hits.getTotalHits().value;
        // 数据列表
        List<BlogSimpleInfoDTO> list = new ArrayList<>();
        for (SearchHit hit : searchHits) {
            JSONObject jsonObject = JSONObject.parseObject(hit.getSourceAsString());
            BlogSimpleInfoDTO blog = new BlogSimpleInfoDTO();
            blog.setId(Integer.parseInt(hit.getId()));
            blog.setUsername(jsonObject.getString("username"));
            blog.setTitle(jsonObject.getString("title"));
            blog.setUserId(Long.parseLong(jsonObject.getString("userId")));
            blog.setUserIcon(jsonObject.getString("userIcon"));
            blog.setIntroduce(jsonObject.getString("introduce"));
            blog.setCreateTime(LocalDateTime.parse(jsonObject.getString("createTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            blog.setUpdateTime(LocalDateTime.parse(jsonObject.getString("updateTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            // 取出高亮字段内容
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            if (highlightFields != null) {
                blog.setTitle(parseHighlightStr(blog.getTitle(), highlightFields.get("title")));
                blog.setIntroduce(parseHighlightStr(blog.getIntroduce(), highlightFields.get("introduce")));
                blog.setUsername(parseHighlightStr(blog.getUsername(), highlightFields.get("username")));
            }
            list.add(blog);
        }
        // 封装信息返回前端
        HashMap<String, Object> resultMap = new HashMap<>(4);
        // 页码
        resultMap.put("pageNo", pageNo);
        // 每页记录数量
        resultMap.put("pageSize", pageSize);
        // 总记录数
        resultMap.put("total", totalHitsCount);
        // 该页信息
        resultMap.put("items", list);
        return Result.ok(resultMap);
    }
    public String parseHighlightStr(String text, HighlightField field) {
        if (field != null) {
            Text[] fragments = field.getFragments();
            StringBuilder stringBuilder = new StringBuilder();
            for (Text str : fragments) {
                stringBuilder.append(str.string());
            }
            return stringBuilder.toString();
        } else {
            return text;
        }
    }
}

10.5.4 效果测试

这里我们使用 apipost7 或浏览器 进行测试:

11.源码获取

Java源码地址:Mr-Write/SpringbootDemo: 各种demo案例 (github.com)

对应的是 elasticsearch-canal-demo 包模块。

12.其它说明

当我们在Java中写出往MySQL数据库添加、删除、修改博客记录的操作接口时,会同时通过 Canal 同步到es中,因为 canal 同步的本质还是去读 MySQL的 binlog 日志。由于比较简单,在这里就不做演示了。

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
相关文章
|
10月前
|
关系型数据库 应用服务中间件 nginx
Docker一键安装中间件(RocketMq、Nginx、MySql、Minio、Jenkins、Redis)
本系列脚本提供RocketMQ、Nginx、MySQL、MinIO、Jenkins和Redis的Docker一键安装与配置方案,适用于快速部署微服务基础环境。
|
7月前
|
应用服务中间件 Linux nginx
在虚拟机Docker环境下部署Nginx的步骤。
以上就是在Docker环境下部署Nginx的步骤。需要注意,Docker和Nginix都有很多高级用法和细节需要掌握,以上只是一个基础入门级别的教程。如果你想要更深入地学习和使用它们,请参考官方文档或者其他专业书籍。
348 5
|
7月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
791 6
|
7月前
|
存储 关系型数据库 MySQL
MySQL Docker 容器化部署全指南
MySQL是一款开源关系型数据库,广泛用于Web及企业应用。Docker容器化部署可解决环境不一致、依赖冲突问题,实现高效、隔离、轻量的MySQL服务运行,支持数据持久化与快速迁移,适用于开发、测试及生产环境。
997 4
|
9月前
|
缓存 Ubuntu Docker
Ubuntu环境下删除Docker镜像与容器、配置静态IP地址教程。
如果遇见问题或者想回滚改动, 可以重启系统.
555 16
|
11月前
|
消息中间件 监控 Docker
Docker环境下快速部署RabbitMQ教程。
就这样,你成功地用魔法召唤出了RabbitMQ,还把它和你的应用程序连接了起来。现在,消息会像小溪流水一样,在你的系统中自由流淌。别忘了,兔子们不喜欢孤独,他们需要你细心的关怀,不时地监控它们,确保他们的世界运转得井井有条。
676 18
|
10月前
|
弹性计算 关系型数据库 Nacos
低配阿里云 ECS 如何 docker 环境部署 NACOS : 单机版模式
NACOS 单机版 Docker 安装指南。使用指定端口和 custom.env 配置文件启动 Nacos 服务,适用于 2.X 版本,包含 gRPC 支持及 MySQL 数据源配置。 -e MODE=standalone \
771 5
|
11月前
|
消息中间件 监控 Docker
Docker环境下快速部署RabbitMQ教程。
至此,这次神秘而简明的部署之旅告一段落。祝你在利用RabbitMQ打造消息队列时,一切顺风顺水!
709 8
|
9月前
|
关系型数据库 MySQL 数据库
为什么 MySQL 不推荐用 Docker 部署?
本文探讨了MySQL是否适合容器化的问题,分析了Docker容器在数据安全、性能瓶颈、状态管理及资源隔离等方面的挑战,并指出目前主流分布式数据库如TDSQL和OceanBase仍倾向于部署在物理机或KVM上。
435 0