使用canal将MySQL数据库数据实时同步到oceanbase

本文涉及的产品
RDS AI 助手,专业版
RDS Agent(兼容OpenClaw),2核4GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 使用canal同步mysql数据至oceanbase


实验的环境是virtulbox虚拟机一台,操作系统为centos7,源库为mariadb,oceanbase监听地址为127.0.0.1,源库和目的库都在这个虚拟及上,canal也安装部署在同一虚拟机上。
1 下载canal.deployer
[root@localhost /]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

2 MySQL数据库操作
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON . TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3 部署 Canal Deployer
  将canal deployer解压到 /usr/local/canal 目录下,canal 目录需要提前创建
tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal

      使用手动部署,编辑canal deployer 解压目录(/usr/local/canal)下conf/example/instance.properties文件

[root@localhost example]# vi instance.properties

编辑完的文件如下
[root@localhost example]# cat instance.properties
     

  #################################################
        ## mysql serverId , v1.0.26+ will autoGen
        # canal.instance.mysql.slaveId=0

        # enable gtid use true/false
        canal.instance.gtidon=false

        # position info
        canal.instance.master.address=127.0.0.1:3306
        canal.instance.master.journal.name=
        canal.instance.master.position=
        canal.instance.master.timestamp=
        canal.instance.master.gtid=

        # rds oss binlog
        canal.instance.rds.accesskey=
        canal.instance.rds.secretkey=
        canal.instance.rds.instanceId=

        # table meta tsdb info
        canal.instance.tsdb.enable=true
        #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
        #canal.instance.tsdb.dbUsername=canal
        #canal.instance.tsdb.dbPassword=canal

        #canal.instance.standby.address =
        #canal.instance.standby.journal.name =
        #canal.instance.standby.position =
        #canal.instance.standby.timestamp =
        #canal.instance.standby.gtid=

        # username/password
        canal.instance.dbUsername=canal
        canal.instance.dbPassword=canal
        canal.instance.connectionCharset = UTF-8
        # enable druid Decrypt database password
        canal.instance.enableDruid=false
        #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

        # table regex
        canal.instance.filter.regex=.\..
        # table black regex
        canal.instance.filter.black.regex=mysql\.slave_.*
        # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
        #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
        # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
        #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

        # mq config
        canal.mq.topic=example
        # dynamic topic route by schema or table regex
        #canal.mq.dynamicTopic=mytest1.user,mytest2\..,.\..*
        canal.mq.partition=0
        # hash partition config
        #canal.mq.partitionsNum=3
        #canal.mq.partitionHash=test.table:id^name,.\..
        #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
        #################################################

       这个文件比较长,如果使用缺省的example实例,可以不对实例属性文件进行更改,只在源数据库创建用户canal,密码为canal,授予必要的权限即可。
      启动canal deployer,进入canal解压目录

[root@localhost canal]# sh bin/startup.sh

4 部署canal adapter
    下载canal adapter 解压至目录/usr/local/canal-adapter 目录下,编辑application.yml配置文件,注意冒号后面要有空格
[root@localhost conf]# pwd
/usr/local/canal-adapter/conf
[root@localhost conf]# cat application.yml
      server:
        port: 8081
      spring:
        jackson:
          date-format: yyyy-MM-dd HH:mm:ss
          time-zone: GMT+8
          default-property-inclusion: non_null

      canal.conf:
        mode: tcp #tcp kafka rocketMQ rabbitMQ
        flatMessage: true
        zookeeperHosts:
        syncBatchSize: 1000
        retries: 0
        timeout:
        accessKey:
        secretKey:
        consumerProperties:
          # canal tcp consumer
          canal.tcp.server.host: 127.0.0.1:11111
          canal.tcp.zookeeper.hosts:
          canal.tcp.batch.size: 500
          canal.tcp.username: canal  ##这里填入登陆源库用户名
          canal.tcp.password: canal  ##这里填入登陆源库的用户密码
        canalAdapters:
        - instance: example # canal 实例名
          groups:
          - groupId: g1
            outerAdapters:
            - name: logger
            - name: rdb
              key: mysql1
              properties:
                jdbc.driverClassName: com.mysql.jdbc.Driver ##使用mysql官方驱动
                jdbc.url: jdbc:mysql://127.0.0.1:2883/test?useUnicode=true
                jdbc.username: root@my_tenant#obcluster ##登陆oceanbase数据库的用户名,格式为用户名@租户名#集群名称
                jdbc.password:  ##密码,oceanbase里my_tenant租户root用户没有设置密码,生产环境不能为空

      mytest_user.yml文件,这里选择的镜像数据库方式

