【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

1、业务需求及其痛点

公司诸多业务需求求其最新状态,例如车最新状态,桩最新状态,报告最新状态,检定任务最新状态,业务信息所有的明细数据保存至doris中,但是无法得知其最新状态集;


阶段1:根据GB4403、GB27930等协议,数据允许迟到7天,也就是说,通过sql进行计算的时候,必须取最近7天的数据,平均每天数据1000w条,就是单次计算大概在7000w条左右,通过创建最新状态表,然后通过sql取出结果集至状态表当中,通过调度框架dolphinscheduler对其进行调度;由于是最新状态其实时性比较高,往常是设定了1分钟的调度时间

640.png

痛点:

①:实时性根据调度时间确定,不管时间设定多短,都不够实时

②:频繁重复计算浪费大量计算资源

insert into the_monitor_latest_status
select vin, daq_time, province, city, district, odo, cha_state, op_mode, op_state, soc, curr, volt, lat, lng
from
    (select vin, daq_time, province, city, district, odo, cha_state, op_mode, op_state, soc, curr, volt, lat, lng,row_number() over (partition by vin order by daq_time desc)ro
     from ods_monitordata
     where daq_time >= date_format(data_sub(current_date(),interval 7 day),'%Y-%m-%d 00:00:00') and odo != 0 and province != 'unknown')t1
where ro = 1;

阶段2:

640.png

痛点:

①:开发成本高,每张表都需要写一段程序

Mysql外表需求和痛点:

业务系统很多表结构一直存储在mysql当中,其中的大表(数据量大)都会同步至doris中,数据量较小的维表没必要同步至doris当中,可以通过外表的方式挂载到doris中,但是创建外表的步骤较为繁琐,只能一张张手动创建,另外mysql中表结构更改后,外表就需要重建

痛点:

①:外部表手动创建繁琐,如100张表全部手动创建

②:mysql表结构更改就需要重新创建外表

2、mysql_to_doris结构图

工具实现上述优化,优点如下:

  • shell编写极其轻量,开源即用
  • 纯sql语法开发成本0特别适用于当前业务场景
  • 简单配置实现全程自动化处理

架构图

640.png

640.png

mysql_to_doris/
├── bin
│   ├── auto.sh  --Flink_job启动脚本
│   ├── create_doris.sh  --生成doris映射flink的建表语句
│   ├── create_mysql.sh  --生成mysql映射flink的建表语句
│   ├── e_auto.sh  --外部表执行脚本
│   ├── e_mysql_to_doris.sh  --外部表建表语句生成脚本
│   ├── flinksql.sh  --flink_job语句生成脚本
│   └── insert_into.sh  --insert into 语句生成脚本
├── conf
│   ├── doris
│   │   ├── doris.conf  --doris连接配置信息
│   │   ├── flink.conf  --flink特殊配置项
│   │   └── tables  --sink端的库名.表名
│   ├── e_mysql
│   │   ├── doris.conf  --外部表连接信息
│   │   ├── doris_tables  --外部表库名.表名(自定义)
│   │   ├── mysql.conf  --外部表连接信息
│   │   └── mysql_tables  --源表库名.表名
│   ├── flink
│   │   ├── flink_conf  --flink配置信息
│   └── mysql
│       ├── flink.conf  --flink特殊配置项
│       ├── mysql.conf  --mysql连接配置信息
│       └── tables  --source端的库名.表名
└── lib
    ├── doris_to_flink.sh  --doris映射flink表结构转换
    ├── mysql_to_doris.sh  --mysql映射doris外表结构转换
    └── mysql_to_flink.sh  --mysql映射flink外表结构转换

代码流程:

1、获取建表语句

