Flink CDC中用 SQL 方式配置 Flink CDC 任务,同一个 Job 中声明了多张表?

Flink CDC中用 SQL 方式配置 Flink CDC 任务,同一个 Job 中声明了多张表,最后任务提交后发现 MySQL 侧是多次订阅,这种场景有什么好的解决方案吗,保证 MySQL 侧只订阅一次?

展开
收起
真的很搞笑 2023-06-11 15:06:56 120 分享 版权
阿里云 AI 助理回答

在 Flink CDC 中,如果同一个 Job 中声明了多张表,可能会导致 MySQL 侧多次订阅 Binlog 数据的问题。这种场景下,可以通过以下解决方案来优化,确保 MySQL 侧只订阅一次 Binlog 数据:


1. 开启 Source 复用功能

Flink 提供了 Source 复用的功能,可以将具有相同配置参数的 MySQL 源表合并为一个 Binlog 连接,从而减少对 MySQL 的压力。

操作步骤:

  • 在 SQL 作业中通过 SET 命令开启 Source 复用功能:
    SET 'table.optimizer.source-merge.enabled' = 'true';
    
  • 注意:开启 Source 复用后,需要以无状态的方式启动作业(即全新启动),因为 Source 复用会导致作业拓扑发生变化,从原有状态可能无法正常启动或可能导致数据丢失。

效果:

  • 全量读取阶段:Binlog 连接数等于 Source 并发度。
  • 增量读取阶段:Binlog 连接数等于 1。

2. 配置相同的 Server ID 范围

当多个 MySQL CDC 源表使用相同的配置时,可以通过为这些表配置相同的 server-id 范围来实现数据源的复用,从而避免多次订阅 Binlog 数据。

操作步骤:

  • 在 SQL 语句中为每个 MySQL CDC 源表指定相同的 server-id 范围。例如:

    INSERT INTO target_table_1
    SELECT * FROM source_table_1 /*+ OPTIONS('server-id' = '5400-5408') */;
    
    INSERT INTO target_table_2
    SELECT * FROM source_table_2 /*+ OPTIONS('server-id' = '5400-5408') */;
    
  • 重要提示server-id 范围中的唯一 ID 数量必须大于等于作业的并发度。例如,范围 5400-5408 提供了 9 个唯一的 server-id,因此作业最多可以设置 9 个并发。

3. 将表同步到 Kafka 消息队列

如果 MySQL 侧的压力过大,可以考虑将表的数据先同步到 Kafka 消息队列中,再通过消费 Kafka 中的数据进行解耦。

操作步骤:

  1. 使用 Flink CDC 将 MySQL 表的数据同步到 Kafka。
  2. 在下游任务中消费 Kafka 中的数据,完成后续处理。

优点:

  • 减少对 MySQL 的直接连接压力。
  • 提高数据消费速度,解决消费同一个 Binlog 导致的数据瓶颈。

4. 合并多个 CTAS 作业

如果通过 CTAS(Create Table As Select)方式同步数据导致 MySQL 压力过大,可以将多个 CTAS 作业合并为一个作业运行,并为每个 MySQL CDC 源表配置相同的 server-id,从而实现数据源的复用。

操作步骤:

  • 将多个 CTAS 语句合并为一个作业。例如:
    CREATE TABLE target_table_1 AS SELECT * FROM source_table_1;
    CREATE TABLE target_table_2 AS SELECT * FROM source_table_2;
    
  • 确保所有源表的 server-id 配置一致。

5. 使用正则表达式匹配多张表

如果需要监控多张表,可以通过正则表达式匹配表名,从而减少重复声明表的数量。

操作步骤:

  • table-name 参数中使用正则表达式匹配多张表。例如:
    'table-name' = 'user_.*'
    
  • 上述配置会监控所有以 user_ 为前缀的表。

注意事项:

  • 正则表达式不支持逗号分隔形式指定多张表,需使用竖线(|)连接并用小括号包围。例如:
    'table-name' = '(user|product)'
    

6. 监控和调优

为了进一步优化性能,可以结合 Flink 的自动调优功能(Autopilot)来动态调整资源分配。

操作步骤:

  • 在作业运维页面,将自动调优模式设置为 Active 模式。
  • 自动调优会根据流量情况动态调整并发度,从而节约计算资源。

总结建议

  • 优先推荐:开启 Source 复用功能,并为所有 MySQL CDC 源表配置相同的 server-id 范围。
  • 如果 MySQL 压力仍然较大,可以考虑将数据同步到 Kafka 消息队列中进行解耦。
  • 对于复杂的多表场景,使用正则表达式匹配表名,减少重复声明表的数量。

通过以上方法,可以有效减少 MySQL 侧的多次订阅问题,同时提升整体作业的性能和稳定性。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

大数据领域前沿技术分享与交流,这里不止有技术干货、学习心得、企业实践、社区活动,还有未来。

还有其他疑问?
咨询AI助理