Flink CDC 系列 - 实现 MySQL 数据实时写入 Apache Doris

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink CDC 结合 Doris Flink Connector 实现 MySQL 数据实时入 Apache Doris。

本文通过实例来演示怎么通过 Flink CDC 结合 Doris 的 Flink Connector 实现从 Mysql 数据库中监听数据并实时入库到 Doris 数仓对应的表中。主要内容包括:

  1. 什么是 CDC
  2. Flink CDC
  3. 什么是 Flink Doris Connector
  4. 用法示例

Flink 中文学习网站
https://flink-learning.org.cn

一、什么是 CDC

CDC 是变更数据捕获 (Change Data Capture) 技术的缩写,它可以将源数据库 (Source) 的增量变动记录,同步到一个或多个数据目的 (Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组 (GROUP BY)、多表的关联 (JOIN) 等。

例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门需要将每分钟的实时数据简单聚合处理后保存到 Redis 中以供查询,B 部门需要将当天的数据暂存到 Elasticsearch 一份来做报表展示,C 部门也需要一份数据到 ClickHouse 做实时数仓。随着时间的推移,后续 D 部门、E 部门也会有数据分析的需求,这种场景下,传统的拷贝分发多个副本方法很不灵活,而 CDC 可以实现一份变动记录,实时处理并投递到多个目的地。

CDC 的应用场景

  • 数据同步:用于备份,容灾;
  • 数据分发:一个数据源分发给多个下游系统;
  • 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。

CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:

  • 基于查询的 CDC

    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
    • 不保障实时性,基于离线调度存在天然的延迟。
  • 基于日志的 CDC

    • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

二、Flink CDC

Flink 在 1.11 版本中新增了 CDC 的特性,简称改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看 CDC 的内容。

img

以上是之前的 mysq binlog 日志处理流程,例如 canal 监听 binlog 把日志写入到 kafka 中。而 Apache Flink 实时消费 Kakfa 的数据实现 mysql 数据的同步或其他内容等。拆分来说整体上可以分为以下几个阶段:

  1. Mysql 开启 binlog;
  2. Canal 同步 binlog 数据写入到 Kafka;
  3. Flink 读取 Kakfa 中的 binlog 数据进行相关的业务处理。

整体的处理链路较长,需要用到的组件也比较多。Apache Flink CDC 可以直接从数据库获取到 binlog 供下游进行业务计算分析

Flink Connector Mysql CDC 2.0 特性

提供 MySQL CDC 2.0,核心 feature 包括:

  • 并发读取,全量数据的读取性能可以水平扩展;
  • 全程无锁,不对线上业务产生锁的风险;
  • 断点续传,支持全量阶段的 checkpoint。

网上有测试文档显示用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:

  • MySQL CDC 2.0 用时 13 分钟;
  • MySQL CDC 1.4 用时 89 分钟;
  • 读取性能提升 6.8 倍。

三、什么是 Flink Doris Connector

Flink Doris Connector 是 Doris 社区为了方便用户使用 Flink 读写 Doris 数据表的一个扩展,目前 Doris 支持 Flink 1.11.x ,1.12.x,1.13.x;Scala 版本:2.12.x。

目前 Flink Doris connector 目前控制入库通过两个参数:

  1. sink.batch.size:每多少条写入一次,默认 100 条;
  2. sink.batch.interval :每个多少秒写入一下,默认 1 秒。

这两参数同时起作用,哪个条件先到就触发写 Doris 表操作,

注意:

这里注意的是要启用 http v2 版本,具体在 fe.conf 中配置 enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩需要配置的用户有 admin 权限。

四、用法示例

4.1 Flink Doris Connector 编译

首先我们要编译 Doris 的 Flink connector,也可以通过下面的地址进行下载:

https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar

注意:

这里因为 Doris 的 Flink Connector 是基于 Scala 2.12.x 版本进行开发的,所以你在使用 Flink 的时候请选择对应 Scala 2.12 的版本,如果你使用上面地址下载了相应的 jar,请忽略下面的编译内容部分。

在 Doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的 JDK 版本是 11,会存在编译问题。

在 extension/flink-doris-connector/ 源码目录下执行:

sh build.sh

编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn 集群模式运行的 Flink,则将此文件放入预部署包中。

针对 Flink 1.13.x 版本适配问题

   <properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>

只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编辑即可。

4.2 配置 Flink

这里我们是通过 Flink Sql Client 方式来进行操作。

这里我们演示使用的软件版本:

  1. Mysql 8.x
  2. Apache Flink :1.13.3
  3. Apache Doris :0.14.13.1

4.2.1 安装 Flink

首先下载和安装 Flink :

https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz

这里演示使用的是本地单机模式:

# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz
# tar zxvf flink-1.12.5-bin-scala_2.12.tgz 

下载 Flink CDC 相关 Jar 包:

https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar

这里注意 Flink CDC 和 Flink 的版本对应关系。

image-20211025170642628

  • 将上面下载或者编译好的 Flink Doris Connector jar 包复制到 Flink 根目录下的 lib 目录下;
  • Flink CDC 的 jar 包也复制到 Flink 根目录下的 lib 目录下。

image-20211026095513892

4.2.2 启动 Flink

这里我们使用的是本地单机模式。

# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.

我们通过 web 访问 (默认端口是 8081) 启动起来 Flink 集群,可以看到集群正常启动。

image-20211025162831632

4.3 安装 Apache Doris

具体安装部署 Doris 的方法,参照下面的连接:

https://hf200012.github.io/2021/09/Apache-Doris-环境安装部署

4.4 安装配置 Mysql

  1. 安装 Mysql,快速使用 Docker 安装配置 Mysql,具体参照下面的连接:

    https://segmentfault.com/a/1190000021523570

  2. 开启 Mysql binlog,进入 Docker 容器修改 /etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,

    log_bin=mysql_bin
    binlog-format=Row
    server-id=1

    然后重启 Mysql。

    systemctl restart mysqld
  3. 创建 Mysql 数据库表。
 CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB 

4.5 创建 Doris 表

CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
 ) ENGINE=OLAP
 UNIQUE KEY(`id`)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