[root@localhost rdb]# pwd
    /usr/local/canal-adapter/conf/rdb

[root@localhost rdb]# vi mytest_user.yml
    ## Mirror schema synchronize config
    dataSourceKey: defaultDS
    destination: example  ##这里canal server 的实例名
    groupId: g1
    outerAdapterKey: mysql1   ##填入application.yml的outerAdapter key
    concurrent: true
    dbMapping:
      mirrorDb: true
      database: test          ##要镜像的数据库

5 测试数据实时迁移效果
5.1 数据插入
    源库操作
MariaDB [test]> INSERT INTO user
        ->        (name, age, address, sex)
        -> VALUES
        ->        ('zhangsan', 21, 'jiangxi', 0),
        ->        ('lisi', 22, 'hubei', 0),
        ->        ('wangwu', 23, 'hunan', 0),
        ->        ('lilei', 24, 'henan', 1),
        ->        ('hanmeimei', 25, 'hebei', 1),
        ->        ('xiaoming', 26, 'shandong', 1),
        ->        ('xiaoli', 27, 'shanxi', 1) ;
    Query OK, 7 rows affected (0.01 sec)
    Records: 7  Duplicates: 0  Warnings: 0

    MariaDB [test]> select * from user;
    +----+-----------+------+----------+------+
    | id | name      | age  | address  | sex  |
    +----+-----------+------+----------+------+
    |  1 | zhangsan  |   21 | jiangxi  |    0 |
    |  2 | lisi      |   22 | hubei    |    0 |
    |  3 | wangwu    |   23 | hunan    |    0 |
    |  4 | lilei     |   24 | henan    |    1 |
    |  5 | hanmeimei |   25 | hebei    |    1 |
    |  6 | xiaoming  |   26 | shandong |    1 |
    |  7 | xiaoli    |   27 | shanxi   |    1 |
    +----+-----------+------+----------+------+
    7 rows in set (0.00 sec)

oceanbase目标库查询
 

  MySQL [test]> select * from user;
    +----+-----------+------+----------+------+
    | id | name      | age  | address  | sex  |
    +----+-----------+------+----------+------+
    |  1 | zhangsan  |   21 | jiangxi  |    0 |
    |  2 | lisi      |   22 | hubei    |    0 |
    |  3 | wangwu    |   23 | hunan    |    0 |
    |  4 | lilei     |   24 | henan    |    1 |
    |  5 | hanmeimei |   25 | hebei    |    1 |
    |  6 | xiaoming  |   26 | shandong |    1 |
    |  7 | xiaoli    |   27 | shanxi   |    1 |
    +----+-----------+------+----------+------+
    7 rows in set (0.003 sec)

5.2 创建表
源库操作
MariaDB [test]> CREATE TABLE student_score(sid INT PRIMARY KEY NOT NULL, sname VARCHAR(30), sage INT, ssex VARCHAR(8), score INT(11));
    Query OK, 0 rows affected (0.07 sec)

MariaDB [test]> show tables like 'student_score';
    +--------------------------------+
    | Tables_in_test (student_score) |
    +--------------------------------+
    | student_score                  |
    +--------------------------------+
    1 row in set (0.00 sec)

目标库查询

MySQL [test]> show tables like 'stu%';
    +-----------------------+
    | Tables_in_test (stu%) |
    +-----------------------+
    | student_score         |
    +-----------------------+
    1 row in set (0.006 sec)
5.3 给表添加列

源库操作

MariaDB [test]> alter table student_score add column class varchar(20);
    Query OK, 0 rows affected (0.09 sec)
    Records: 0  Duplicates: 0  Warnings: 0

MariaDB [test]> desc student_score;
    +-------+-------------+------+-----+---------+-------+
    | Field | Type        | Null | Key | Default | Extra |
    +-------+-------------+------+-----+---------+-------+
    | sid   | int(11)     | NO   | PRI | NULL    |       |
    | sname | varchar(30) | YES  |     | NULL    |       |
    | sage  | int(11)     | YES  |     | NULL    |       |
    | ssex  | varchar(8)  | YES  |     | NULL    |       |
    | score | int(11)     | YES  |     | NULL    |       |
    | class | varchar(20) | YES  |     | NULL    |       |
    +-------+-------------+------+-----+---------+-------+
    6 rows in set (0.01 sec)

目标库操作

