Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。

作者:罗宇侠

本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。
Flink-CDC 项目地址:

https://github.com/ververica/flink-cdc-connectors

Flink 中文学习网站
https://flink-learning.org.cn

在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。

但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

这篇教程将展示如何使用 Flink CDC 构建实时数据湖来应对这种场景,本教程的演示基于 Docker,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE,你可以很方便地在自己的电脑上完成本教程的全部内容。

接下来将以数据从 MySQL 同步到 Iceberg [1] 为例展示整个流程,架构图如下所示:

real-time-data-lake-tutorial

一、准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:
  sql-client:
    user: flink:flink
    image: yuxialuo/flink-sql-client:1.13.2.v1 
    depends_on:
      - jobmanager
      - mysql
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager
      MYSQL_HOST: mysql
    volumes:
      - shared-tmpfs:/tmp/iceberg
  jobmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/iceberg
  taskmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - shared-tmpfs:/tmp/iceberg
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"

该 Docker Compose 中包含的容器有:

  • SQL-Client:Flink SQL Client, 用来提交 SQL 查询和查看 SQL 的执行结果;
  • Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用来执行 Flink SQL;
  • MySQL:作为分库分表的数据源,存储本教程的 user 表。

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8081/ 来查看 Flink 是否运行正常。

flink-ui

注意:

  1. 本教程接下来用到的容器相关的命令都需要在 docker-compose.yml 所在目录下执行。
  2. 为了简化整个教程,本教程需要的 jar 包都已经被打包进 SQL-Client 容器中了,镜像的构建脚本可以在 GitHub [2] 上找到。

    如果你想要在自己的 Flink 环境运行本教程,需要下载下面列出的包并且把它们放在 Flink 所在目录的 lib 目录下,即 FLINK_HOME/lib/

    截止目前支持 Flink 1.13 的 iceberg-flink-runtime jar 包还没有发布,所以我们在这里提供了一个支持 Flink 1.13 的 iceberg-flink-runtime jar 包,这个 jar 包是基于 Iceberg 的 master 分支打包的。

    当 Iceberg 0.13.0 版本发布后,你也可以在 apache official repository [3] 下载到支持 Flink 1.13 的 iceberg-flink-runtime jar 包。

1.2 准备数据

  1. 进入 MySQL 容器中:

    docker-compose exec mysql mysql -uroot -p123456
  2. 创建数据和表,并填充数据。

    创建两个不同的数据库,并在每个数据库中创建两个表,作为 user 表分库分表下拆分出的表。

     CREATE DATABASE db_1;
     USE db_1;
     CREATE TABLE user_1 (
       id INTEGER NOT NULL PRIMARY KEY,
       name VARCHAR(255) NOT NULL DEFAULT 'flink',
       address VARCHAR(1024),
       phone_number VARCHAR(512),
       email VARCHAR(255)
     );
     INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");
    
     CREATE TABLE user_2 (
       id INTEGER NOT NULL PRIMARY KEY,
       name VARCHAR(255) NOT NULL DEFAULT 'flink',
       address VARCHAR(1024),
       phone_number VARCHAR(512),
       email VARCHAR(255)
     );
    INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
    CREATE DATABASE db_2;
    USE db_2;
    CREATE TABLE user_1 (
      id INTEGER NOT NULL PRIMARY KEY,
      name VARCHAR(255) NOT NULL DEFAULT 'flink',
      address VARCHAR(1024),
      phone_number VARCHAR(512),
      email VARCHAR(255)
    );
    INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);
    
    CREATE TABLE user_2 (
      id INTEGER NOT NULL PRIMARY KEY,
      name VARCHAR(255) NOT NULL DEFAULT 'flink',
      address VARCHAR(1024),
      phone_number VARCHAR(512),
      email VARCHAR(255)
    );
    INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");

二、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,使用如下的命令进入 Flink SQL CLI 容器中:

docker-compose exec sql-client ./sql-client

我们可以看到如下界面:

img

