【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

配置文件

doris
├── doris.conf
├── flink.conf
└── tables
mysql
├── flink.conf
├── mysql.conf
└── tables
flink
├── flink_conf
注意mysql的tables和doris的tables是一一对应的关系

文件详解

```mysql中flink.conf
-- 指定binlog消费方式
'scan.startup.mode' = 'initial'
```flink.conf
-- 运行的yarn模式,需要用户配置flink on yarn
set 'execution.target' = 'yarn-per-job';
-- 运行的yarn任务的名称
set 'yarn.application.name' = 'flinkjob_database';
-- checkpoint配置
set 'state.backend' = 'filesystem';
set 'state.checkpoints.dir' = 'hdfs:///ck';
set 'execution.checkpointing.interval' = '6000';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '1';
set 'execution.checkpointing.timeout' ='600000';
set 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- 资源配置
set 'jobmanager.memory.process.size' = '1600m';
set 'taskmanager.memory.process.size' = '1780m';
set 'taskmanager.memory.managed.size' = '100m';
set 'taskmanager.numberoftaskslots' = '1';
```doris中flink.conf
-- 两阶段提交
'sink.properties.two_phase_commit' = 'true'

开始执行,去bin目录下执行脚本生成文件

文件生成格式:

每个Mysql库等于一个Flink任务等于一个Flink脚本文件

每个Flink脚本文件由一个Mysql脚本语句和一个Doris映射语句和一个insert同步语句构成

所以生成格式为

mysql_db.sql
doris_mysqldb.sql
insert_mysqldb.sql
flink_mysqldb.sql

原因如下:

首先为了统一命名格式便于理解

其次如果以doris库命名很有可能mysql10个库同步至doris一个库当中,就导致只会生成一个flinkjob

测试先:准备好insert_into和delete语句

INSERT INTO emp_2.employees_2 VALUES (11,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(22,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(33,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(44,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02')
delete from emp_2.employees_2 where emp_no = 11;
delete from emp_2.employees_2 where emp_no = 22;
delete from emp_2.employees_2 where emp_no = 33;
delete from emp_2.employees_2 where emp_no = 44;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10091 ;
update emp_2.employees_2 set first_name = 'toms' where emp_no= 10092;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10093;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10094;
统计语句
use emp_1;
select count(1) from employees_1;
select count(1) from employees_2;
use emp_2;
select count(1) from employees_1;
select count(1) from employees_2;

insert端的实现:

for m_table in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        let a++
        m_d=`cat ../conf/mysql/tables |grep -v '#' | awk "NR==$a{print}" |awk -F '.' '{print $1}'`
        d_table=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$a{print}"`
        sed "/$d_table$sink/,/KEY(/{//d;s/ *//;p};d" ../result/doris_$m_d.sql |awk '!x[$0]++' |awk '{print $0}'| awk -F '`' '{print $2}'|awk -F '\n' '{print $1","}' |sed '$s/.$//' > a.c
        ac=`cat a.c`
        m_d=`echo $m_table | awk -F '.' '{print $1}'`
        echo -e "insert into \`$d_table$sink\`\nselect\n${ac}\nfrom\n\`$m_table$src\`\nwhere 1=1;\n\n" >> ../result/insert_$m_d.sql
        rm -rf a.c
