Elastic实战:canal自定义客户端,实现mysql多表同步到es

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 我们之前讲解了利用canal实现无代码入侵的同步mysql数据到elasticsearch,并且讲解了主子表数据如何同步。但具体生产中,仍然有更加复杂的同步需求,之前也有几位同学咨询过我,因为canal只支持2张表的数据同步,并不支持3张表及以上的同步,当不少的业务需要3表以上的同步,这就需要我们自定义canal客户端来实现了,那么今天我们就来实操演示下自定义canal客户端,实现多表同步

0. 引言

我们之前讲解了利用canal实现无代码入侵的同步mysql数据到elasticsearch,并且讲解了主子表数据如何同步。

通过canal1.1.5实现mysql8.0数据增量/全量同步到elasticsearch7.x
canal同步mysql到es之父子表数据同步|对象型数组同步|nested数组同步

但具体生产中,仍然有更加复杂的同步需求,之前也有几位同学咨询过我,因为canal只支持2张表的数据同步,并不支持3张表及以上的同步,当不少的业务需要3表以上的同步,这就需要我们自定义canal客户端来实现了,那么今天我们就来实操演示下自定义canal客户端,实现多表同步

1. canal简介

anal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等

canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin,同时我们还可以自定义客户端,我们讲自定义的客户端称为client

canal的数据同步流程如下图所示

在这里插入图片描述

2. 环境准备

2.1 安装jdk

canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。

canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错NoSuchMethodError

2.2 安装canal

1、截止本文,canal的稳定版已更新到1.1.6了, 所以本文也以这个版本为例。

这里因为我们要自定义客户端,所以只用下载服务端deployer即可

官方下载地址

在这里插入图片描述

当然也可以通过wget指令直接下载到服务器

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

详细的安装步骤不再累述了,还不清楚的同学可以参考上一篇文章

通过canal来实现mysql数据同步到elasticsearch

2.3 mysql配置

1、因为同步是基于binlog实现的,所以要现在mysql中开启binlog

修改mysql配置文件

vim /etc/my.cnf

修改内容

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式

2、源数据库创建一个canal账号,并且设置slave,dump权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

在这里插入图片描述

3、因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;

3. 实操

3.1 服务端deployer配置

1、查询源mysql服务器的binlog位置

# 源mysql服务器中登陆mysql执行
show binary logs;

在这里插入图片描述

2、进入deployer安装目录

cd deployer

3、我们新建一个实例es专门用于本次演示

cd conf
# 复制example实例配置
cp -R example es

4、修改实例es配置文件instance.properties

cd es
vim instance.properties

修改内容

# position info
# 源数据库地址及端口
canal.instance.master.address=192.168.244.17:3306
# 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
canal.instance.master.journal.name=mysql-bin.000001
# 开始同步的binlog文件位置
canal.instance.master.position=0
# 开始同步时间点 时间戳形式
canal.instance.master.timestamp=1546272000000

# 数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# 配置不同步mysql库
canal.instance.filter.black.regex=mysql\..*

mysql数据同步起点说明:

  • canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
  • canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
  • 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

5、启动服务端

./bin/start.sh

6、查看示例日志,无报错则说明启动成功

cat logs/es/es.log

在这里插入图片描述

针对服务端的详细配置项解释,可以参考官方文档:

配置项解释

在这里插入图片描述

3.2 自定义客户端client

1、新建一个springboot项目,我们结合之前讲解的spring-data-elasticsearch来作为es客户端,这里就不单独说明其配置了,还不知道的同学可以参考之前的文章

从零搭建springboot整合spring data elasticsearch4.2.x环境

引入依赖spring-data-elasticsearchcanal-spring-boot-startermybatis-plus

        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
            <version>4.2.10</version>
        </dependency>
        
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.2</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

2、修改配置文件application.yml

# 应用名称
spring:
  application:
      name: canal_client_es
  elasticsearch:
    rest:
      # es 地址
      uris: http://192.168.244.11:9200
      username: elastic
      password: elastic
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    name: defaultDataSource
    url: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456

server:
    port: 8080

# canal服务端地址
canal:
  server: 192.168.244.22:11111
  # 实例名,与deployer中配置的保持统一
  destination: es

