使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 本文通过实例来演示怎么通过Flink CDC 结合Doris的Flink Connector实现从Mysql数据库中监听数据并实时入库到Doris数仓对应的表中。

本文通过实例来演示怎么通过Flink CDC 结合Doris的Flink Connector实现从Mysql数据库中监听数据并实时入库到Doris数仓对应的表中。


1.什么是CDC


CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。


例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门需要将每分钟的实时数据简单聚合处理后保存到 Redis 中以供查询,B 部门需要将当天的数据暂存到

Elasticsearch 一份来做报表展示,C 部门也需要一份数据到 ClickHouse 做实时数仓。随着时间的推移,后续 D 部门、E 部门也会有数据分析的需求,这种场景下,传统的拷贝分发多个副本方法很不灵活,而 CDC 可以实现一份变动记录,实时处理并投递到多个目的地。


1.1 CDC的应用场景


  • 数据同步:用于备份,容灾;


  • 数据分发:一个数据源分发给多个下游系统;


  • 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。


CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:


  • 基于查询的 CDC:


  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
  • 不保障实时性,基于离线调度存在天然的延迟。


  • 基于日志的 CDC:
  • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。


2.Flink CDC


Flink在1.11版本中新增了CDC的特性,简称 改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看CDC的内容。


以上是之前的mysq binlog日志处理流程,例如 canal 监听 binlog 把日志写入到 kafka 中。而 Apache Flink 实时消费 Kakfa 的数据实现 mysql 数据的同步或其他内容等。拆分来说整体上可以分为以下几个阶段。


  1. mysql开启binlog


  1. canal同步binlog数据写入到kafka


  1. flink读取kakfa中的binlog数据进行相关的业务处理。


整体的处理链路较长,需要用到的组件也比较多。Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析


2.1 Flink Connector Mysql CDC 2.0 特性


提供 MySQL CDC 2.0,核心 feature 包括


  • 并发读取,全量数据的读取性能可以水平扩展;
  • 全程无锁,不对线上业务产生锁的风险;
  • 断点续传,支持全量阶段的 checkpoint。


网上有测试文档显示用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:


  • MySQL CDC 2.0 用时 13 分钟;
  • MySQL CDC 1.4 用时 89 分钟;
  • 读取性能提升 6.8 倍。


3.什么是Flink Doris Connector

image.png


Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。


Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!


Flink Doris Connector 是 doris 社区为了方便用户使用 Flink 读写Doris数据表的一个扩展,


目前 doris 支持 Flink 1.11.x ,1.12.x,1.13.x,Scala版本:2.12.x


目前Flink doris connector目前控制入库通过两个参数:


  1. sink.batch.size  :每多少条写入一次,默认100条


  1. sink.batch.interval :每个多少秒写入一下,默认1秒


这两参数同时起作用,那个条件先到就触发写doris表操作,


注意:


这里注意的是要启用 http v2 版本,具体在 fe.conf 中配置

enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩需要配置的用户有 admin 权限。


4. 用法示例


4.1 Flink Doris Connector 编译


首先我们要编译Doris的Flink connector,也可以通过下面的地址进行下载:


注意:

这里因为Doris 的Flink Connector 是基于Scala 2.12.x版本进行开发的,所有你在使用Flink 的时候请选择对应scala 2.12的版本,

如果你使用上面地址下载了相应的jar,请忽略下面的编译内容部分

在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。


在 extension/flink-doris-connector/ 源码目录下执行:

sh build.sh

编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Flink,则将此文件放入预部署包中。


针对Flink 1.13.x版本适配问题

<properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>

只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编辑即可


4.2 配置Flink


这里我们是通过Flink Sql Client 方式来进行操作。


这里我们演示使用的软件版本:


  1. Mysql 8.x


  1. Apache Flink : 1.13.3


  1. Apache Doris :0.14.13.1


4.2.1 安装Flink


首先下载和安装 Flink :


这里演示使用的是本地单机模式,

# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz
# tar zxvf flink-1.12.5-bin-scala_2.12.tgz


