Apache Doris Binlog Load使用方法及示例

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 下面会介绍一些Apache Doris Binlog Load使用方法及示例。

1. 安装配置 Mysql

  1. 安装Mysql
    快速使用Docker安装配置Mysql,具体参照下面的连接

    如果是在物理机上安装可以参考下面的连接:

    进入 Docker 容器或者物理机上修改/etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,
log_bin=mysql_bin
binlog-format=Row
server-id=1
  1. 然后重启Mysql


systemctl restart mysqld
  1. 创建 Mysql 表


create database demo;
 CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `sex` TINYINT(1) DEFAULT NULL,
  `name` varchar(20) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB

2. 安装配置Canal


  1. 解压Canal到指定目录:
tar zxvf canal.deployer-1.1.5.tar.gz -C ./canal
  1. 在conf文件夹下新建目录并重命名,作为instance的根目录,目录名你可以自己命名便于识别即可


例如我这里的命名是和我的数据库库名一致:demo

vi conf/demo/instance.properties
  1. 下面给出的是一个我的示例配置:


这里面的参数说明请参考Canal官方文档:QuickStart

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=12115
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=10.220.146.11:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=zhangfeng
canal.instance.dbPassword=zhangfeng800729)(*Q
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=demo\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#canal.mq.topic=
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
  1. 启动Canal
sh bin/startup.sh

注意:canal instance user/passwd

1.1.5 版本,在canal.properties里加上这两个配置

canal.user = canal canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

默认密码为canal/canal,canal.passwd的密码值可以通过select password(“xxx”) 来获取

  1. 验证是否启动成功


tail -200f logs/demo/demo.log
  1. image.png

3.开始同步数据


3.1 创建Doris目标表


用户需要先在Doris端创建好与Mysql端对应的目标表


Binlog Load只能支持Unique类型的目标表,且必须激活目标表的Batch Delete功能。


开启Batch Delete的方法可以参考help alter table中的批量删除功能。

CREATE TABLE `doris_mysql_binlog_demo` (
  `id` int NOT NULL,
  `sex` TINYINT(1),
  `name` varchar(20),
  `address` varchar(255) 
) ENGINE=OLAP
UNIQUE KEY(`id`,sex)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`sex`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
-- enable batch delete
ALTER TABLE test_2.doris_mysql_binlog_demo ENABLE FEATURE "BATCH_DELETE";

3.1 创建同步作业


3.1.1 Create Sync Job 语法说明


Name: ‘CREATE SYNC JOB’ Description:


数据同步(Sync Job)功能,支持用户提交一个常驻的数据同步作业,通过从指定的远端地址读取Binlog日志,增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。


目前数据同步作业只支持对接Canal,从Canal Server上获取解析好的Binlog数据,导入到Doris内。


用户可通过 SHOW SYNC JOB 查看数据同步作业状态。


语法:

CREATE SYNC [db.]job_name
 (
    channel_desc, 
    channel_desc
    ...
 )
binlog_desc
  1. job_name
    同步作业名称,是作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。


  1. channel_desc作业下的数据通道,用来描述mysql源表到doris目标表的映射关系。语法:


FROM mysql_db.src_tbl INTO des_tbl
 [partitions]
 [columns_mapping]
  1. mysql_db.src_tbl
    指定mysql端的数据库和源表。
  2. des_tbl
    指定doris端的目标表,只支持Unique表,且需开启表的batch delete功能(开启方法请看help alter table的’批量删除功能’)。
  3. partitions
    指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
    示例:
PARTITION(p1, p2, p3)
  1. column_mapping

指定mysql源表和doris目标表的列之间的映射关系。如果不指定,FE会默认源表和目标表的列按顺序一一对应。


不支持 col_name = expr 的形式表示列。
示例:

假设目标表列为(k1, k2, v1),
 改变列k1和k2的顺序
 COLUMNS(k2, k1, v1)
 忽略源数据的第四列
 COLUMNS(k2, k1, v1, dummy_column)
  1. binlog_desc用来描述远端数据源,目前仅支持canal一种。语法:
FROM BINLOG
 (
     "key1" = "value1", 
     "key2" = "value2"
 )
  1. Canal 数据源对应的属性,以canal.为前缀
  1. canal.server.ip: canal server的地址
  2. canal.server.port: canal server的端口
  3. canal.destination: instance的标识
  4. canal.batchSize: 获取的batch大小的最大值,默认8192
  5. canal.username: instance的用户名
  6. canal.password: instance的密码
  7. canal.debug: 可选,设置为true时,会将batch和每一行数据的详细信息都打印出来 Examples:
  1. 简单为 test_dbtest_tbl 创建一个名为 job1 的数据同步作业,连接本地的Canal服务器,对应Mysql源表 mysql_db1.tbl1
CREATE SYNC `test_db`.`job1`
 (
     FROM `mysql_db1`.`tbl1` INTO `test_tbl `
 )
 FROM BINLOG 
 (
     "type" = "canal",
     "canal.server.ip" = "127.0.0.1",
     "canal.server.port" = "11111",
     "canal.destination" = "example",
     "canal.username" = "",
     "canal.password" = ""
 );
  1. test_db 的多张表创建一个名为 job1 的数据同步作业,一一对应多张Mysql源表,并显式的指定列映射。
CREATE SYNC `test_db`.`job1` 
 (
     FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2),
     FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1
 ) 
 FROM BINLOG 
 (
     "type" = "canal", 
     "canal.server.ip" = "xx.xxx.xxx.xx", 
     "canal.server.port" = "12111", 
     "canal.destination" = "example",  
     "canal.username" = "username", 
     "canal.password" = "password"
 );