4.6 启动 Flink Sql Client

./bin/sql-client.sh embedded
> set execution.result-mode=tableau;

image-20211025165547903

4.6.1 创建 Flink CDC Mysql 映射表

CREATE TABLE test_flink_cdc ( 
  id INT, 
  name STRING,
  primary key(id)  NOT ENFORCED
) WITH ( 
  'connector' = 'mysql-cdc', 
  'hostname' = 'localhost', 
  'port' = '3306', 
  'username' = 'root', 
  'password' = 'password', 
  'database-name' = 'demo', 
  'table-name' = 'test_cdc' 
);

执行查询创建的 Mysql 映射表,显示正常。

select * from test_flink_cdc;

image-20211026100505972

4.6.2 创建 Flink Doris Table 映射表

使用 Doris Flink Connector 创建 Doris 映射表。

CREATE TABLE doris_test_sink (
   id INT,
   name STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval'='1',
  'username' = 'root',
  'password' = ''
)

在命令行下执行上面的语句,可以看到创建表成功,然后执行查询语句,验证是否正常。

select * from doris_test_sink;

image-20211026100804091

执行插入操作,将 Mysql 里的数据通过 Flink CDC 结合 Doris Flink Connector 方式插入到 Doris 中。

INSERT INTO doris_test_sink select id,name from test_flink_cdc

image-20211026101004547

提交成功之后我们在 Flink 的 Web 界面可以看到相关的 Job 任务信息。

image-20211026100943474

4.6.3 向 Mysql 表中插入数据

INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, '测试flink CDC');
INSERT INTO test_cdc VALUES (1234, '这是测试');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');

4.6.4 观察 Doris 表的数据

首先停掉 Insert into 这个任务,因为我是在本地单机模式,只有一个 task 任务,所以要停掉,然后在命令行执行查询语句才能看到数据。

image-20211026101203629

4.6.5 修改 Mysql 的数据

重新启动 Insert into 任务:

image-20211025182341086

修改 Mysql 表里的数据:

update test_cdc set name='这个是验证修改的操作' where id =123

再去观察 Doris 表中的数据,你会发现已经修改。

注意这里如果要想 Mysql 表里的数据修改,Doris 里的数据也同样修改,Doris 数据表的模型要是 Unique key 模型,其他数据模型 (Aggregate Key 和 Duplicate Key) 不能进行数据的更新操作。

image-20211025182435827

4.6.6 删除数据操作

目前 Doris Flink Connector 还不支持删除操作,后面计划会加上这个操作。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

img


相关文章

近期热点

img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
17天前
|
存储 SQL Apache
Apache Doris 创始人:何为“现代化”的数据仓库?
3.0 版本是 Apache Doris 研发路程中的重要里程碑,他将这一进展总结为“实时之路”、“统一之路”和“弹性之路”,详细介绍了所对应的核心特性的设计思考与应用价值,揭晓了 2025 年社区发展蓝图
Apache Doris 创始人:何为“现代化”的数据仓库?
|
19天前
|
SQL 存储 数据处理
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
63 1
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
|
21天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
67 16
|
9天前
|
存储 SQL 监控
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
灵犀科技早期基于 Hadoop 构建大数据平台,在战略调整和需求的持续扩增下,数据处理效率、查询性能、资源成本问题随之出现。为此,引入 [Apache Doris](https://doris.apache.org/) 替换了复杂技术栈,升级为集存储、加工、服务为一体的统一架构,实现存储成本下降 60%,计算效率提升超 10 倍的显著成效。
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
|
2月前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
1月前
|
SQL 存储 Apache
Apache Doris 3.0.3 版本正式发布
亲爱的社区小伙伴们,Apache Doris 3.0.3 版本已于 2024 年 12 月 02 日正式发布。该版本进一步提升了系统的性能及稳定性,欢迎大家下载体验。
|
2月前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
2月前
|
SQL 存储 数据处理
兼顾高性能与低成本,浅析 Apache Doris 异步物化视图原理及典型场景
Apache Doris 物化视图进行了支持。**早期版本中,Doris 支持同步物化视图;从 2.1 版本开始,正式引入异步物化视图,[并在 3.0 版本中完善了这一功能](https://www.selectdb.com/blog/1058)。**
|
2月前
|
SQL 存储 Java
Apache Doris 2.1.7 版本正式发布
亲爱的社区小伙伴们,**Apache Doris 2.1.7 版本已于 2024 年 11 月 10 日正式发布。**2.1.7 版本持续升级改进,同时在湖仓一体、异步物化视图、半结构化数据管理、查询优化器、执行引擎、存储管理、以及权限管理等方面完成了若干修复。欢迎大家下载使用。
|
2月前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多