# 设置canal消息日志打印级别
logging:
  level:
    top.javatool.canal.client: warn

3、创建es客户端配置


/**
 * @author benjamin
 * @date 2022/10/1
 */
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.canal_client_es")
public class ElasticRestClientConfig extends AbstractElasticsearchConfiguration {

    @Value("${spring.elasticsearch.rest.uris}")
    private String url;
    @Value("${spring.elasticsearch.rest.username}")
    private String username;
    @Value("${spring.elasticsearch.rest.password}")
    private String password;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        url = url.replace("http://","");
        String[] urlArr = url.split(",");
        HttpHost[] httpPostArr = new HttpHost[urlArr.length];
        for (int i = 0; i < urlArr.length; i++) {
            HttpHost httpHost = new HttpHost(urlArr[i].split(":")[0].trim(),
                    Integer.parseInt(urlArr[i].split(":")[1].trim()), "http");
            httpPostArr[i] = httpHost;
        }
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password));
        RestClientBuilder builder = RestClient.builder(httpPostArr)
                // 异步httpclient配置
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // 账号密码登录
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    // httpclient连接数配置
                    httpClientBuilder.setMaxConnTotal(30);
                    httpClientBuilder.setMaxConnPerRoute(10);
                    // httpclient保活策略
                    httpClientBuilder.setKeepAliveStrategy(((response, context) -> Duration.ofMinutes(5).toMillis()));
                    return httpClientBuilder;
                });
        return new RestHighLevelClient(builder);
    }

    @Bean
    public ElasticsearchRestTemplate elasticsearchRestTemplate(RestHighLevelClient elasticsearchClient,ElasticsearchConverter elasticsearchConverter){
        return new ElasticsearchRestTemplate(elasticsearchClient,elasticsearchConverter);
    }

}

4、实现根据实体类自动创建es索引的配置类,不需要可跳过这步

@Configuration
@Slf4j
@AllArgsConstructor
public class ElasticCreateIndexStartUp implements ApplicationListener<ContextRefreshedEvent> {

    private final ElasticsearchRestTemplate restTemplate;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent){
        log.info("[elastic]索引初始化...");
        Reflections f = new Reflections("com.example.canal_client_es.entity");
        Set<Class<?>> classSet = f.getTypesAnnotatedWith(Document.class);
        for (Class<?> clazz : classSet) {
            IndexOperations indexOperations = restTemplate.indexOps(clazz);
            if(!indexOperations.exists()){
                indexOperations.create();
                indexOperations.putMapping();
                log.info(String.format("[elastic]索引%s数据结构创建成功",clazz.getSimpleName()));
            }
        }
        log.info("[elastic]索引初始化完毕");
    }
}

4、创建订单、商品、收货人实体,其中一个订单下有多个商品、多个收货人,我们希望同步订单表时,将商品、收货人两张表的信息同步更新。

同时因为我们需要与数据库做映射,同时也需要与es做映射,所以需要创建面向mysql和es的实体类,当然你也可以将两种整合到一起(如下所示的商品实体、收货人实体),这里为了让大家清晰的认识,我将其分开(如下所示的订单实体)

es实体类

// 订单实体
@Data
@Document(indexName = "my_order")
@Setting(replicas = 0,shards = 1)
public class Order implements Serializable {

    /**
     * 主键
     */
    @Id
    private Long id;

    /**
     * 订单号
     */
    @Field(type = FieldType.Keyword, name="seqNo")
    private String seqNo;

    /**
     * 总价
     */
    @Field(type = FieldType.Double, name="totalPrice")
    private BigDecimal totalPrice;

    /**
     * 数量
     */
    @Field(type = FieldType.Integer, name="quantity")
    private Integer quantity;

    /**
     * 商品清单
     */
    @Field(type = FieldType.Nested, name="productList")
    private List<Product> productList;

    /**
     * 收货人清单
     */
    @Field(type = FieldType.Nested, name="userList")
    private List<User> userList;

}

// 商品实体
@Data
@Table(name = "product")
public class Product implements Serializable {

    @Field(type = FieldType.Long, name="id")
    private Long id;

    @Field(type = FieldType.Keyword, name="seqNo")
    @Column(name = "seq_no")
    private String seqNo;

