Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL

简介: 本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。
本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。

Flink-CDC 项目地址:
https://github.com/ververica/flink-cdc-connectors

本教程的演示基于 Docker 环境,都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。

假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。

对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

接下来的内容将介绍如何使用 Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:

一、准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=1234
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

该 Docker Compose 中包含的容器有:

  • MySQL:商品表 products 和 订单表 orders 将存储在该数据库中, 这两张表将和 Postgres 数据库中的物流表 shipments 进行关联,得到一张包含更多信息的订单表 enriched_orders
  • Postgres:物流表 shipments 将存储在该数据库中;
  • Elasticsearch:最终的订单表 enriched_orders 将写到 Elasticsearch;
  • Kibana:用来可视化 ElasticSearch 的数据。

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

注:本教程接下来用到的容器相关的命令也都需要在 docker-compose.yml 所在目录下执行。

1.2 下载 Flink 和所需要的依赖包

  1. 下载 Flink 1.13.2 [1] 并将其解压至目录 flink-1.13.2
  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.13.2/lib/

[1] https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz

1.3 准备数据

1.3.1 在 MySQL 数据库中准备数据

  1. 进入 MySQL 容器:

    docker-compose exec mysql mysql -uroot -p123456
  2. 创建数据库和表 productsorders,并插入数据:

    -- MySQL
    CREATE DATABASE mydb;
    USE mydb;
    CREATE TABLE products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512)
    );
    ALTER TABLE products AUTO_INCREMENT = 101;
    
    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter"),
           (default,"car battery","12V car battery"),
           (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
           (default,"hammer","12oz carpenter's hammer"),
           (default,"hammer","14oz carpenter's hammer"),
           (default,"hammer","16oz carpenter's hammer"),
           (default,"rocks","box of assorted rocks"),
           (default,"jacket","water resistent black wind breaker"),
           (default,"spare tire","24 inch spare tire");
    
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;
    
    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
           (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
           (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

1.3.2 在 Postgres 数据库中准备数据

  1. 进入 Postgres 容器:

    docker-compose exec postgres psql -h localhost -U postgres
  2. 创建表 shipments,并插入数据:

    -- PG
    CREATE TABLE shipments (
       shipment_id SERIAL NOT NULL PRIMARY KEY,
       order_id SERIAL NOT NULL,
       origin VARCHAR(255) NOT NULL,
       destination VARCHAR(255) NOT NULL,
       is_arrived BOOLEAN NOT NULL
     );
     ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
     ALTER TABLE public.shipments REPLICA IDENTITY FULL;
     INSERT INTO shipments
     VALUES (default,10001,'Beijing','Shanghai',false),
            (default,10002,'Hangzhou','Shanghai',false),
            (default,10003,'Shanghai','Hangzhou',false);

二、启动 Flink 集群和 Flink SQL CLI

  1. 使用下面的命令跳转至 Flink 目录下:

    cd flink-1.13.2
  2. 使用下面的命令启动 Flink 集群:

    ./bin/start-cluster.sh

    启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

    image-20211117195311158

  3. 使用下面的命令启动 Flink SQL CLI

    ./bin/sql-client.sh

    启动成功后,可以看到如下的页面:

img

三、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔 3 秒做一次 checkpoint。

-- Flink SQL                   Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products, orders, shipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQLFlink SQL> CREATE TABLE products (    id INT,    name STRING,    description STRING,    PRIMARY KEY (id) NOT ENFORCED  ) WITH (    'connector' = 'mysql-cdc',    'hostname' = 'localhost',    'port' = '3306',    'username' = 'root',    'password' = '123456',    'database-name' = 'mydb',    'table-name' = 'products'  );Flink SQL> CREATE TABLE orders (   order_id INT,   order_date TIMESTAMP(0),   customer_name STRING,   price DECIMAL(10, 5),   product_id INT,   order_status BOOLEAN,   PRIMARY KEY (order_id) NOT ENFORCED ) WITH (   'connector' = 'mysql-cdc',   'hostname' = 'localhost',   'port' = '3306',   'username' = 'root',   'password' = '123456',   'database-name' = 'mydb',   'table-name' = 'orders' );Flink SQL> CREATE TABLE shipments (   shipment_id INT,   order_id INT,   origin STRING,   destination STRING,   is_arrived BOOLEAN,   PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH (   'connector' = 'postgres-cdc',   'hostname' = 'localhost',   'port' = '5432',   'username' = 'postgres',   'password' = 'postgres',   'database-name' = 'postgres',   'schema-name' = 'public',   'table-name' = 'shipments' );

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中。

-- Flink SQLFlink SQL> CREATE TABLE enriched_orders (   order_id INT,   order_date TIMESTAMP(0),   customer_name STRING,   price DECIMAL(10, 5),   product_id INT,   order_status BOOLEAN,   product_name STRING,   product_description STRING,   shipment_id INT,   origin STRING,   destination STRING,   is_arrived BOOLEAN,   PRIMARY KEY (order_id) NOT ENFORCED ) WITH (     'connector' = 'elasticsearch-7',     'hosts' = 'http://localhost:9200',     'index' = 'enriched_orders' );

四、关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQLFlink SQL> INSERT INTO enriched_orders SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived FROM orders AS o LEFT JOIN products AS p ON o.product_id = p.id LEFT JOIN shipments AS s ON o.order_id = s.order_id;

启动成功后,可以访问 http://localhost:8081/#/job/running 在 Flink Web UI 上看到正在运行的 Flink Streaming Job,如下图所示:

img

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders

kibana-create-index-pattern

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了。

kibana-detailed-orders

接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana 中显示的订单数据也将实时更新。

  1. 在 MySQL 的 orders 表中插入一条数据:

    --MySQLINSERT INTO ordersVALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
  2. 在 Postgres 的 shipment 表中插入一条数据:

    --PGINSERT INTO shipmentsVALUES (default,10004,'Shanghai','Beijing',false);
  3. 在 MySQL 的 orders 表中更新订单的状态:

    --MySQLUPDATE orders SET order_status = true WHERE order_id = 10004;
  4. 在 Postgres 的 shipment 表中更新物流的状态:

    --PGUPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
  5. 在 MYSQL 的 orders 表中删除一条数据:

    --MySQLDELETE FROM orders WHERE order_id = 10004;

    每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:

    kibana-detailed-orders-changes

五、环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

在 Flink 所在目录 flink-1.13.2 下执行如下命令停止 Flink 集群:

./bin/stop-cluster.sh

六、总结

在本文中,我们以一个简单的业务场景展示了如何使用 Flink CDC 快速构建 Streaming ETL。希望通过本文,能够帮助读者快速上手 Flink CDC ,也希望 Flink CDC 能满足你的业务需求。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

img


近期热点

img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
消息中间件 存储 传感器
271 0
|
5月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2214 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
7月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
8月前
|
数据采集 SQL canal
Amoro + Flink CDC 数据融合入湖新体验
本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
467 1
Amoro + Flink CDC 数据融合入湖新体验
|
SQL 存储 消息中间件
深入理解Flink Streaming SQL
序言        时效性提升数据的价值,所以Flink这样的流式(Streaming)计算系统应用得越来越广泛。        广大的普通用户决定一个产品的界面和接口。       ETL开发者需要简单而有效的开发工具,从而把更多时间花在理业务和对口径上。  &n
20953 0
|
5月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
561 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
12月前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
763 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。

热门文章

最新文章

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多