MySQL [test]> desc student_score;
    +-------+-------------+------+-----+---------+-------+
    | Field | Type        | Null | Key | Default | Extra |
    +-------+-------------+------+-----+---------+-------+
    | sid   | int(11)     | NO   | PRI | NULL    |       |
    | sname | varchar(30) | YES  |     | NULL    |       |
    | sage  | int(11)     | YES  |     | NULL    |       |
    | ssex  | varchar(8)  | YES  |     | NULL    |       |
    | score | int(11)     | YES  |     | NULL    |       |
    | class | varchar(20) | YES  |     | NULL    |       |
    +-------+-------------+------+-----+---------+-------+
    6 rows in set (0.011 sec)

6 总结及注意事项

   canal安装配置还是比较简单的,测试中发现create table as select 只能同步表定义,数据则没有同步过去。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
11月前
|
存储 JSON 关系型数据库
【干货满满】解密 API 数据解析:从 JSON 到数据库存储的完整流程
本文详解电商API开发中JSON数据解析与数据库存储的全流程,涵盖数据提取、清洗、转换及优化策略,结合Python实战代码与主流数据库方案,助开发者构建高效、可靠的数据处理管道。
|
存储 缓存 数据库
数据库数据删除策略:硬删除vs软删除的最佳实践指南
在项目开发中,“删除”操作常见但方式多样,主要分为硬删除与软删除。硬删除直接从数据库移除数据,操作简单、高效,但不可恢复;适用于临时或敏感数据。软删除通过标记字段保留数据,支持恢复和审计,但增加查询复杂度与数据量;适合需追踪历史或可恢复的场景。两者各有优劣,实际开发中常结合使用以满足不同需求。
1327 4
|
9月前
|
数据采集 关系型数据库 MySQL
python爬取数据存入数据库
Python爬虫结合Scrapy与SQLAlchemy,实现高效数据采集并存入MySQL/PostgreSQL/SQLite。通过ORM映射、连接池优化与批量提交,支持百万级数据高速写入,具备良好的可扩展性与稳定性。
|
10月前
|
存储 数据管理 数据库
数据字典是什么?和数据库、数据仓库有什么关系?
在数据处理中,你是否常困惑于字段含义、指标计算或数据来源?数据字典正是解答这些问题的关键工具,它清晰定义数据的名称、类型、来源、计算方式等,服务于开发者、分析师和数据管理者。本文详解数据字典的定义、组成及其与数据库、数据仓库的关系,助你夯实数据基础。
数据字典是什么?和数据库、数据仓库有什么关系?
|
9月前
|
人工智能 Java 关系型数据库
使用数据连接池进行数据库操作
使用数据连接池进行数据库操作
227 11
|
10月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL数据库的WAL日志与数据写入的过程
PostgreSQL中的WAL(预写日志)是保证数据完整性的关键技术。在数据修改前,系统会先将日志写入WAL,确保宕机时可通过日志恢复数据。它减少了磁盘I/O,提升了性能,并支持手动切换日志文件。WAL文件默认存储在pg_wal目录下,采用16进制命名规则。此外,PostgreSQL提供pg_waldump工具解析日志内容。
938 0
|
存储 SQL Java
数据存储使用文件还是数据库,哪个更合适?
数据库和文件系统各有优劣:数据库读写性能较低、结构 rigid,但具备计算能力和数据一致性保障;文件系统灵活易管理、读写高效,但缺乏计算能力且无法保证一致性。针对仅需高效存储与灵活管理的场景,文件系统更优,但其计算短板可通过开源工具 SPL(Structured Process Language)弥补。SPL 提供独立计算语法及高性能文件格式(如集文件、组表),支持复杂计算与多源混合查询,甚至可替代数据仓库。此外,SPL 易集成、支持热切换,大幅提升开发运维效率,是后数据库时代文件存储的理想补充方案。
|
canal 关系型数据库 MySQL
Canal是怎么伪装成 MySQL slave?
Canal是怎么伪装成 MySQL slave?
10284 41
|
存储 关系型数据库 MySQL
【赵渝强老师】OceanBase数据库从零开始:MySQL模式
《OceanBase数据库从零开始:MySQL模式》是一门包含11章的课程,涵盖OceanBase分布式数据库的核心内容。从体系架构、安装部署到租户管理、用户安全,再到数据库对象操作、事务与锁机制,以及应用程序开发、备份恢复、数据迁移等方面进行详细讲解。此外,还涉及连接路由管理和监控诊断等高级主题,帮助学员全面掌握OceanBase数据库的使用与管理。
633 5

推荐镜像

更多