    @Field(type = FieldType.Double, name="price")
    private BigDecimal price;

    @Field(type = FieldType.Text, name="name", analyzer = "ik_smart")
    private String name;

}

// 收货人实体
@Data
@Table(name = "user")
public class User implements Serializable {

    @Field(type = FieldType.Long, name="id")
    private Long id;

    @Field(type = FieldType.Keyword, name="seqNo")
    @Column(name = "seq_no")
    private String seqNo;

    @Field(type = FieldType.Keyword, name="name")
    private String name;

    @Field(type = FieldType.Integer, name="age")
    private Integer age;

    @Field(type = FieldType.Text, name="address", analyzer = "ik_smart")
    private String address;
}

数据库实体,并用jpa的注解@Column来映射字段名。商品、收货人的数据库实体则整合到es实体中了,如上

@Data
@Table(name = "my_order")
public class OrderPO implements Serializable {

    /**
     * 主键
     */
    @Column(name = "id")
    private Long id;

    /**
     * 订单号
     */
    @Column(name = "seq_no")
    private String seqNo;

    /**
     * 总价
     */
    @Column(name = "total_price")
    private BigDecimal totalPrice;

    /**
     * 数量
     */
    @Column(name = "quantity")
    private Integer quantity;

}

5、我们基于mybatis-plus来操作数据库,因此需要创建实体的mapper、service。详细的代码大家按照mybatis-plus的用法创建即可,或者通过本文最后下载源码查看。这里不再累叙。

在这里插入图片描述

6、操作到这里,最好把你的项目启动一下,如果正常则继续往下操作,如果不正常也好提前排错,不要压到最后发现一堆错,也不知道错在哪里。

7、接下来我们基于canal-client提供的EntryHandler类来实现对于数据表的监控,从而达到数据的增删改同步

@CanalTable("my_order")
@Component
@AllArgsConstructor
@Slf4j
public class OrderHandler implements EntryHandler<OrderPO> {

    private final ElasticsearchRestTemplate elasticsearchRestTemplate;

    private final IProductService productService;

    private final IUserService userService;

    @Override
    public void insert(OrderPO orderPO) {
        Order order = new Order();
        BeanUtils.copyProperties(orderPO,order);
        List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));
        order.setProductList(productList);
        List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));
        order.setUserList(userList);
        elasticsearchRestTemplate.save(order);
    }

    @Override
    public void update(OrderPO before, OrderPO after) {
        Order order = new Order();
        BeanUtils.copyProperties(after,order);
        List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));
        order.setProductList(productList);
        List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));
        order.setUserList(userList);
        elasticsearchRestTemplate.save(order);
    }

    @Override
    public void delete(OrderPO orderPO) {
        elasticsearchRestTemplate.delete(orderPO.getId().toString(),Order.class);
    }
}

3.3 测试

1、新增一条订单数据

在这里插入图片描述

2、kibana中查看索引数据

GET my_order/_search

在这里插入图片描述

结果显示新增的订单表同步成功,并且两张子表的数据也成功同步了。

3、再修改一下订单数据

在这里插入图片描述

kibana查看索引,显示同步成功

在这里插入图片描述

4、我们将刚刚新增的订单数据在数据库中删除

在这里插入图片描述

同时kibana中也删除成功,说明我们删除的同步也生效了。

在这里插入图片描述

3.4 子表数据修改,同步主表

上述我们演示了主表数据修改时,同步主表以及两张子表的数据;有时我们需要修改子表数据,但也需要实现数据同步。

这就需要我们实现一个子表的EntryHandler,用于监听子表的数据变化,其逻辑是子表数据更新时,查询主子表的数据,再同步更新到索引中即可。

注意要监听的是子表,每张子表一个监听器,如果需要监听两张子表,那么就需要分别创建两个监听器

@CanalTable("product")
@Component
@AllArgsConstructor
@Slf4j
public class ProductHandler implements EntryHandler<Product> {

    private final ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Override
    public void insert(Product product) {
        // TODO
    }

    @Override
    public void update(Product before, Product after) {
        // TODO
    }

    @Override
    public void delete(Product product) {
        // TODO
        
    }
}

演示源码

文中演示源码可在如下地址下载:

git源码地址

总结

