Apache Doris Binlog Load使用方法及示例

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Binlog Load提供了一种使Doris增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能,使用户更方面的完成Mysql数据的导入。

Binlog Load提供了一种使Doris增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能,使用户更方面的完成Mysql数据的导入


注意:

该功能需要在0.15及以后的版本里使用

1. 安装配置 Mysql


  1. 安装Mysql


快速使用Docker安装配置Mysql,具体参照下面的连接

如果是在物理机上安装可以参考下面的连接:

  1. 开启Mysql binlog


进入 Docker 容器或者物理机上修改/etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,

log_bin=mysql_bin
     binlog-format=Row
     server-id=1

然后重启Mysql


systemctl restart mysqld
  1. 创建 Mysql 表
create database demo;
   CREATE TABLE `test_cdc` (
   `id` int NOT NULL AUTO_INCREMENT,
   `sex` TINYINT(1) DEFAULT NULL,
   `name` varchar(20) DEFAULT NULL,
   `address` varchar(255) DEFAULT NULL,
   PRIMARY KEY (`id`)
   ) ENGINE=InnoDB

2. 安装配置Canal



  1. 解压Canal到指定目录:


tar zxvf canal.deployer-1.1.5.tar.gz -C ./canal


  1. 在conf文件夹下新建目录并重命名,作为instance的根目录,目录名你可以自己命名便于识别即可


例如我这里的命名是和我的数据库库名一致:demo


vi conf/demo/instance.properties