3.1.2 开始同步mysql表里数据到Doris


注意:

创建同步任务之前,首先要在fe.conf里配置enable_create_sync_job=true,这个默认是false不启用,否则就不能创建同步任务

CREATE SYNC test_2.doris_mysql_binlog_demo_job 
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo
) 
FROM BINLOG 
(
    "type" = "canal", 
    "canal.server.ip" = "10.220.146.10", 
    "canal.server.port" = "11111", 
    "canal.destination" = "demo",  
    "canal.username" = "canal", 
    "canal.password" = "canal"
);

3.1.3 查看同步任务


SHOW SYNC JOB from test_2;

image.png


3.1.4 查看表里的数据


select * from doris_mysql_binlog_demo;

image.png


3.1.5 删除数据


我们在Mysql 数据表里删除数据,然后看Doris表里的变化


delete from test_cdc where id in (12,13)

我们在去看Doris表里,id是12,13这两条数据已经被删除


image.png


3.1.6 多表同步


多表同步只需要像下面这样写法就可以了


CREATE SYNC test_2.doris_mysql_binlog_demo_job 
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
)




相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
0
0
0
18
分享
相关文章
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)
Apache Doris 提出“数据无界”和“湖仓无界”理念,提供高效的数据管理方案。本文聚焦三个典型应用场景:湖仓分析加速、多源联邦分析、湖仓数据处理,深入介绍 Apache Doris 的最佳实践,帮助企业快速响应业务需求,提升数据处理和分析效率
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
湖仓一体架构融合了数据湖的低成本、高扩展性,以及数据仓库的高性能、强数据治理能力,高效应对大数据时代的挑战。为助力企业实现湖仓一体的建设,Apache Doris 提出了数据无界和湖仓无界核心理念,并结合自身特性,助力企业加速从 0 到 1 构建湖仓体系,降低转型过程中的风险和成本。本文将对湖仓一体演进及 Apache Doris 湖仓一体方案进行介绍。
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
日志数据已成为企业洞察系统状态、监控网络安全及分析业务动态的宝贵资源。网易云音乐引入 Apache Doris 作为日志库新方案,替换了 ClickHouse。解决了 ClickHouse 运维复杂、不支持倒排索引的问题。目前已经稳定运行 3 个季度,规模达到 50 台服务器, 倒排索引将全文检索性能提升7倍,2PB 数据,每天新增日志量超过万亿条,峰值写入吞吐 6GB/s 。
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
Apache Doris 3.0.4 版本正式发布
该版本持续在存算分离、湖仓一体、异步物化视图等方面进行改进提升与问题修复
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
419 33
The Past, Present and Future of Apache Flink
|
5月前
|
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1077 13
Apache Flink 2.0-preview released
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
184 3
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
您有一份 Apache Flink 社区年度报告请查收~
您有一份 Apache Flink 社区年度报告请查收~
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
666 13
Apache Flink 2.0:Streaming into the Future

热门文章

最新文章

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等