【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(下)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

配置文件

doris
├── doris.conf
├── flink.conf
└── tables
mysql
├── flink.conf
├── mysql.conf
└── tables
flink
├── flink_conf
注意mysql的tables和doris的tables是一一对应的关系

文件详解

```mysql中flink.conf
-- 指定binlog消费方式
'scan.startup.mode' = 'initial'
```flink.conf
-- 运行的yarn模式,需要用户配置flink on yarn
set 'execution.target' = 'yarn-per-job';
-- 运行的yarn任务的名称
set 'yarn.application.name' = 'flinkjob_database';
-- checkpoint配置
set 'state.backend' = 'filesystem';
set 'state.checkpoints.dir' = 'hdfs:///ck';
set 'execution.checkpointing.interval' = '6000';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '1';
set 'execution.checkpointing.timeout' ='600000';
set 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- 资源配置
set 'jobmanager.memory.process.size' = '1600m';
set 'taskmanager.memory.process.size' = '1780m';
set 'taskmanager.memory.managed.size' = '100m';
set 'taskmanager.numberoftaskslots' = '1';
```doris中flink.conf
-- 两阶段提交
'sink.properties.two_phase_commit' = 'true'

开始执行,去bin目录下执行脚本生成文件

文件生成格式:

每个Mysql库等于一个Flink任务等于一个Flink脚本文件

每个Flink脚本文件由一个Mysql脚本语句和一个Doris映射语句和一个insert同步语句构成

所以生成格式为

mysql_db.sql
doris_mysqldb.sql
insert_mysqldb.sql
flink_mysqldb.sql

原因如下:

首先为了统一命名格式便于理解

其次如果以doris库命名很有可能mysql10个库同步至doris一个库当中,就导致只会生成一个flinkjob

测试先:准备好insert_into和delete语句

INSERT INTO emp_2.employees_2 VALUES (11,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(22,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(33,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(44,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02')
delete from emp_2.employees_2 where emp_no = 11;
delete from emp_2.employees_2 where emp_no = 22;
delete from emp_2.employees_2 where emp_no = 33;
delete from emp_2.employees_2 where emp_no = 44;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10091 ;
update emp_2.employees_2 set first_name = 'toms' where emp_no= 10092;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10093;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10094;
统计语句
use emp_1;
select count(1) from employees_1;
select count(1) from employees_2;
use emp_2;
select count(1) from employees_1;
select count(1) from employees_2;

insert端的实现:

for m_table in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        let a++
        m_d=`cat ../conf/mysql/tables |grep -v '#' | awk "NR==$a{print}" |awk -F '.' '{print $1}'`
        d_table=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$a{print}"`
        sed "/$d_table$sink/,/KEY(/{//d;s/ *//;p};d" ../result/doris_$m_d.sql |awk '!x[$0]++' |awk '{print $0}'| awk -F '`' '{print $2}'|awk -F '\n' '{print $1","}' |sed '$s/.$//' > a.c
        ac=`cat a.c`
        m_d=`echo $m_table | awk -F '.' '{print $1}'`
        echo -e "insert into \`$d_table$sink\`\nselect\n${ac}\nfrom\n\`$m_table$src\`\nwhere 1=1;\n\n" >> ../result/insert_$m_d.sql
        rm -rf a.c
done
CREATE TABLE `demo.all_employees_info_sink1` (
  `emp_no` int NOT NULL COMMENT '',
  `birth_date` date NULL COMMENT '',
  `first_name` varchar(20) NULL COMMENT '',
  `last_name` varchar(20) NULL COMMENT '',
  `gender` string NULL COMMENT '',
  `hire_date` date NULL COMMENT '',
  `database_name` varchar(50) NULL COMMENT '',
  `table_name` varchar(200) NULL COMMENT '',
PRIMARY KEY(`emp_no`, `birth_date`)
NOT ENFORCED
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

source端的实现:

label=0
for t_name in $(cat ../conf/doris/tables | awk -F '\n' '{print $1}' | awk -F '.' '{print $2}')
        do
        let label++
        d_label=`cat ../conf/mysql/tables | awk "NR == $label"{print} |sed 's/\./\_/g'`
        m_d=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        d_d=`cat ../conf/doris/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        m_t=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $2}'`
        sed -i "0,/doris_username/s/doris_username/${fe_master_username}/" ../result/$t.sql
        sed -i "0,/doris_password/s/doris_password/${fe_master_password}/" ../result/$t.sql
        sed -i "0,/doris_table/s/doris_table/${d_d}.${t_name}/" ../result/$t.sql
        sed -i "0,/doris_connector/s/doris_connector/doris/" ../result/$t.sql
        sed -i "0,/doris_fenodes/s/doris_fenodes/${fe_master_host}:${fe_load_url_port}/" ../result/$t.sql
        sed -i "0,/doris_label\-prefix/s/doris_label\-prefix/${m_d}_${m_t}\_`date "+%y%m%d_%H%M%S"_$label`/" ../result/$t.sql
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment '',
  `database_name` varchar(50) null comment '',
  `table_name` varchar(200) null comment '',
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

sink端的实现:

for i in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}'|awk -F '.' '{print $1}' |sort -u)
        do
        sed -i '1iBEGIN STATEMENT SET;' ../result/insert_$i.sql
        sed -i '$aEND;' ../result/insert_$i.sql
        b=0
        for table in $(cat ../conf/doris/tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                let b++
                d_doris=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$b"`
                sed -i "0,/into \`${d_doris}_sink\`/s/into \`${d_doris}_sink\`/into \`${d_doris}_sink${b}\`/" ../result/insert_$i.sql
        done
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment ''
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

实现从checkpoint恢复任务:

1、小的改动:

  • 获取从job的web获取到checkpoint的url(注意每30s都会更改一次)
  • 修改文件内容
  • 修改label
  • 修改最终的checkpointurl

    640.png
  • 保存文件

碰到的问题:调整sql,相当变更算子,这时执行重新checkpoit会报错,需忽略这些新算子

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66. Cannot map checkpoint/savepoint state for operator e75d4004e6c5f0908bd4077fcf200fcd to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

640.png

2、大的改动

  • cp出一份mysql_to_doris
  • 重新配置表结构,执行sh flinkjob.sh
  • 复制文件
  • 整体粘贴
  • 配置最终checkpoint的url

6、展望:

  • 代码优化,将变量重新命名,调整格式以及通过变量简化代码量
  • 数据类型转换规范化,在此基础上继续填充
  • 优化checkpoint点,达到只需修改配置项即可同步的效果


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
24天前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
1月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
207 4
|
2月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步
|
9天前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
258 1
Flink CDC:新一代实时数据集成框架
|
1月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
434 14
Flink CDC 在货拉拉的落地与实践
|
2月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
395 13
Flink CDC 在新能源制造业的实践
|
1月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
31 2
zabbix agent集成percona监控MySQL的插件实战案例
|
2月前
|
SQL 关系型数据库 MySQL
干货!python与MySQL数据库的交互实战
干货!python与MySQL数据库的交互实战
|
2月前
|
存储 关系型数据库 MySQL
实战!MySQL主从复制一键搭建脚本分享
实战!MySQL主从复制一键搭建脚本分享
31 2
|
2月前
|
SQL 数据库 流计算
Flink CDC数据读取问题之一致性如何解决
Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
89 1
下一篇
无影云桌面