这里注意Flink CDC 和Flink 的版本对应关系

image.png


# wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz
# tar zxvf flink-1.13.3-bin-scala_2.12.tgz 
# cd flink-1.13.3
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar -P ./lib/
# wget https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar -P ./lib/

image.png


4.2.2 启动Flink


这里我们使用的是本地单机模式

# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.

我们通过web访问(默认端口是8081)启动起来Flink 集群,可以看到集群正常启动


image.png



4.3 安装Apache Doris


具体安装部署Doris的方法,参照下面的连接:

环境安装部署


4.3 安装配置 Mysql


  1. 安装Mysql


快速使用Docker安装配置Mysql,具体参照下面的连接

  1. 开启Mysql binlog


进入 Docker 容器修改/etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,

log_bin=mysql_bin
binlog-format=Row
server-id=1

然后重启Mysql


systemctl restart mysqld
  1. 创建Mysql数据库表
CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB

4.4 创建doris表


CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
 ) ENGINE=OLAP
 UNIQUE KEY(`id`)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

4.5 启动 Flink Sql Client


./bin/sql-client.sh embedded
> set execution.result-mode=tableau;

image.png


4.5.1 创建 Flink CDC Mysql 映射表


CREATE TABLE test_flink_cdc ( 
  id INT, 
  name STRING,
  primary key(id)  NOT ENFORCED
) WITH ( 
  'connector' = 'mysql-cdc', 
  'hostname' = 'localhost', 
  'port' = '3306', 
  'username' = 'root', 
  'password' = 'password', 
  'database-name' = 'demo', 
  'table-name' = 'test_cdc' 
);

执行查询创建的Mysql映射表,显示正常

select * from test_flink_cdc;

image.png


4.5.2 创建Flink Doris Table 映射表


使用Doris Flink Connector创建 Doris映射表

CREATE TABLE doris_test_sink (
   id INT,
   name STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval'='1',
  'username' = 'root',
  'password' = ''
)

在命令行下执行上面的语句,可以看到创建表成功,然后执行查询语句,验证是否正常


select * from doris_test_sink;

image.png


执行插入操作,将Mysql 里的数据通过 Flink CDC结合Doris Flink Connector方式插入到 Doris中


INSERT INTO doris_test_sink select id,name from test_flink_cdc

image.png


提交成功之后我们在Flink的Web界面可以看到相关的Job任务信息


image.png


4.5.3 向Mysql表中插入数据


INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, '测试flink CDC');
INSERT INTO test_cdc VALUES (1234, '这是测试');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');

4.5.4 观察Doris表的数据

首先停掉Insert into这个任务,因为我是在本地单机模式,只有一个task任务,所以要停掉,然后在命令行执行查询语句才能看到数据

image.png


4.5.5 修改Mysql的数据


重新启动Insert into任务

image.png



修改Mysql表里的数据

update test_cdc set name='这个是验证修改的操作' where id =123

再去观察Doris表中的数据,你会发现已经修改


注意这里如果要想Mysql表里的数据修改,Doris里的数据也同样修改,Doris数据表的模型要是Unique key模型,其他数据模型(Aggregate Key 和 Duplicate Key)不能进行数据的更新操作。

image.png


image.png


4.5.6 删除数据操作


目前Doris Flink Connector 还不支持删除操作,后面计划会加上这个操作




相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
6月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
560 33
The Past, Present and Future of Apache Flink
|
3月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
365 0
|
3月前
|
存储 缓存 数据挖掘
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
266 17
Flink + Doris 实时湖仓解决方案
|
3月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
413 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
4月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
457 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
4月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
980 43
|
3月前
|
存储 大数据 数据处理
您有一份 Apache Flink 社区年度报告请查收~
您有一份 Apache Flink 社区年度报告请查收~
|
7月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
529 61
|
6月前
|
存储 SQL 人工智能
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
1044 13
Apache Flink 2.0:Streaming into the Future
|
6月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
315 17

热门文章

最新文章

推荐镜像

更多
下一篇
oss创建bucket