【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(上)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

1、业务需求及其痛点

公司诸多业务需求求其最新状态,例如车最新状态,桩最新状态,报告最新状态,检定任务最新状态,业务信息所有的明细数据保存至doris中,但是无法得知其最新状态集;


阶段1:根据GB4403、GB27930等协议,数据允许迟到7天,也就是说,通过sql进行计算的时候,必须取最近7天的数据,平均每天数据1000w条,就是单次计算大概在7000w条左右,通过创建最新状态表,然后通过sql取出结果集至状态表当中,通过调度框架dolphinscheduler对其进行调度;由于是最新状态其实时性比较高,往常是设定了1分钟的调度时间

640.png

痛点:

①:实时性根据调度时间确定,不管时间设定多短,都不够实时

②:频繁重复计算浪费大量计算资源

insert into the_monitor_latest_status
select vin, daq_time, province, city, district, odo, cha_state, op_mode, op_state, soc, curr, volt, lat, lng
from
    (select vin, daq_time, province, city, district, odo, cha_state, op_mode, op_state, soc, curr, volt, lat, lng,row_number() over (partition by vin order by daq_time desc)ro
     from ods_monitordata
     where daq_time >= date_format(data_sub(current_date(),interval 7 day),'%Y-%m-%d 00:00:00') and odo != 0 and province != 'unknown')t1
where ro = 1;

阶段2:

640.png

痛点:

①:开发成本高,每张表都需要写一段程序

Mysql外表需求和痛点:

业务系统很多表结构一直存储在mysql当中,其中的大表(数据量大)都会同步至doris中,数据量较小的维表没必要同步至doris当中,可以通过外表的方式挂载到doris中,但是创建外表的步骤较为繁琐,只能一张张手动创建,另外mysql中表结构更改后,外表就需要重建

痛点:

①:外部表手动创建繁琐,如100张表全部手动创建

②:mysql表结构更改就需要重新创建外表

2、mysql_to_doris结构图

工具实现上述优化,优点如下:

  • shell编写极其轻量,开源即用
  • 纯sql语法开发成本0特别适用于当前业务场景
  • 简单配置实现全程自动化处理

架构图

640.png

640.png

mysql_to_doris/
├── bin
│   ├── auto.sh  --Flink_job启动脚本
│   ├── create_doris.sh  --生成doris映射flink的建表语句
│   ├── create_mysql.sh  --生成mysql映射flink的建表语句
│   ├── e_auto.sh  --外部表执行脚本
│   ├── e_mysql_to_doris.sh  --外部表建表语句生成脚本
│   ├── flinksql.sh  --flink_job语句生成脚本
│   └── insert_into.sh  --insert into 语句生成脚本
├── conf
│   ├── doris
│   │   ├── doris.conf  --doris连接配置信息
│   │   ├── flink.conf  --flink特殊配置项
│   │   └── tables  --sink端的库名.表名
│   ├── e_mysql
│   │   ├── doris.conf  --外部表连接信息
│   │   ├── doris_tables  --外部表库名.表名(自定义)
│   │   ├── mysql.conf  --外部表连接信息
│   │   └── mysql_tables  --源表库名.表名
│   ├── flink
│   │   ├── flink_conf  --flink配置信息
│   └── mysql
│       ├── flink.conf  --flink特殊配置项
│       ├── mysql.conf  --mysql连接配置信息
│       └── tables  --source端的库名.表名
└── lib
    ├── doris_to_flink.sh  --doris映射flink表结构转换
    ├── mysql_to_doris.sh  --mysql映射doris外表结构转换
    └── mysql_to_flink.sh  --mysql映射flink外表结构转换

代码流程:

1、获取建表语句

for table in $(cat ../conf/e_mysql/mysql_tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        echo "show create table ${table};" |mysql -h$mysql_host -uroot -p$mysql_password  >> $path
done

640.png

2、调整格式

awk -F '\t' '{print $2}' $path |awk '!(NR%2)' |awk '{print $0 ";"}' > ../result/tmp111.sql
sed -i 's/\\n/\n/g' ../result/tmp111.sql
sed -n '/CREATE TABLE/,/ENGINE\=/p' ../result/tmp111.sql > ../result/tmp222.sql
##delete tables special struct
sed -i '/^  CON/d' ../result/tmp222.sql
sed -i '/^  KEY/d' ../result/tmp222.sql

640.png

3、拼接doris信息

sed -i '/ENGINE=/a) ENGINE=ODBC\n COMMENT "ODBC"\nPROPERTIES (\n"host" = "ApacheDorisHostIp",\n"port" = "3306",\n"user" = "root",\n"password" = "ApacheDorisHostPassword",\n"database" = "ApacheDorisDataBases",\n"table" = "ApacheDorisTables",\n"driver" = "MySQL",\n"odbc_type" = "mysql");' $path

640.png

3、涉及组件介绍:

  • FlinkCDC版本2.2.1
  • Doris Flink Connector版本:1.14_2.12-1.0.0
  • FLink版本:1.14.5
  • Hadoop版本:3.1.3
  • doris版本:1.1.1
  • mysql odbc版本:5.3.13
链接:https://pan.baidu.com/s/1eMML1Km-VYa01SRQaGuwBQ 
提取码:yyds

什么是 CDC

CDC 是 Change Data Capture 变更数据获取的简称。


核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入 INSERT、更新 UPDATE、删除 DELETE 等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC 技术应用场景也非常广泛,包括:

  • 数据分发:将一个数据源分发给多个下游,常用于业务解耦、微服务。
  • 数据集成:将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。
  • 数据迁移:常用于数据库备份、容灾等。

什么是 Apache Doris

Apache Doris 是一个现代化的 MPP 分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris 的分布式架构非常简洁,易于运维,并且可以支持 10PB 以上的超大数据集。

Apache Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。可以使数据分析工作更加简单高效!

什么是 Doris Flink Connector

Flink Doris Connector 是 Doris 社区为了方便用户使用 Flink 读写 Doris 数据表的一个扩展。实现了通过flink实时写入数据进入到doris的可能,Flink Doris Connector之前,针对业务不规则数据,经常需要针对消息做规范处理,空值过滤等写入新的topic,然后再启动Routine load写入Doris。Flink Doris Connector之后,flink可以直接读取kafka,直接写入doris。

什么是Doris On ODBC

ODBC External Table Of Doris 提供了Doris通过数据库访问的标准接口(ODBC)来访问外部表,外部表省去了繁琐的数据导入工作,让Doris可以具有了访问各式数据库的能力,并借助Doris本身的OLAP的能力来解决外部表的数据分析问题:

  1. 支持各种数据源接入Doris
  2. 支持Doris与各种数据源中的表联合查询,进行更加复杂的分析操作
  3. 通过insert into将Doris执行的查询结果写入外部的数据源
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
9月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
11月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1133 0
|
11月前
|
存储 缓存 数据挖掘
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
936 17
Flink + Doris 实时湖仓解决方案
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
2888 45
|
11月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
798 1
Flink CDC + Hologres高性能数据同步优化实践
|
12月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
927 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
SQL 存储 关系型数据库
MySQL秘籍之索引与查询优化实战指南
最左前缀原则。不冗余原则。最大选择性原则。所谓前缀索引,说白了就是对文本的前几个字符建立索引(具体是几个字符在建立索引时去指定),比如以产品名称的前 10 位来建索引,这样建立起来的索引更小,查询效率更快!
422 22
 MySQL秘籍之索引与查询优化实战指南
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
543 17
|
5月前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
430 158
|
5月前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。

热门文章

最新文章

推荐镜像

更多