Apache Doris Binlog Load使用方法及示例

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 下面会介绍一些Apache Doris Binlog Load使用方法及示例。

1. 安装配置 Mysql

  1. 安装Mysql
    快速使用Docker安装配置Mysql,具体参照下面的连接

    如果是在物理机上安装可以参考下面的连接:

    进入 Docker 容器或者物理机上修改/etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,
log_bin=mysql_bin
binlog-format=Row
server-id=1
  1. 然后重启Mysql


systemctl restart mysqld
  1. 创建 Mysql 表


create database demo;
 CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `sex` TINYINT(1) DEFAULT NULL,
  `name` varchar(20) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB

2. 安装配置Canal


  1. 解压Canal到指定目录:
tar zxvf canal.deployer-1.1.5.tar.gz -C ./canal
  1. 在conf文件夹下新建目录并重命名,作为instance的根目录,目录名你可以自己命名便于识别即可


例如我这里的命名是和我的数据库库名一致:demo

vi conf/demo/instance.properties
  1. 下面给出的是一个我的示例配置:


这里面的参数说明请参考Canal官方文档:QuickStart

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=12115
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=10.220.146.11:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=zhangfeng
canal.instance.dbPassword=zhangfeng800729)(*Q
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=demo\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#canal.mq.topic=
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
  1. 启动Canal
sh bin/startup.sh

注意:canal instance user/passwd

1.1.5 版本,在canal.properties里加上这两个配置

canal.user = canal canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

默认密码为canal/canal,canal.passwd的密码值可以通过select password(“xxx”) 来获取

  1. 验证是否启动成功


tail -200f logs/demo/demo.log
  1. image.png

3.开始同步数据


3.1 创建Doris目标表


用户需要先在Doris端创建好与Mysql端对应的目标表


Binlog Load只能支持Unique类型的目标表,且必须激活目标表的Batch Delete功能。


开启Batch Delete的方法可以参考help alter table中的批量删除功能。

CREATE TABLE `doris_mysql_binlog_demo` (
  `id` int NOT NULL,
  `sex` TINYINT(1),
  `name` varchar(20),
  `address` varchar(255) 
) ENGINE=OLAP
UNIQUE KEY(`id`,sex)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`sex`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
-- enable batch delete
ALTER TABLE test_2.doris_mysql_binlog_demo ENABLE FEATURE "BATCH_DELETE";

3.1 创建同步作业


3.1.1 Create Sync Job 语法说明


Name: ‘CREATE SYNC JOB’ Description:


数据同步(Sync Job)功能,支持用户提交一个常驻的数据同步作业,通过从指定的远端地址读取Binlog日志,增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。


目前数据同步作业只支持对接Canal,从Canal Server上获取解析好的Binlog数据,导入到Doris内。


用户可通过 SHOW SYNC JOB 查看数据同步作业状态。


语法:

CREATE SYNC [db.]job_name
 (
    channel_desc, 
    channel_desc
    ...
 )
binlog_desc
  1. job_name
    同步作业名称,是作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。


  1. channel_desc作业下的数据通道,用来描述mysql源表到doris目标表的映射关系。语法:


FROM mysql_db.src_tbl INTO des_tbl
 [partitions]
 [columns_mapping]
  1. mysql_db.src_tbl
    指定mysql端的数据库和源表。
  2. des_tbl
    指定doris端的目标表,只支持Unique表,且需开启表的batch delete功能(开启方法请看help alter table的’批量删除功能’)。
  3. partitions
    指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
    示例:
PARTITION(p1, p2, p3)
  1. column_mapping

指定mysql源表和doris目标表的列之间的映射关系。如果不指定,FE会默认源表和目标表的列按顺序一一对应。


不支持 col_name = expr 的形式表示列。
示例:

假设目标表列为(k1, k2, v1),
 改变列k1和k2的顺序
 COLUMNS(k2, k1, v1)
 忽略源数据的第四列
 COLUMNS(k2, k1, v1, dummy_column)
  1. binlog_desc用来描述远端数据源,目前仅支持canal一种。语法:
FROM BINLOG
 (
     "key1" = "value1", 
     "key2" = "value2"
 )
  1. Canal 数据源对应的属性,以canal.为前缀
  1. canal.server.ip: canal server的地址
  2. canal.server.port: canal server的端口
  3. canal.destination: instance的标识
  4. canal.batchSize: 获取的batch大小的最大值,默认8192
  5. canal.username: instance的用户名
  6. canal.password: instance的密码
  7. canal.debug: 可选,设置为true时,会将batch和每一行数据的详细信息都打印出来 Examples:
  1. 简单为 test_dbtest_tbl 创建一个名为 job1 的数据同步作业,连接本地的Canal服务器,对应Mysql源表 mysql_db1.tbl1
CREATE SYNC `test_db`.`job1`
 (
     FROM `mysql_db1`.`tbl1` INTO `test_tbl `
 )
 FROM BINLOG 
 (
     "type" = "canal",
     "canal.server.ip" = "127.0.0.1",
     "canal.server.port" = "11111",
     "canal.destination" = "example",
     "canal.username" = "",
     "canal.password" = ""
 );
  1. test_db 的多张表创建一个名为 job1 的数据同步作业,一一对应多张Mysql源表,并显式的指定列映射。
CREATE SYNC `test_db`.`job1` 
 (
     FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2),
     FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1
 ) 
 FROM BINLOG 
 (
     "type" = "canal", 
     "canal.server.ip" = "xx.xxx.xxx.xx", 
     "canal.server.port" = "12111", 
     "canal.destination" = "example",  
     "canal.username" = "username", 
     "canal.password" = "password"
 );