for table in $(cat ../conf/e_mysql/mysql_tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        echo "show create table ${table};" |mysql -h$mysql_host -uroot -p$mysql_password  >> $path
done

640.png

2、调整格式

awk -F '\t' '{print $2}' $path |awk '!(NR%2)' |awk '{print $0 ";"}' > ../result/tmp111.sql
sed -i 's/\\n/\n/g' ../result/tmp111.sql
sed -n '/CREATE TABLE/,/ENGINE\=/p' ../result/tmp111.sql > ../result/tmp222.sql
##delete tables special struct
sed -i '/^  CON/d' ../result/tmp222.sql
sed -i '/^  KEY/d' ../result/tmp222.sql

640.png

3、拼接doris信息

sed -i '/ENGINE=/a) ENGINE=ODBC\n COMMENT "ODBC"\nPROPERTIES (\n"host" = "ApacheDorisHostIp",\n"port" = "3306",\n"user" = "root",\n"password" = "ApacheDorisHostPassword",\n"database" = "ApacheDorisDataBases",\n"table" = "ApacheDorisTables",\n"driver" = "MySQL",\n"odbc_type" = "mysql");' $path

640.png

3、涉及组件介绍:

  • FlinkCDC版本2.2.1
  • Doris Flink Connector版本:1.14_2.12-1.0.0
  • FLink版本:1.14.5
  • Hadoop版本:3.1.3
  • doris版本:1.1.1
  • mysql odbc版本:5.3.13
链接:https://pan.baidu.com/s/1eMML1Km-VYa01SRQaGuwBQ 
提取码:yyds

什么是 CDC

CDC 是 Change Data Capture 变更数据获取的简称。


核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入 INSERT、更新 UPDATE、删除 DELETE 等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC 技术应用场景也非常广泛,包括:

  • 数据分发:将一个数据源分发给多个下游,常用于业务解耦、微服务。
  • 数据集成:将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。
  • 数据迁移:常用于数据库备份、容灾等。

什么是 Apache Doris

Apache Doris 是一个现代化的 MPP 分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris 的分布式架构非常简洁,易于运维,并且可以支持 10PB 以上的超大数据集。

Apache Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。可以使数据分析工作更加简单高效!

什么是 Doris Flink Connector

Flink Doris Connector 是 Doris 社区为了方便用户使用 Flink 读写 Doris 数据表的一个扩展。实现了通过flink实时写入数据进入到doris的可能,Flink Doris Connector之前,针对业务不规则数据,经常需要针对消息做规范处理,空值过滤等写入新的topic,然后再启动Routine load写入Doris。Flink Doris Connector之后,flink可以直接读取kafka,直接写入doris。

什么是Doris On ODBC

ODBC External Table Of Doris 提供了Doris通过数据库访问的标准接口(ODBC)来访问外部表,外部表省去了繁琐的数据导入工作,让Doris可以具有了访问各式数据库的能力,并借助Doris本身的OLAP的能力来解决外部表的数据分析问题:

  1. 支持各种数据源接入Doris
  2. 支持Doris与各种数据源中的表联合查询,进行更加复杂的分析操作
  3. 通过insert into将Doris执行的查询结果写入外部的数据源
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
4天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
26 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2天前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
9天前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
28 5
|
19天前
|
架构师 关系型数据库 MySQL
MySQL最左前缀优化原则:深入解析与实战应用
【10月更文挑战第12天】在数据库架构设计与优化中,索引的使用是提升查询性能的关键手段之一。其中,MySQL的最左前缀优化原则(Leftmost Prefix Principle)是复合索引(Composite Index)应用中的核心策略。作为资深架构师,深入理解并掌握这一原则,对于平衡数据库性能与维护成本至关重要。本文将详细解读最左前缀优化原则的功能特点、业务场景、优缺点、底层原理,并通过Java示例展示其实现方式。
41 1
|
3天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
17 0
|
4天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第26天】数据库作为现代应用系统的核心组件,其性能优化至关重要。本文主要探讨MySQL的索引策略与查询性能调优。通过合理创建索引(如B-Tree、复合索引)和优化查询语句(如使用EXPLAIN、优化分页查询),可以显著提升数据库的响应速度和稳定性。实践中还需定期审查慢查询日志,持续优化性能。
22 0
|
2月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
47 2
zabbix agent集成percona监控MySQL的插件实战案例
|
3月前
|
SQL 关系型数据库 MySQL
干货!python与MySQL数据库的交互实战
干货!python与MySQL数据库的交互实战
|
3月前
|
存储 关系型数据库 MySQL
实战!MySQL主从复制一键搭建脚本分享
实战!MySQL主从复制一键搭建脚本分享
61 2
|
3月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
59 1