done
CREATE TABLE `demo.all_employees_info_sink1` (
  `emp_no` int NOT NULL COMMENT '',
  `birth_date` date NULL COMMENT '',
  `first_name` varchar(20) NULL COMMENT '',
  `last_name` varchar(20) NULL COMMENT '',
  `gender` string NULL COMMENT '',
  `hire_date` date NULL COMMENT '',
  `database_name` varchar(50) NULL COMMENT '',
  `table_name` varchar(200) NULL COMMENT '',
PRIMARY KEY(`emp_no`, `birth_date`)
NOT ENFORCED
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

source端的实现:

label=0
for t_name in $(cat ../conf/doris/tables | awk -F '\n' '{print $1}' | awk -F '.' '{print $2}')
        do
        let label++
        d_label=`cat ../conf/mysql/tables | awk "NR == $label"{print} |sed 's/\./\_/g'`
        m_d=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        d_d=`cat ../conf/doris/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        m_t=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $2}'`
        sed -i "0,/doris_username/s/doris_username/${fe_master_username}/" ../result/$t.sql
        sed -i "0,/doris_password/s/doris_password/${fe_master_password}/" ../result/$t.sql
        sed -i "0,/doris_table/s/doris_table/${d_d}.${t_name}/" ../result/$t.sql
        sed -i "0,/doris_connector/s/doris_connector/doris/" ../result/$t.sql
        sed -i "0,/doris_fenodes/s/doris_fenodes/${fe_master_host}:${fe_load_url_port}/" ../result/$t.sql
        sed -i "0,/doris_label\-prefix/s/doris_label\-prefix/${m_d}_${m_t}\_`date "+%y%m%d_%H%M%S"_$label`/" ../result/$t.sql
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment '',
  `database_name` varchar(50) null comment '',
  `table_name` varchar(200) null comment '',
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

sink端的实现:

for i in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}'|awk -F '.' '{print $1}' |sort -u)
        do
        sed -i '1iBEGIN STATEMENT SET;' ../result/insert_$i.sql
        sed -i '$aEND;' ../result/insert_$i.sql
        b=0
        for table in $(cat ../conf/doris/tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                let b++
                d_doris=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$b"`
                sed -i "0,/into \`${d_doris}_sink\`/s/into \`${d_doris}_sink\`/into \`${d_doris}_sink${b}\`/" ../result/insert_$i.sql
        done
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment ''
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

实现从checkpoint恢复任务:

1、小的改动:

  • 获取从job的web获取到checkpoint的url(注意每30s都会更改一次)
  • 修改文件内容
  • 修改label
  • 修改最终的checkpointurl

    640.png
  • 保存文件

碰到的问题:调整sql,相当变更算子,这时执行重新checkpoit会报错,需忽略这些新算子

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66. Cannot map checkpoint/savepoint state for operator e75d4004e6c5f0908bd4077fcf200fcd to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

640.png

2、大的改动

  • cp出一份mysql_to_doris
  • 重新配置表结构,执行sh flinkjob.sh
  • 复制文件
  • 整体粘贴
  • 配置最终checkpoint的url

6、展望:

  • 代码优化,将变量重新命名,调整格式以及通过变量简化代码量
  • 数据类型转换规范化,在此基础上继续填充
  • 优化checkpoint点,达到只需修改配置项即可同步的效果


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
16天前
|
SQL 存储 关系型数据库
不允许你不知道的 MySQL 优化实战(三)
不允许你不知道的 MySQL 优化实战(三)
22 1
|
20天前
|
关系型数据库 MySQL
【MySQL实战笔记】07 | 行锁功过:怎么减少行锁对性能的影响?-01
【4月更文挑战第18天】MySQL的InnoDB引擎支持行锁,而MyISAM只支持表锁。行锁在事务开始时添加,事务结束时释放,遵循两阶段锁协议。为减少锁冲突影响并发,应将可能导致最大冲突的锁操作放在事务最后。例如,在电影票交易中,应将更新影院账户余额的操作安排在事务末尾,以缩短锁住关键行的时间,提高系统并发性能。
15 4
|
21天前
|
关系型数据库 MySQL 数据库
【MySQL实战笔记】 06 | 全局锁和表锁 :给表加个字段怎么有这么多阻碍?-01
【4月更文挑战第17天】MySQL的锁分为全局锁、表级锁和行锁。全局锁用于全库备份,可能导致业务暂停或主从延迟。不加锁备份会导致逻辑不一致。推荐使用`FTWRL`而非`readonly=true`因后者可能影响其他逻辑且异常处理不同。表级锁如`lock tables`限制读写并限定操作对象,常用于并发控制。元数据锁(MDL)在访问表时自动加锁,确保读写正确性。
70 31
|
1月前
|
存储 关系型数据库 MySQL
【MySQL实战笔记】 04 | 深入浅出索引(上)-01
【4月更文挑战第8天】这篇文章除了介绍索引的作用和提高查询效率的原理,还探讨了三种常见的数据结构:哈希表、有序数组和搜索树。哈希表适合等值查询,但不支持范围查询;有序数组利用二分查找实现快速等值查询,但更新成本高;二叉搜索树保持平衡时查询高效,但磁盘存储时效率低。文章指出,由于磁盘读取延迟,实际数据库索引设计需考虑减少磁盘访问次数。
34 5
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL实战笔记】02.一条SQL更新语句是如何执行的-1
【4月更文挑战第4天】SQL更新语句执行涉及查询和日志模块,主要为`redo log`和`binlog`。`redo log`先写日志再写磁盘,保证`crash-safe`;`binlog`记录逻辑日志,支持所有引擎,且追加写入。执行过程分为执行器查找数据、更新内存和`redo log`(prepare状态)、写入`binlog`、提交事务(`redo log`转commit)。两阶段提交确保日志逻辑一致,支持数据库恢复至任意时间点。
20 0
|
16天前
|
SQL 关系型数据库 MySQL
不允许你不知道的 MySQL 优化实战(二)
不允许你不知道的 MySQL 优化实战(二)
22 2
|
1月前
|
SQL 安全 关系型数据库
【MySQL实战笔记】03.事务隔离:为什么你改了我还看不见?-01
【4月更文挑战第6天】MySQL事务的隔离性确保数据操作的完整性和一致性,ACID原则包括原子性、一致性、隔离性和持久性。事务隔离级别有四种:读未提交、读提交、可重复读和串行化,分别解决并发问题如脏读、不可重复读和幻读。不同隔离级别在效率和安全性间权衡,例如读未提交允许未提交变更可见,而串行化通过锁保证安全但可能降低效率。在不同隔离级别下,事务看到的数据状态会有所变化,例如在可重复读级别,事务始终看到初始数据,而在串行化级别,事务会等待其他事务完成再继续,避免数据冲突。
278 10
|
11天前
|
缓存 关系型数据库 MySQL
MySQL数据库性能优化实战
【4月更文挑战第30天】本文探讨了MySQL性能优化实战技巧,包括硬件与配置优化(如使用SSD、增加内存和调整配置参数)、索引优化(创建合适索引、使用复合索引及定期维护)、查询优化(避免全表扫描、减少JOIN和使用LIMIT)、分区与分片(表分区和数据库分片),以及使用缓存、定期清理数据库和监控诊断。通过这些方法,可以提升数据库性能和响应速度。
|
12天前
|
DataWorks Shell 对象存储
DataWorks产品使用合集之在 DataWorks 中,有一个 MySQL 数据表,数据量非常大且数据会不断更新将这些数据同步到 DataWorks如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
29 3
|
13天前
|
存储 关系型数据库 MySQL
MySQL数据库实战:从入门到精通
本文介绍了MySQL的使用和优化,适合Web开发者阅读。首先,确保安装并配置好MySQL,熟悉SQL基础。接着,通过命令行客户端连接数据库,执行创建、查询、添加、修改和删除数据等操作。学习数据类型并创建表存储数据。最后,探讨了数据库优化,包括查询优化和索引使用,以提升性能。
24 2