【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

配置文件

doris
├── doris.conf
├── flink.conf
└── tables
mysql
├── flink.conf
├── mysql.conf
└── tables
flink
├── flink_conf
注意mysql的tables和doris的tables是一一对应的关系

文件详解

```mysql中flink.conf
-- 指定binlog消费方式
'scan.startup.mode' = 'initial'
```flink.conf
-- 运行的yarn模式,需要用户配置flink on yarn
set 'execution.target' = 'yarn-per-job';
-- 运行的yarn任务的名称
set 'yarn.application.name' = 'flinkjob_database';
-- checkpoint配置
set 'state.backend' = 'filesystem';
set 'state.checkpoints.dir' = 'hdfs:///ck';
set 'execution.checkpointing.interval' = '6000';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '1';
set 'execution.checkpointing.timeout' ='600000';
set 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- 资源配置
set 'jobmanager.memory.process.size' = '1600m';
set 'taskmanager.memory.process.size' = '1780m';
set 'taskmanager.memory.managed.size' = '100m';
set 'taskmanager.numberoftaskslots' = '1';
```doris中flink.conf
-- 两阶段提交
'sink.properties.two_phase_commit' = 'true'

开始执行,去bin目录下执行脚本生成文件

文件生成格式:

每个Mysql库等于一个Flink任务等于一个Flink脚本文件

每个Flink脚本文件由一个Mysql脚本语句和一个Doris映射语句和一个insert同步语句构成

所以生成格式为

mysql_db.sql
doris_mysqldb.sql
insert_mysqldb.sql
flink_mysqldb.sql

原因如下:

首先为了统一命名格式便于理解

其次如果以doris库命名很有可能mysql10个库同步至doris一个库当中,就导致只会生成一个flinkjob

测试先:准备好insert_into和delete语句

INSERT INTO emp_2.employees_2 VALUES (11,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(22,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(33,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(44,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02')
delete from emp_2.employees_2 where emp_no = 11;
delete from emp_2.employees_2 where emp_no = 22;
delete from emp_2.employees_2 where emp_no = 33;
delete from emp_2.employees_2 where emp_no = 44;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10091 ;
update emp_2.employees_2 set first_name = 'toms' where emp_no= 10092;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10093;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10094;
统计语句
use emp_1;
select count(1) from employees_1;
select count(1) from employees_2;
use emp_2;
select count(1) from employees_1;
select count(1) from employees_2;

insert端的实现:

for m_table in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        let a++
        m_d=`cat ../conf/mysql/tables |grep -v '#' | awk "NR==$a{print}" |awk -F '.' '{print $1}'`
        d_table=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$a{print}"`
        sed "/$d_table$sink/,/KEY(/{//d;s/ *//;p};d" ../result/doris_$m_d.sql |awk '!x[$0]++' |awk '{print $0}'| awk -F '`' '{print $2}'|awk -F '\n' '{print $1","}' |sed '$s/.$//' > a.c
        ac=`cat a.c`
        m_d=`echo $m_table | awk -F '.' '{print $1}'`
        echo -e "insert into \`$d_table$sink\`\nselect\n${ac}\nfrom\n\`$m_table$src\`\nwhere 1=1;\n\n" >> ../result/insert_$m_d.sql
        rm -rf a.c
done
CREATE TABLE `demo.all_employees_info_sink1` (
  `emp_no` int NOT NULL COMMENT '',
  `birth_date` date NULL COMMENT '',
  `first_name` varchar(20) NULL COMMENT '',
  `last_name` varchar(20) NULL COMMENT '',
  `gender` string NULL COMMENT '',
  `hire_date` date NULL COMMENT '',
  `database_name` varchar(50) NULL COMMENT '',
  `table_name` varchar(200) NULL COMMENT '',
PRIMARY KEY(`emp_no`, `birth_date`)
NOT ENFORCED
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

source端的实现:

label=0
for t_name in $(cat ../conf/doris/tables | awk -F '\n' '{print $1}' | awk -F '.' '{print $2}')
        do
        let label++
        d_label=`cat ../conf/mysql/tables | awk "NR == $label"{print} |sed 's/\./\_/g'`
        m_d=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        d_d=`cat ../conf/doris/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        m_t=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $2}'`
        sed -i "0,/doris_username/s/doris_username/${fe_master_username}/" ../result/$t.sql
        sed -i "0,/doris_password/s/doris_password/${fe_master_password}/" ../result/$t.sql
        sed -i "0,/doris_table/s/doris_table/${d_d}.${t_name}/" ../result/$t.sql
        sed -i "0,/doris_connector/s/doris_connector/doris/" ../result/$t.sql
        sed -i "0,/doris_fenodes/s/doris_fenodes/${fe_master_host}:${fe_load_url_port}/" ../result/$t.sql
        sed -i "0,/doris_label\-prefix/s/doris_label\-prefix/${m_d}_${m_t}\_`date "+%y%m%d_%H%M%S"_$label`/" ../result/$t.sql
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment '',
  `database_name` varchar(50) null comment '',
  `table_name` varchar(200) null comment '',
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

sink端的实现:

for i in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}'|awk -F '.' '{print $1}' |sort -u)
        do
        sed -i '1iBEGIN STATEMENT SET;' ../result/insert_$i.sql
        sed -i '$aEND;' ../result/insert_$i.sql
        b=0
        for table in $(cat ../conf/doris/tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                let b++
                d_doris=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$b"`
                sed -i "0,/into \`${d_doris}_sink\`/s/into \`${d_doris}_sink\`/into \`${d_doris}_sink${b}\`/" ../result/insert_$i.sql
        done
done
create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment ''
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

实现从checkpoint恢复任务:

1、小的改动:

  • 获取从job的web获取到checkpoint的url(注意每30s都会更改一次)
  • 修改文件内容
  • 修改label
  • 修改最终的checkpointurl

    640.png
  • 保存文件

碰到的问题:调整sql,相当变更算子,这时执行重新checkpoit会报错,需忽略这些新算子

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66. Cannot map checkpoint/savepoint state for operator e75d4004e6c5f0908bd4077fcf200fcd to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

640.png

2、大的改动

  • cp出一份mysql_to_doris
  • 重新配置表结构,执行sh flinkjob.sh
  • 复制文件
  • 整体粘贴
  • 配置最终checkpoint的url

6、展望:

  • 代码优化,将变量重新命名,调整格式以及通过变量简化代码量
  • 数据类型转换规范化,在此基础上继续填充
  • 优化checkpoint点,达到只需修改配置项即可同步的效果


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
1
0
0
95
分享
相关文章
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
292 17
Flink + Doris 实时湖仓解决方案
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
408 1
Flink CDC + Hologres高性能数据同步优化实践
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
481 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1056 43
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
342 17
Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖
本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。
Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖
实时计算 Flink版操作报错之同步MySQL分库分表500张表报连接超时,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入
本文主要介绍了 Flink CDC 分库分表怎么实时同步,以及其结合 Apache Doris Flink Connector 最新版本整合的 Flink 2PC 和 Doris Stream Load 2PC 的机制及整合原理、使用方法等。
技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问