下面给出的是一个我的示例配置:
这里面的参数说明请参考Canal官方文档:[QuickStart](https://github.com/alibaba/canal/wiki/QuickStart)
#################################################
  ## mysql serverId , v1.0.26+ will autoGen
  canal.instance.mysql.slaveId=12115
  # enable gtid use true/false
  canal.instance.gtidon=false
  # position info
  canal.instance.master.address=10.220.146.11:3306
  canal.instance.master.journal.name=
  canal.instance.master.position=
  canal.instance.master.timestamp=
  canal.instance.master.gtid=
  # rds oss binlog
  canal.instance.rds.accesskey=
  canal.instance.rds.secretkey=
  canal.instance.rds.instanceId=
  # table meta tsdb info
  canal.instance.tsdb.enable=true
  #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
  #canal.instance.tsdb.dbUsername=canal
  #canal.instance.tsdb.dbPassword=canal
  #canal.instance.standby.address =
  #canal.instance.standby.journal.name =
  #canal.instance.standby.position =
  #canal.instance.standby.timestamp =
  #canal.instance.standby.gtid=
  # username/password
  canal.instance.dbUsername=zhangfeng
  canal.instance.dbPassword=zhangfeng800729)(*Q
  canal.instance.connectionCharset = UTF-8
  # enable druid Decrypt database password
  canal.instance.enableDruid=false
  #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
  # table regex
  canal.instance.filter.regex=demo\\..*
  # table black regex
  canal.instance.filter.black.regex=
  # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
  # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
  # mq config
  #canal.mq.topic=
  # dynamic topic route by schema or table regex
  #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
  #canal.mq.partition=0
  # hash partition config
  #canal.mq.partitionsNum=3
  #canal.mq.partitionHash=test.table:id^name,.*\\..*
  #################################################
  1. 启动Canal


sh bin/startup.sh

注意:canal instance user/passwd

1.1.5 版本,在canal.properties里加上这两个配置

canal.user = canal  canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

默认密码为canal/canal,canal.passwd的密码值可以通过select password("xxx") 来获取

  1. 验证是否启动成功


tail -200f logs/demo/demo.log

image.png

image-20211110145044815.png

3.开始同步数据

3.1 创建Doris目标表


用户需要先在Doris端创建好与Mysql端对应的目标表


Binlog Load只能支持Unique类型的目标表,且必须激活目标表的Batch Delete功能。

开启Batch Delete的方法可以参考help alter table中的批量删除功能。

CREATE TABLE `doris_mysql_binlog_demo` (
  `id` int NOT NULL,
  `sex` TINYINT(1),
  `name` varchar(20),
  `address` varchar(255) 
 ) ENGINE=OLAP
 UNIQUE KEY(`id`,sex)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`sex`) BUCKETS 1
 PROPERTIES (
 "replication_allocation" = "tag.location.default: 3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );
 -- enable batch delete
 ALTER TABLE test_2.doris_mysql_binlog_demo ENABLE FEATURE "BATCH_DELETE";

3.1 创建同步作业


3.1.1 Create Sync Job 语法说明


Name: 'CREATE SYNC JOB'  Description:


数据同步(Sync Job)功能,支持用户提交一个常驻的数据同步作业,通过从指定的远端地址读取Binlog日志,增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。  目前数据同步作业只支持对接Canal,从Canal Server上获取解析好的Binlog数据,导入到Doris内。  用户可通过 SHOW SYNC JOB 查看数据同步作业状态。  语法:


CREATE SYNC [db.]job_name
  (
  channel_desc, 
  channel_desc
  ...
  )
 binlog_desc
  1. job_name
    同步作业名称,是作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。


  1. channel_desc
    作业下的数据通道,用来描述mysql源表到doris目标表的映射关系。
    语法:
FROM mysql_db.src_tbl INTO des_tbl
  [partitions]
  [columns_mapping]
1.  `mysql_db.src_tbl`
    指定mysql端的数据库和源表。
2.  `des_tbl`
    指定doris端的目标表,只支持Unique表,且需开启表的batch delete功能(开启方法请看help alter table的'批量删除功能')。
3.  `partitions`
    指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。

示例:

PARTITION(p1, p2, p3)
4.  `column_mapping`
    指定mysql源表和doris目标表的列之间的映射关系。如果不指定,FE会默认源表和目标表的列按顺序一一对应。

不支持 col_name = expr 的形式表示列。

示例:

假设目标表列为(k1, k2, v1),
改变列k1和k2的顺序
COLUMNS(k2, k1, v1)
忽略源数据的第四列
COLUMNS(k2, k1, v1, dummy_column)</pre>
  1. binlog_desc

用来描述远端数据源,目前仅支持canal一种。


语法:

FROM BINLOG
     (
      "key1" = "value1", 
      "key2" = "value2"
     )
  1. Canal 数据源对应的属性,以canal.为前缀


  1. canal.server.ip: canal server的地址
  2. canal.server.port: canal server的端口
  3. canal.destination: instance的标识
  4. canal.batchSize: 获取的batch大小的最大值,默认8192
  5. canal.username: instance的用户名
  6. canal.password: instance的密码
  7. canal.debug: 可选,设置为true时,会将batch和每一行数据的详细信息都打印出来  Examples:


  1. 简单为 test_dbtest_tbl 创建一个名为 job1 的数据同步作业,连接本地的Canal服务器,对应Mysql源表 mysql_db1.tbl1
CREATE SYNC `test_db`.`job1`
     (
      FROM `mysql_db1`.`tbl1` INTO `test_tbl `
     )
     FROM BINLOG 
     (
      "type" = "canal",
      "canal.server.ip" = "127.0.0.1",
      "canal.server.port" = "11111",
      "canal.destination" = "example",
      "canal.username" = "",
      "canal.password" = ""
     );
  1. test_db 的多张表创建一个名为 job1 的数据同步作业,一一对应多张Mysql源表,并显式的指定列映射。


CREATE SYNC `test_db`.`job1` 
     (
      FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2),
      FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1
     ) 
     FROM BINLOG 
     (
      "type" = "canal", 
      "canal.server.ip" = "xx.xxx.xxx.xx", 
      "canal.server.port" = "12111", 
      "canal.destination" = "example",
      "canal.username" = "username", 
      "canal.password" = "password"
     );

3.1.2 开始同步mysql表里数据到Doris


注意:

创建同步任务之前,首先要在fe.conf里配置enable_create_sync_job=true,这个默认是false不启用,否则就不能创建同步任务

CREATE SYNC test_2.doris_mysql_binlog_demo_job 
 (
  FROM demo.test_cdc INTO doris_mysql_binlog_demo
 ) 
 FROM BINLOG 
 (
  "type" = "canal", 
  "canal.server.ip" = "10.220.146.10", 
  "canal.server.port" = "11111", 
  "canal.destination" = "demo",
  "canal.username" = "canal", 
  "canal.password" = "canal"
 );

3.1.3 查看同步任务


SHOW SYNC JOB from test_2;

image.png


3.1.4 查看表里的数据

