使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 本文通过实例来演示怎么通过Flink CDC 结合Doris的Flink Connector实现从Mysql数据库中监听数据并实时入库到Doris数仓对应的表中。

本文通过实例来演示怎么通过Flink CDC 结合Doris的Flink Connector实现从Mysql数据库中监听数据并实时入库到Doris数仓对应的表中。


1.什么是CDC


CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。


例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门需要将每分钟的实时数据简单聚合处理后保存到 Redis 中以供查询,B 部门需要将当天的数据暂存到

Elasticsearch 一份来做报表展示,C 部门也需要一份数据到 ClickHouse 做实时数仓。随着时间的推移,后续 D 部门、E 部门也会有数据分析的需求,这种场景下,传统的拷贝分发多个副本方法很不灵活,而 CDC 可以实现一份变动记录,实时处理并投递到多个目的地。


1.1 CDC的应用场景


  • 数据同步:用于备份,容灾;


  • 数据分发:一个数据源分发给多个下游系统;


  • 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。


CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:


  • 基于查询的 CDC:


  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
  • 不保障实时性,基于离线调度存在天然的延迟。


  • 基于日志的 CDC:
  • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。


2.Flink CDC


Flink在1.11版本中新增了CDC的特性,简称 改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看CDC的内容。


以上是之前的mysq binlog日志处理流程,例如 canal 监听 binlog 把日志写入到 kafka 中。而 Apache Flink 实时消费 Kakfa 的数据实现 mysql 数据的同步或其他内容等。拆分来说整体上可以分为以下几个阶段。


  1. mysql开启binlog


  1. canal同步binlog数据写入到kafka


  1. flink读取kakfa中的binlog数据进行相关的业务处理。


整体的处理链路较长,需要用到的组件也比较多。Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析


2.1 Flink Connector Mysql CDC 2.0 特性


提供 MySQL CDC 2.0,核心 feature 包括


  • 并发读取,全量数据的读取性能可以水平扩展;
  • 全程无锁,不对线上业务产生锁的风险;
  • 断点续传,支持全量阶段的 checkpoint。


网上有测试文档显示用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:


  • MySQL CDC 2.0 用时 13 分钟;
  • MySQL CDC 1.4 用时 89 分钟;
  • 读取性能提升 6.8 倍。


3.什么是Flink Doris Connector

image.png


Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。


Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!


Flink Doris Connector 是 doris 社区为了方便用户使用 Flink 读写Doris数据表的一个扩展,


目前 doris 支持 Flink 1.11.x ,1.12.x,1.13.x,Scala版本:2.12.x


目前Flink doris connector目前控制入库通过两个参数:


  1. sink.batch.size  :每多少条写入一次,默认100条


  1. sink.batch.interval :每个多少秒写入一下,默认1秒


这两参数同时起作用,那个条件先到就触发写doris表操作,


注意:


这里注意的是要启用 http v2 版本,具体在 fe.conf 中配置

enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩需要配置的用户有 admin 权限。


4. 用法示例


4.1 Flink Doris Connector 编译


首先我们要编译Doris的Flink connector,也可以通过下面的地址进行下载:


注意:

这里因为Doris 的Flink Connector 是基于Scala 2.12.x版本进行开发的,所有你在使用Flink 的时候请选择对应scala 2.12的版本,

如果你使用上面地址下载了相应的jar,请忽略下面的编译内容部分

在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。


在 extension/flink-doris-connector/ 源码目录下执行:

sh build.sh

编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Flink,则将此文件放入预部署包中。


针对Flink 1.13.x版本适配问题

<properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>

只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编辑即可


4.2 配置Flink


这里我们是通过Flink Sql Client 方式来进行操作。


这里我们演示使用的软件版本:


  1. Mysql 8.x


  1. Apache Flink : 1.13.3


  1. Apache Doris :0.14.13.1


4.2.1 安装Flink


首先下载和安装 Flink :


这里演示使用的是本地单机模式,

# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz
# tar zxvf flink-1.12.5-bin-scala_2.12.tgz


这里注意Flink CDC 和Flink 的版本对应关系

image.png


# wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz
# tar zxvf flink-1.13.3-bin-scala_2.12.tgz 
# cd flink-1.13.3
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar -P ./lib/
# wget https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar -P ./lib/

image.png


4.2.2 启动Flink


