flink cdc作业同步mysql一个数据库下面的多个表,运行一段时间后,使用中还是有这个问题?

flink cdc作业同步mysql一个数据库下面的多个表,运行一段时间后,停止作业新增一张表,从savepoint启动,所有表都doump了,看issue找到相同的问题,https://github.com/ververica/flink-cdc-connectors/pull/2096,flinkcdc 3.0.0按道理应该解决了,但是使用中还是有这个问题?

展开
收起
真的很搞笑 2023-12-31 21:11:17 62 发布于黑龙江 分享
分享
版权
举报
阿里云 AI 助理回答

问题分析

您提到在使用 Flink CDC 同步 MySQL 数据库中的多个表时,停止作业并新增一张表后,从 Savepoint 启动作业时出现了所有表数据丢失的问题。尽管根据社区的 Issue 和 PR(如 flink-cdc-connectors#2096),Flink CDC 3.0.0 应该已经解决了这个问题,但在实际使用中仍然遇到了类似问题。

以下是针对该问题的详细分析和解决方案。


可能原因及解决方法

1. Savepoint 的兼容性问题

Flink CDC 在处理 Savepoint 时,可能会因为版本升级或配置变更导致 Savepoint 不兼容。如果 Savepoint 中记录的元数据与当前作业的表结构不一致,可能会导致数据丢失或同步失败。

解决方法: - 确保 Flink CDC 版本与 Savepoint 的版本完全兼容。如果升级了 Flink CDC 版本,建议重新生成 Savepoint。 - 在启动作业时,检查 Savepoint 的元数据是否包含所有需要同步的表。如果新增表未被正确记录,可能导致同步失败。

2. 新增表未正确捕获

根据知识库资料,Flink CDC 在新增表时需要满足以下条件才能正确捕获: - 启用新增表功能:需要在 SQL 作业中显式开启 scan.newly-added-table.enabled 参数。 - 启动模式限制:新增表功能仅支持 initial 启动模式。如果作业以其他模式启动,新增表可能无法被捕获。

解决方法: 1. 在 SQL 作业中添加以下配置,确保启用了新增表功能:

SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  1. 停止作业时,勾选 停止前创建一次快照,以便保留当前状态。
  2. 修改 SQL 作业,新增表的同步逻辑,并重新部署作业。
  3. 从快照恢复作业时,确保选择正确的启动模式(initial 模式)。

3. Binlog 位点问题

Flink CDC 依赖 MySQL 的 Binlog 进行增量同步。如果作业从 Savepoint 启动时,Binlog 位点发生了变化(例如 Binlog 被清理或位点不匹配),可能导致数据丢失或同步失败。

解决方法: - 确保 MySQL 的 Binlog 配置足够长,避免在作业停止期间 Binlog 被清理。 - 在从 Savepoint 启动作业时,检查 Binlog 位点是否与 Savepoint 记录的位点一致。如果不一致,可以手动调整位点。

4. Source 复用优化问题

如果多个表共享同一个 Source 节点(即通过 Source 复用优化减少对数据库的压力),新增表可能导致 Source 节点的配置发生变化,从而引发同步失败。

解决方法: - 确保新增表的配置与原有表的配置完全一致,包括 server-id、过滤规则等。 - 如果新增表的配置不同,建议单独为新增表创建一个 Source 节点,避免影响现有表的同步。


操作步骤总结

  1. 检查 Savepoint 兼容性

    • 确保 Flink CDC 版本与 Savepoint 版本一致。
    • 检查 Savepoint 是否包含所有需要同步的表。
  2. 启用新增表功能

    • 在 SQL 作业中添加以下配置:
      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
      
  3. 停止作业并创建快照

    • 停止作业时,勾选 停止前创建一次快照
  4. 修改 SQL 作业

    • 新增表的同步逻辑,并重新部署作业。
  5. 从快照恢复作业

    • 确保选择 initial 启动模式。
    • 检查 Binlog 位点是否与 Savepoint 记录的位点一致。
  6. 验证 Source 配置

    • 确保新增表的配置与原有表的配置完全一致。
    • 如果配置不同,建议单独为新增表创建 Source 节点。

注意事项

  • 版本兼容性:尽管 Flink CDC 3.0.0 声称解决了新增表的问题,但实际使用中可能存在未修复的边缘情况。建议关注官方更新日志,及时升级到最新版本。
  • Binlog 配置:确保 MySQL 的 Binlog 配置足够长,避免因 Binlog 清理导致同步失败。
  • 权限管理:确保 Flink 作业有足够的权限访问 MySQL 和下游存储系统。

通过以上步骤,您可以有效解决 Flink CDC 作业在新增表后从 Savepoint 启动时出现的数据丢失问题。如果问题仍然存在,建议联系阿里云技术支持团队,提供详细的日志和配置信息以进一步排查问题。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理