自此我们的数据同步就演示完成了,如果有更加复杂的同步逻辑,也可以在代码中自定义实现,并且第三方组件canal-spring-boot-starter极大的简化了我们自定义canal客户端的难度。

不过遗憾的是canal-spring-boot-starter的作者目前已经停止了对其的维护,其最新版对应的canal实际是1.1.3版本的,不过实测还不影响我们对接canal1.1.6。如果大家对canal客户端又更高性能的需求,可以研究源码,高度二开。

后续我们将给大家讲解如何实现类canal-spring-boot-starter这样的第三方依赖组件。感兴趣的同学可以关注专栏。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
30天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
21 0
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
SQL DataWorks 关系型数据库
DataWorks常见问题之dataworks同步Rds任务失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
28天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
96 0
|
1月前
|
关系型数据库 MySQL 数据库
rds安装数据库客户端工具
安装阿里云RDS的数据库客户端涉及在本地安装对应类型(如MySQL、PostgreSQL)的客户端工具。对于MySQL,可选择MySQL Command-Line Client或图形化工具如Navicat,安装后输入RDS实例的连接参数进行连接。对于PostgreSQL,可以使用`psql`命令行工具或图形化客户端如PgAdmin。首先从阿里云控制台获取连接信息,然后按照官方文档安装客户端,最后配置客户端连接以确保遵循安全指引。
86 1
|
1天前
|
SQL 关系型数据库 MySQL
不允许你不知道的 MySQL 优化实战(一)
不允许你不知道的 MySQL 优化实战(一)
|
4天前
|
关系型数据库 MySQL 中间件
【MySQL实战笔记】07 | 行锁功过:怎么减少行锁对性能的影响?-02 死锁和死锁检测
【4月更文挑战第19天】在高并发环境下,死锁发生在多个线程间循环等待资源时,导致无限期等待。MySQL中,死锁可通过`innodb_lock_wait_timeout`参数设置超时或`innodb_deadlock_detect`开启死锁检测来解决。默认的50s超时可能不适用于在线服务,而频繁检测会消耗大量CPU。应对热点行更新引发的性能问题,可以暂时关闭死锁检测(风险是产生大量超时),控制并发度,或通过分散记录减少锁冲突,例如将数据分拆到多行以降低死锁概率。
19 1
|
7天前
|
SQL 关系型数据库 MySQL
Python与MySQL数据库交互:面试实战
【4月更文挑战第16天】本文介绍了Python与MySQL交互的面试重点,包括使用`mysql-connector-python`或`pymysql`连接数据库、执行SQL查询、异常处理、防止SQL注入、事务管理和ORM框架。易错点包括忘记关闭连接、忽视异常处理、硬编码SQL、忽略事务及过度依赖低效查询。通过理解这些问题和提供策略,可提升面试表现。
27 6
|
14天前
|
存储 关系型数据库 MySQL
【MySQL实战笔记】 04 | 深入浅出索引(上)-02
【4月更文挑战第9天】InnoDB数据库使用B+树作为索引模型,其中主键索引的叶子节点存储完整行数据,非主键索引则存储主键值。主键查询只需搜索一棵树,而非主键查询需两次搜索,因此推荐使用主键查询以提高效率。在插入新值时,B+树需要维护有序性,可能导致数据页分裂影响性能。自增主键在插入时可避免数据挪动和页分裂,且占用存储空间小,通常更为理想。然而,如果场景仅需唯一索引,可直接设为主键以减少查询步骤。
15 1
【MySQL实战笔记】 04 | 深入浅出索引(上)-02
|
16天前
|
存储 SQL 关系型数据库
【MySQL实战笔记】03.事务隔离:为什么你改了我还看不见?-02
【4月更文挑战第7天】数据库通过视图实现事务隔离,不同隔离级别如读未提交、读已提交、可重复读和串行化采用不同策略。以可重复读为例,MySQL使用多版本并发控制(MVCC),每个事务有其独立的视图。回滚日志在无更早视图时被删除。长事务可能导致大量存储占用,应避免。事务启动可显式用`begin`或设置`autocommit=0`,但后者可能意外开启长事务。建议使用`autocommit=1`并显式管理事务,若需减少交互,可使用`commit work and chain`。
30 5

推荐镜像

更多