Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。
本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。

Flink-CDC 项目地址:
https://github.com/ververica/flink-cdc-connectors

本教程的演示基于 Docker 环境,都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。

假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。

对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

接下来的内容将介绍如何使用 Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:

一、准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=1234
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

该 Docker Compose 中包含的容器有:

  • MySQL:商品表 products 和 订单表 orders 将存储在该数据库中, 这两张表将和 Postgres 数据库中的物流表 shipments 进行关联,得到一张包含更多信息的订单表 enriched_orders
  • Postgres:物流表 shipments 将存储在该数据库中;
  • Elasticsearch:最终的订单表 enriched_orders 将写到 Elasticsearch;
  • Kibana:用来可视化 ElasticSearch 的数据。

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

注:本教程接下来用到的容器相关的命令也都需要在 docker-compose.yml 所在目录下执行。

1.2 下载 Flink 和所需要的依赖包

  1. 下载 Flink 1.13.2 [1] 并将其解压至目录 flink-1.13.2
  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.13.2/lib/

[1] https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz

1.3 准备数据

1.3.1 在 MySQL 数据库中准备数据

  1. 进入 MySQL 容器:

    docker-compose exec mysql mysql -uroot -p123456
  2. 创建数据库和表 productsorders,并插入数据:

    -- MySQL
    CREATE DATABASE mydb;
    USE mydb;
    CREATE TABLE products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512)
    );
    ALTER TABLE products AUTO_INCREMENT = 101;
    
    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter"),
           (default,"car battery","12V car battery"),
           (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
           (default,"hammer","12oz carpenter's hammer"),
           (default,"hammer","14oz carpenter's hammer"),
           (default,"hammer","16oz carpenter's hammer"),
           (default,"rocks","box of assorted rocks"),
           (default,"jacket","water resistent black wind breaker"),
           (default,"spare tire","24 inch spare tire");
    
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;
    
    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
           (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
           (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

1.3.2 在 Postgres 数据库中准备数据

  1. 进入 Postgres 容器:

    docker-compose exec postgres psql -h localhost -U postgres
  2. 创建表 shipments,并插入数据:

    -- PG
    CREATE TABLE shipments (
       shipment_id SERIAL NOT NULL PRIMARY KEY,
       order_id SERIAL NOT NULL,
       origin VARCHAR(255) NOT NULL,
       destination VARCHAR(255) NOT NULL,
       is_arrived BOOLEAN NOT NULL
     );
     ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
     ALTER TABLE public.shipments REPLICA IDENTITY FULL;
     INSERT INTO shipments
     VALUES (default,10001,'Beijing','Shanghai',false),
            (default,10002,'Hangzhou','Shanghai',false),
            (default,10003,'Shanghai','Hangzhou',false);

二、启动 Flink 集群和 Flink SQL CLI

  1. 使用下面的命令跳转至 Flink 目录下:

    cd flink-1.13.2
  2. 使用下面的命令启动 Flink 集群:

    ./bin/start-cluster.sh

    启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

    image-20211117195311158

  3. 使用下面的命令启动 Flink SQL CLI

    ./bin/sql-client.sh

    启动成功后,可以看到如下的页面:

img

三、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔 3 秒做一次 checkpoint。

-- Flink SQL                   Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products, orders, shipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQLFlink SQL> CREATE TABLE products (    id INT,    name STRING,    description STRING,    PRIMARY KEY (id) NOT ENFORCED  ) WITH (    'connector' = 'mysql-cdc',    'hostname' = 'localhost',    'port' = '3306',    'username' = 'root',    'password' = '123456',    'database-name' = 'mydb',    'table-name' = 'products'  );Flink SQL> CREATE TABLE orders (   order_id INT,   order_date TIMESTAMP(0),   customer_name STRING,   price DECIMAL(10, 5),   product_id INT,   order_status BOOLEAN,   PRIMARY KEY (order_id) NOT ENFORCED ) WITH (   'connector' = 'mysql-cdc',   'hostname' = 'localhost',   'port' = '3306',   'username' = 'root',   'password' = '123456',   'database-name' = 'mydb',   'table-name' = 'orders' );Flink SQL> CREATE TABLE shipments (   shipment_id INT,   order_id INT,   origin STRING,   destination STRING,   is_arrived BOOLEAN,   PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH (   'connector' = 'postgres-cdc',   'hostname' = 'localhost',   'port' = '5432',   'username' = 'postgres',   'password' = 'postgres',   'database-name' = 'postgres',   'schema-name' = 'public',   'table-name' = 'shipments' );

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中。

-- Flink SQLFlink SQL> CREATE TABLE enriched_orders (   order_id INT,   order_date TIMESTAMP(0),   customer_name STRING,   price DECIMAL(10, 5),   product_id INT,   order_status BOOLEAN,   product_name STRING,   product_description STRING,   shipment_id INT,   origin STRING,   destination STRING,   is_arrived BOOLEAN,   PRIMARY KEY (order_id) NOT ENFORCED ) WITH (     'connector' = 'elasticsearch-7',     'hosts' = 'http://localhost:9200',     'index' = 'enriched_orders' );

四、关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQLFlink SQL> INSERT INTO enriched_orders SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived FROM orders AS o LEFT JOIN products AS p ON o.product_id = p.id LEFT JOIN shipments AS s ON o.order_id = s.order_id;

启动成功后,可以访问 http://localhost:8081/#/job/running 在 Flink Web UI 上看到正在运行的 Flink Streaming Job,如下图所示:

img

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders

kibana-create-index-pattern

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了。

kibana-detailed-orders

接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana 中显示的订单数据也将实时更新。

  1. 在 MySQL 的 orders 表中插入一条数据:

    --MySQLINSERT INTO ordersVALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
  2. 在 Postgres 的 shipment 表中插入一条数据:

    --PGINSERT INTO shipmentsVALUES (default,10004,'Shanghai','Beijing',false);
  3. 在 MySQL 的 orders 表中更新订单的状态:

    --MySQLUPDATE orders SET order_status = true WHERE order_id = 10004;
  4. 在 Postgres 的 shipment 表中更新物流的状态:

    --PGUPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
  5. 在 MYSQL 的 orders 表中删除一条数据:

    --MySQLDELETE FROM orders WHERE order_id = 10004;

    每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:

    kibana-detailed-orders-changes

五、环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

在 Flink 所在目录 flink-1.13.2 下执行如下命令停止 Flink 集群:

./bin/stop-cluster.sh

六、总结

在本文中,我们以一个简单的业务场景展示了如何使用 Flink CDC 快速构建 Streaming ETL。希望通过本文,能够帮助读者快速上手 Flink CDC ,也希望 Flink CDC 能满足你的业务需求。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

img


近期热点

img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
消息中间件 存储 传感器
72 0
|
2月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
1022 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
4月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
5月前
|
数据采集 SQL canal
Amoro + Flink CDC 数据融合入湖新体验
本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
257 1
Amoro + Flink CDC 数据融合入湖新体验
|
14天前
|
关系型数据库 MySQL 分布式数据库
阿里云PolarDB云原生数据库收费价格:MySQL和PostgreSQL详细介绍
阿里云PolarDB兼容MySQL、PostgreSQL及Oracle语法,支持集中式与分布式架构。标准版2核4G年费1116元起,企业版最高性能达4核16G,支持HTAP与多级高可用,广泛应用于金融、政务、互联网等领域,TCO成本降低50%。
|
15天前
|
SQL 关系型数据库 MySQL
Mysql数据恢复—Mysql数据库delete删除后数据恢复案例
本地服务器,操作系统为windows server。服务器上部署mysql单实例,innodb引擎,独立表空间。未进行数据库备份,未开启binlog。 人为误操作使用Delete命令删除数据时未添加where子句,导致全表数据被删除。删除后未对该表进行任何操作。需要恢复误删除的数据。 在本案例中的mysql数据库未进行备份,也未开启binlog日志,无法直接还原数据库。
|
15天前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
|
16天前
|
关系型数据库 分布式数据库 数据库
阿里云数据库收费价格:MySQL、PostgreSQL、SQL Server和MariaDB引擎费用整理
阿里云数据库提供多种类型,包括关系型与NoSQL,主流如PolarDB、RDS MySQL/PostgreSQL、Redis等。价格低至21元/月起,支持按需付费与优惠套餐,适用于各类应用场景。
|
21天前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎,提供高性价比、稳定安全的云数据库服务,适用于多种行业与业务场景。
|
21天前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
57 3

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多