【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

配置文件

doris
├── doris.conf
├── flink.conf
└── tables
mysql
├── flink.conf
├── mysql.conf
└── tables
flink
├── flink_conf
注意mysql的tables和doris的tables是一一对应的关系

文件详解

```mysql中flink.conf
-- 指定binlog消费方式
'scan.startup.mode' = 'initial'
```flink.conf
-- 运行的yarn模式,需要用户配置flink on yarn
set 'execution.target' = 'yarn-per-job';
-- 运行的yarn任务的名称
set 'yarn.application.name' = 'flinkjob_database';
-- checkpoint配置
set 'state.backend' = 'filesystem';
set 'state.checkpoints.dir' = 'hdfs:///ck';
set 'execution.checkpointing.interval' = '6000';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '1';
set 'execution.checkpointing.timeout' ='600000';
set 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- 资源配置
set 'jobmanager.memory.process.size' = '1600m';
set 'taskmanager.memory.process.size' = '1780m';
set 'taskmanager.memory.managed.size' = '100m';
set 'taskmanager.numberoftaskslots' = '1';
```doris中flink.conf
-- 两阶段提交
'sink.properties.two_phase_commit' = 'true'

开始执行,去bin目录下执行脚本生成文件

文件生成格式:

每个Mysql库等于一个Flink任务等于一个Flink脚本文件

每个Flink脚本文件由一个Mysql脚本语句和一个Doris映射语句和一个insert同步语句构成

所以生成格式为

mysql_db.sql
doris_mysqldb.sql
insert_mysqldb.sql
flink_mysqldb.sql

原因如下:

首先为了统一命名格式便于理解

其次如果以doris库命名很有可能mysql10个库同步至doris一个库当中,就导致只会生成一个flinkjob

测试先:准备好insert_into和delete语句

INSERT INTO emp_2.employees_2 VALUES (11,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(22,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(33,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(44,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02')
delete from emp_2.employees_2 where emp_no = 11;
delete from emp_2.employees_2 where emp_no = 22;
delete from emp_2.employees_2 where emp_no = 33;
delete from emp_2.employees_2 where emp_no = 44;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10091 ;
update emp_2.employees_2 set first_name = 'toms' where emp_no= 10092;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10093;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10094;
统计语句
use emp_1;
select count(1) from employees_1;
select count(1) from employees_2;
use emp_2;
select count(1) from employees_1;
select count(1) from employees_2;

insert端的实现:

for m_table in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        let a++
        m_d=`cat ../conf/mysql/tables |grep -v '#' | awk "NR==$a{print}" |awk -F '.' '{print $1}'`
        d_table=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$a{print}"`
        sed "/$d_table$sink/,/KEY(/{//d;s/ *//;p};d" ../result/doris_$m_d.sql |awk '!x[$0]++' |awk '{print $0}'| awk -F '`' '{print $2}'|awk -F '\n' '{print $1","}' |sed '$s/.$//' > a.c
        ac=`cat a.c`
        m_d=`echo $m_table | awk -F '.' '{print $1}'`
        echo -e "insert into \`$d_table$sink\`\nselect\n${ac}\nfrom\n\`$m_table$src\`\nwhere 1=1;\n\n" >> ../result/insert_$m_d.sql
        rm -rf a.c
done
CREATE TABLE `demo.all_employees_info_sink1` (
  `emp_no` int NOT NULL COMMENT '',
  `birth_date` date NULL COMMENT '',
  `first_name` varchar(20) NULL COMMENT '',
  `last_name` varchar(20) NULL COMMENT '',
  `gender` string NULL COMMENT '',
  `hire_date` date NULL COMMENT '',
  `database_name` varchar(50) NULL COMMENT '',
  `table_name` varchar(200) NULL COMMENT '',
PRIMARY KEY(`emp_no`, `birth_date`)
NOT ENFORCED
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

source端的实现:

label=0
for t_name in $(cat ../conf/doris/tables | awk -F '\n' '{print $1}' | awk -F '.' '{print $2}')
        do
        let label++
        d_label=`cat ../conf/mysql/tables | awk "NR == $label"{print} |sed 's/\./\_/g'`
        m_d=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        d_d=`cat ../conf/doris/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        m_t=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $2}'`
        sed -i "0,/doris_username/s/doris_username/${fe_master_username}/" ../result/$t.sql
        sed -i "0,/doris_password/s/doris_password/${fe_master_password}/" ../result/$t.sql
        sed -i "0,/doris_table/s/doris_table/${d_d}.${t_name}/" ../result/$t.sql
        sed -i "0,/doris_connector/s/doris_connector/doris/" ../result/$t.sql
        sed -i "0,/doris_fenodes/s/doris_fenodes/${fe_master_host}:${fe_load_url_port}/" ../result/$t.sql
        sed -i "0,/doris_label\-prefix/s/doris_label\-prefix/${m_d}_${m_t}\_`date "+%y%m%d_%H%M%S"_$label`/" ../result/$t.sql
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment '',
  `database_name` varchar(50) null comment '',
  `table_name` varchar(200) null comment '',
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

sink端的实现:

for i in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}'|awk -F '.' '{print $1}' |sort -u)
        do
        sed -i '1iBEGIN STATEMENT SET;' ../result/insert_$i.sql
        sed -i '$aEND;' ../result/insert_$i.sql
        b=0
        for table in $(cat ../conf/doris/tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                let b++
                d_doris=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$b"`
                sed -i "0,/into \`${d_doris}_sink\`/s/into \`${d_doris}_sink\`/into \`${d_doris}_sink${b}\`/" ../result/insert_$i.sql
        done
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment ''
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

实现从checkpoint恢复任务:

1、小的改动:

  • 获取从job的web获取到checkpoint的url(注意每30s都会更改一次)
  • 修改文件内容
  • 修改label
  • 修改最终的checkpointurl

    640.png
  • 保存文件

碰到的问题:调整sql,相当变更算子,这时执行重新checkpoit会报错,需忽略这些新算子

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66. Cannot map checkpoint/savepoint state for operator e75d4004e6c5f0908bd4077fcf200fcd to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

640.png

2、大的改动

  • cp出一份mysql_to_doris
  • 重新配置表结构,执行sh flinkjob.sh
  • 复制文件
  • 整体粘贴
  • 配置最终checkpoint的url

6、展望:

  • 代码优化,将变量重新命名,调整格式以及通过变量简化代码量
  • 数据类型转换规范化,在此基础上继续填充
  • 优化checkpoint点,达到只需修改配置项即可同步的效果


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
99 1
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
2天前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
513 1
Flink CDC:新一代实时数据集成框架
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
519 14
Flink CDC 在货拉拉的落地与实践
|
17天前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
179 0
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之两个数据表是否可以同时进行双向的数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之Oracle数据库是集群部署的,怎么进行数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
Oracle 关系型数据库 分布式数据库
实时计算 Flink版产品使用问题之怎么实现跨多个DRDS的数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。