select * from doris_mysql_binlog_demo;

image.png


3.1.5 删除数据


我们在Mysql 数据表里删除数据,然后看Doris表里的变化


delete from test_cdc where id in (12,13)

我们在去看Doris表里,id是12,13这两条数据已经被删除


image.png

3.1.6 多表同步


多表同步只需要像下面这样写法就可以了

CREATE SYNC test_2.doris_mysql_binlog_demo_job 
 (
  FROM demo.test_cdc INTO doris_mysql_binlog_demo,
  FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
  FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
  FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
 )




相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
21天前
|
存储 自然语言处理 分布式计算
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
Apache Doris 3.1 正式发布!全面升级半结构化分析,支持 VARIANT 稀疏列与模板化 Schema,提升湖仓一体能力,增强 Iceberg/Paimon 集成,优化存储引擎与查询性能,助力高效数据分析。
219 4
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
|
17天前
|
SQL 人工智能 数据挖掘
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
Apache Doris 4.0 原生集成 LLM 函数,将大语言模型能力深度融入 SQL 引擎,实现文本处理智能化与数据分析一体化。通过十大函数,支持智能客服、内容分析、金融风控等场景,提升实时决策效率。采用资源池化管理,保障数据一致性,降低传输开销,毫秒级完成 AI 分析。结合缓存复用、并行执行与权限控制,兼顾性能、成本与安全,推动数据库向 AI 原生演进。
129 0
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
|
2月前
|
存储 分布式计算 Apache
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
小米通过将 Apache Doris(数据库)与 Apache Paimon(数据湖)深度融合,不仅解决了数据湖分析的性能瓶颈,更实现了 “1+1>2” 的协同效应。在这些实践下,小米在湖仓数据分析场景下获得了可观的业务收益。
464 9
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
|
2月前
|
人工智能 运维 监控
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
本文基于 Apache Doris 数据运维治理 Agent 展开讨论,如何让 AI 成为 Doris 数据运维工程师和数据治理专家的智能助手,并在某些场景下实现对人工操作的全面替代。这种变革不仅仅是技术层面的进步,更是数据运维治理思维方式的根本性转变:从“被动响应”到“主动预防”,从“人工判断”到“智能决策”,从“孤立处理”到“协同治理”。
343 11
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
|
28天前
|
SQL 运维 关系型数据库
深入探讨MySQL的二进制日志(binlog)选项
总结而言,对MySQL binlogs深度理解并妥善配置对数据库运维管理至关重要;它不仅关系到系统性能优化也是实现高可靠性架构设计必须考虑因素之一。通过精心规划与周密部署可以使得该机能充分发挥作用而避免潜在风险带来影响。
60 6
|
2月前
|
存储 SQL 关系型数据库
MySQL中binlog、redolog与undolog的不同之处解析
每个都扮演回答回溯与错误修正机构角色: BinLog像历史记载员详细记载每件大大小小事件; RedoLog则像紧急救援队伍遇见突發情況追踪最后活动轨迹尽力补救; UndoLog就类似时间机器可倒带历史让一切归位原始样貌同时兼具平行宇宙观察能让多人同时看见各自期望看见历程而互不干扰.
161 9
|
3月前
|
存储 SQL 关系型数据库
MySQL的Redo Log与Binlog机制对照分析
通过合理的配置和细致的管理,这两种日志机制相互配合,能够有效地提升MySQL数据库的可靠性和稳定性。
135 10
|
5月前
|
SQL 监控 关系型数据库
MySQL日志分析:binlog、redolog、undolog三大日志的深度探讨。
数据库管理其实和写小说一样,需要规划,需要修订,也需要有能力回滚。理解这些日志的作用与优化,就像把握写作工具的使用与运用,为我们的数据库保驾护航。
223 23
|
10月前
|
存储 SQL 关系型数据库
mysql 的ReLog和BinLog区别
MySQL中的重做日志和二进制日志是确保数据库稳定性和可靠性的关键组件。重做日志主要用于事务的持久性和原子性,通过记录数据页的物理修改信息来恢复未提交的事务;而二进制日志记录SQL语句的逻辑变化,支持数据复制、恢复和审计。两者在写入时机、存储方式及配置参数等方面存在显著差异。
206 6
|
8月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
649 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log

推荐镜像

更多