Flink CDC这个SQL的作用吗?

Flink CDC这个SQL的作用吗?image.png

展开
收起
真的很搞笑 2023-10-18 16:42:39 80 分享 版权
阿里云 AI 助理回答

Flink CDC(Change Data Capture)的SQL主要用于实现实时数据同步表结构变更的自动同步,支持从源数据库捕获全量和增量数据,并将这些数据实时同步到目标存储系统中。以下是其具体作用和功能的详细说明:


1. 核心功能

Flink CDC通过SQL语句(如CREATE DATABASE ASCREATE TABLE AS)实现了以下核心功能: - 整库级别的数据同步:支持将一个或多个数据库中的所有表数据实时复制到目标存储中。 - 表结构变更的自动同步:当源表发生表结构变更(如新增列、修改列类型等),Flink CDC能够自动将这些变更同步到目标表中。 - 分库分表合并:支持将分散在不同数据库或表中的数据合并到一张目标表中,适用于分库分表场景。


2. 主要SQL语句及其作用

(1)CREATE DATABASE AS(CDAS)

  • 作用:用于实现整库级别的数据同步,支持多库多表的实时同步。
  • 特点
    • CREATE TABLE AS(CTAS)语法的扩展,能够将每个需要同步的表翻译成对应的CTAS语句。
    • 支持表结构变更的同步(Schema Evolution)。
    • 可以复用一个Source节点读取多张业务表的数据,减少对数据库的连接数和读取压力。
  • 示例
    CREATE DATABASE IF NOT EXISTS holo_tpcds
    AS DATABASE mysql.tpcds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */;
    

(2)CREATE TABLE AS(CTAS)

  • 作用:用于单表或分库分表的数据同步。
  • 特点
    • 支持自定义计算列,可以在同步过程中进行数据转换。
    • 支持分库分表合并,将多个分库分表的数据同步到一张目标表中。
    • 可以通过STATEMENT SET语法将多个CTAS语句作为一个作业提交,优化Source节点的使用。
  • 示例
    CREATE TABLE IF NOT EXISTS user
    AS TABLE mysql.`wp.*`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */
    ADD COLUMN (
    `c_id` AS `id` + 10 AFTER `id`,
    `calss` AS 3 AFTER `id`
    );
    

3. 数据同步方式

Flink CDC支持两种主要的数据同步方式:

(1)多库多表同步

  • 适用场景:需要将一个或多个数据库中的多张业务表实时复制到另一个数据库。
  • 特点
    • 自动同步全量和增量数据。
    • 实时同步表结构变更(如加列操作)。
    • 使用CDAS语句实现,详情可参考CREATE DATABASE AS文档。

(2)分库分表合并

  • 适用场景:同一份业务数据分散在不同的数据库或表中,需要将其合并到一张目标表中。
  • 特点
    • 自动同步所有分库分表的全量和增量数据。
    • 支持新增符合正则表达式的新表的自动同步。
    • 使用CTAS语句实现,详情可参考CREATE TABLE AS文档。

4. 高级功能

(1)新增表的动态同步

  • 在VVR 8.0.6及以上版本中,Flink CDC支持动态捕获源库新增表并将其加入数据同步。
  • 配置步骤
    1. 开启新增表读取功能:
      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
      
    2. 当有新增表需要同步时,停止作业并创建快照。
    3. 重新部署SQL作业,并从快照恢复作业。

(2)Source复用优化

  • 作用:减少MySQL CDC源表对数据库的压力。
  • 配置方法
    SET 'table.optimizer.source-merge.enabled' = 'true';
    
  • 注意事项
    • 需要确保不同CDC源表的配置项(除数据库名、表名和server-id外)完全一致。
    • 启用后需要无状态启动作业,避免因拓扑改变导致的状态不兼容问题。

5. 限制与注意事项

  • 支持的同步方向:目前仅支持从MySQL到Hologres的数据同步。
  • 版本要求
    • CDAS语法需要Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持。
    • 新增表功能仅适用于默认的initial启动模式。
  • 重要提醒
    • 启用Source复用后,不建议将pipeline.operator-chaining设为false,否则会增加序列化和反序列化的开销。
    • Maven依赖管理:如果使用SNAPSHOT版本,需自行编译JAR包;建议使用稳定版本以避免潜在问题。

总结

Flink CDC的SQL语句(如CDAS和CTAS)主要用于实现实时数据同步表结构变更的自动同步,支持多库多表同步和分库分表合并等复杂场景。通过合理配置和优化(如Source复用、新增表动态同步等),可以显著提升数据同步效率并降低对源数据库的压力。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理