3.1.2 开始同步mysql表里数据到Doris


注意:

创建同步任务之前,首先要在fe.conf里配置enable_create_sync_job=true,这个默认是false不启用,否则就不能创建同步任务

CREATE SYNC test_2.doris_mysql_binlog_demo_job 
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo
) 
FROM BINLOG 
(
    "type" = "canal", 
    "canal.server.ip" = "10.220.146.10", 
    "canal.server.port" = "11111", 
    "canal.destination" = "demo",  
    "canal.username" = "canal", 
    "canal.password" = "canal"
);

3.1.3 查看同步任务


SHOW SYNC JOB from test_2;

image.png


3.1.4 查看表里的数据


select * from doris_mysql_binlog_demo;

image.png


3.1.5 删除数据


我们在Mysql 数据表里删除数据,然后看Doris表里的变化


delete from test_cdc where id in (12,13)

我们在去看Doris表里,id是12,13这两条数据已经被删除


image.png


3.1.6 多表同步


多表同步只需要像下面这样写法就可以了


CREATE SYNC test_2.doris_mysql_binlog_demo_job 
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
)




相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
21天前
|
存储 自然语言处理 分布式计算
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
Apache Doris 3.1 正式发布!全面升级半结构化分析,支持 VARIANT 稀疏列与模板化 Schema,提升湖仓一体能力,增强 Iceberg/Paimon 集成,优化存储引擎与查询性能,助力高效数据分析。
219 4
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
|
17天前
|
SQL 人工智能 数据挖掘
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
Apache Doris 4.0 原生集成 LLM 函数,将大语言模型能力深度融入 SQL 引擎,实现文本处理智能化与数据分析一体化。通过十大函数,支持智能客服、内容分析、金融风控等场景,提升实时决策效率。采用资源池化管理,保障数据一致性,降低传输开销,毫秒级完成 AI 分析。结合缓存复用、并行执行与权限控制,兼顾性能、成本与安全,推动数据库向 AI 原生演进。
129 0
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
|
2月前
|
存储 分布式计算 Apache
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
小米通过将 Apache Doris(数据库)与 Apache Paimon(数据湖)深度融合,不仅解决了数据湖分析的性能瓶颈,更实现了 “1+1>2” 的协同效应。在这些实践下,小米在湖仓数据分析场景下获得了可观的业务收益。
464 9
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
|
2月前
|
人工智能 运维 监控
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
本文基于 Apache Doris 数据运维治理 Agent 展开讨论,如何让 AI 成为 Doris 数据运维工程师和数据治理专家的智能助手,并在某些场景下实现对人工操作的全面替代。这种变革不仅仅是技术层面的进步,更是数据运维治理思维方式的根本性转变:从“被动响应”到“主动预防”,从“人工判断”到“智能决策”,从“孤立处理”到“协同治理”。
343 11
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
|
1月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
436 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
10月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
698 33
The Past, Present and Future of Apache Flink
|
12月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1512 13
Apache Flink 2.0-preview released
|
12月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
413 3
|
7月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
854 1
Apache Flink 2.0.0: 实时数据处理的新纪元

推荐镜像

更多