然后,进行如下步骤:

  1. 开启 checkpoint

    Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。
    并且,mysql-cdc 在 binlog 读取阶段开始前,需要等待一个完整的 checkpoint 来避免 binlog 记录乱序的情况。

    -- Flink SQL
    -- 每隔 3 秒做一次 checkpoint                 
    Flink SQL> SET execution.checkpointing.interval = 3s;
  2. 创建 MySQL 分库分表 source 表

    创建 source 表 user_source 来捕获MySQL中所有 user 表的数据,在表的配置项 database-name , table-name 使用正则表达式来匹配这些表。
    并且,user_source 表也定义了 metadata 列来区分数据是来自哪个数据库和表。

    -- Flink SQL
    Flink SQL> CREATE TABLE user_source (
        database_name STRING METADATA VIRTUAL,
        table_name STRING METADATA VIRTUAL,
        `id` DECIMAL(20, 0) NOT NULL,
        name STRING,
        address STRING,
        phone_number STRING,
        email STRING,
        PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'mysql',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'db_[0-9]+',
        'table-name' = 'user_[0-9]+'
      );
  3. 创建 Iceberg sink 表

    创建 sink 表 all_users_sink,用来将数据加载至 Iceberg 中。
    在这个 sink 表,考虑到不同的 MySQL 数据库表的 id 字段的值可能相同,我们定义了复合主键 (database_name, table_name, id)。

    -- Flink SQL
    Flink SQL> CREATE TABLE all_users_sink (
        database_name STRING,
        table_name    STRING,
        `id`          DECIMAL(20, 0) NOT NULL,
        name          STRING,
        address       STRING,
        phone_number  STRING,
        email         STRING,
        PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
      ) WITH (
        'connector'='iceberg',
        'catalog-name'='iceberg_catalog',
        'catalog-type'='hadoop',  
        'warehouse'='file:///tmp/iceberg/warehouse',
        'format-version'='2'
      );

三、流式写入 Iceberg

  1. 使用下面的 Flink SQL 语句将数据从 MySQL 写入 Iceberg 中:

    -- Flink SQL
    Flink SQL> INSERT INTO all_users_sink select * from user_source;

    上述命令将会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。
    Flink UI [4] 上可以看到这个运行的作业:

    flink-cdc-iceberg-running-job

    然后我们就可以使用如下的命令看到 Iceberg 中的写入的文件:

    docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

    如下所示:

    files-in-iceberg

    在你的运行环境中,实际的文件可能与上面的截图不相同,但是整体的目录结构应该相似。

  2. 使用下面的 Flink SQL 语句查询表 all_users_sink 中的数据:

    -- Flink SQL
    Flink SQL> SELECT * FROM all_users_sink;

    在 Flink SQL CLI 中我们可以看到如下查询结果:

    data_in_iceberg

    修改 MySQL 中表的数据,Iceberg 中的表 all_users_sink 中的数据也将实时更新:

    (3.1) 在 db_1.user_1 表中插入新的一行

    --- db_1
    INSERT INTO db_1.user_1 VALUES (111,"user_111","Shanghai","123567891234","user_111@foo.com");

    (3.2) 更新 db_1.user_2 表的数据

    --- db_1
    UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;

    (3.3) 在 db_2.user_2 表中删除一行

    --- db_2
    DELETE FROM db_2.user_2 WHERE id=220;

    每执行一步,我们就可以在 Flink Client CLI 中使用 SELECT * FROM all_users_sink 查询表 all_users_sink 来看到数据的变化。

    最后的查询结果如下所示:

    final-data-in-iceberg

    从 Iceberg 的最新结果中可以看到新增了(db_1, user_1, 111)的记录,(db_1, user_2, 120)的地址更新成了 Beijing,且(db_2, user_2, 220)的记录被删除了,与我们在 MySQL 做的数据更新完全一致。

四、环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

五、总结

在本文中,我们展示了如何使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。用户也可以同步其他数据库(Postgres/Oracle)的数据到 Hudi 等数据湖中。最后希望通过本文,能够帮助读者快速上手 Flink CDC 。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

img

注释:

[1] https://iceberg.apache.org/

[2] https://github.com/luoyuxia/flink-cdc-tutorial/tree/main/flink-cdc-iceberg-demo/sql-client

[3] https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/


Flink Forward Asia 2021

2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。

大会官网:
https://flink-forward.org.cn

大会线上观看地址 (记得预约哦):
https://developer.aliyun.com/special/ffa2021/live

img

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4天前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
265 22
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
292 0
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
336 2
|
29天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
97 16
|
20天前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何创建mysql临时表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7938 10
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
5月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
556 1

相关产品

  • 实时计算 Flink版