这里我们使用的是本地单机模式

# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.

我们通过web访问(默认端口是8081)启动起来Flink 集群,可以看到集群正常启动


image.png



4.3 安装Apache Doris


具体安装部署Doris的方法,参照下面的连接:

环境安装部署


4.3 安装配置 Mysql


  1. 安装Mysql


快速使用Docker安装配置Mysql,具体参照下面的连接

  1. 开启Mysql binlog


进入 Docker 容器修改/etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,

log_bin=mysql_bin
binlog-format=Row
server-id=1

然后重启Mysql


systemctl restart mysqld
  1. 创建Mysql数据库表
CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB

4.4 创建doris表


CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
 ) ENGINE=OLAP
 UNIQUE KEY(`id`)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

4.5 启动 Flink Sql Client


./bin/sql-client.sh embedded
> set execution.result-mode=tableau;

image.png


4.5.1 创建 Flink CDC Mysql 映射表


CREATE TABLE test_flink_cdc ( 
  id INT, 
  name STRING,
  primary key(id)  NOT ENFORCED
) WITH ( 
  'connector' = 'mysql-cdc', 
  'hostname' = 'localhost', 
  'port' = '3306', 
  'username' = 'root', 
  'password' = 'password', 
  'database-name' = 'demo', 
  'table-name' = 'test_cdc' 
);

执行查询创建的Mysql映射表,显示正常

select * from test_flink_cdc;

image.png


4.5.2 创建Flink Doris Table 映射表


使用Doris Flink Connector创建 Doris映射表

CREATE TABLE doris_test_sink (
   id INT,
   name STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval'='1',
  'username' = 'root',
  'password' = ''
)

在命令行下执行上面的语句,可以看到创建表成功,然后执行查询语句,验证是否正常


select * from doris_test_sink;

image.png


执行插入操作,将Mysql 里的数据通过 Flink CDC结合Doris Flink Connector方式插入到 Doris中


INSERT INTO doris_test_sink select id,name from test_flink_cdc

image.png


提交成功之后我们在Flink的Web界面可以看到相关的Job任务信息


image.png


4.5.3 向Mysql表中插入数据


INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, '测试flink CDC');
INSERT INTO test_cdc VALUES (1234, '这是测试');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');

4.5.4 观察Doris表的数据

首先停掉Insert into这个任务,因为我是在本地单机模式,只有一个task任务,所以要停掉,然后在命令行执行查询语句才能看到数据

image.png


4.5.5 修改Mysql的数据


重新启动Insert into任务

image.png



修改Mysql表里的数据

update test_cdc set name='这个是验证修改的操作' where id =123

再去观察Doris表中的数据,你会发现已经修改


注意这里如果要想Mysql表里的数据修改,Doris里的数据也同样修改,Doris数据表的模型要是Unique key模型,其他数据模型(Aggregate Key 和 Duplicate Key)不能进行数据的更新操作。

image.png


image.png


4.5.6 删除数据操作


目前Doris Flink Connector 还不支持删除操作,后面计划会加上这个操作




相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
29天前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
165 61
|
25天前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
15天前
|
SQL 存储 Apache
Apache Doris 3.0.3 版本正式发布
亲爱的社区小伙伴们,Apache Doris 3.0.3 版本已于 2024 年 12 月 02 日正式发布。该版本进一步提升了系统的性能及稳定性,欢迎大家下载体验。
|
1月前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
1月前
|
SQL 存储 数据处理
兼顾高性能与低成本,浅析 Apache Doris 异步物化视图原理及典型场景
Apache Doris 物化视图进行了支持。**早期版本中,Doris 支持同步物化视图;从 2.1 版本开始,正式引入异步物化视图,[并在 3.0 版本中完善了这一功能](https://www.selectdb.com/blog/1058)。**
|
1月前
|
SQL 存储 Java
Apache Doris 2.1.7 版本正式发布
亲爱的社区小伙伴们,**Apache Doris 2.1.7 版本已于 2024 年 11 月 10 日正式发布。**2.1.7 版本持续升级改进,同时在湖仓一体、异步物化视图、半结构化数据管理、查询优化器、执行引擎、存储管理、以及权限管理等方面完成了若干修复。欢迎大家下载使用。
|
1月前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
1月前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
2月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
1天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
10 3

推荐镜像

更多