flink cdc作业同步mysql一个数据库下面的多个表,运行一段时间后,停止作业新增一张表,从savepoint启动,所有表都doump了,看issue找到相同的问题,https://github.com/ververica/flink-cdc-connectors/pull/2096,flinkcdc 3.0.0按道理应该解决了,但是使用中还是有这个问题?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
您提到在使用 Flink CDC 同步 MySQL 数据库中的多个表时,停止作业并新增一张表后,从 Savepoint 启动作业时出现了所有表数据丢失的问题。尽管根据社区的 Issue 和 PR(如 flink-cdc-connectors#2096),Flink CDC 3.0.0 应该已经解决了这个问题,但在实际使用中仍然遇到了类似问题。
以下是针对该问题的详细分析和解决方案。
Flink CDC 在处理 Savepoint 时,可能会因为版本升级或配置变更导致 Savepoint 不兼容。如果 Savepoint 中记录的元数据与当前作业的表结构不一致,可能会导致数据丢失或同步失败。
解决方法: - 确保 Flink CDC 版本与 Savepoint 的版本完全兼容。如果升级了 Flink CDC 版本,建议重新生成 Savepoint。 - 在启动作业时,检查 Savepoint 的元数据是否包含所有需要同步的表。如果新增表未被正确记录,可能导致同步失败。
根据知识库资料,Flink CDC 在新增表时需要满足以下条件才能正确捕获: - 启用新增表功能:需要在 SQL 作业中显式开启 scan.newly-added-table.enabled
参数。 - 启动模式限制:新增表功能仅支持 initial
启动模式。如果作业以其他模式启动,新增表可能无法被捕获。
解决方法: 1. 在 SQL 作业中添加以下配置,确保启用了新增表功能:
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
initial
模式)。Flink CDC 依赖 MySQL 的 Binlog 进行增量同步。如果作业从 Savepoint 启动时,Binlog 位点发生了变化(例如 Binlog 被清理或位点不匹配),可能导致数据丢失或同步失败。
解决方法: - 确保 MySQL 的 Binlog 配置足够长,避免在作业停止期间 Binlog 被清理。 - 在从 Savepoint 启动作业时,检查 Binlog 位点是否与 Savepoint 记录的位点一致。如果不一致,可以手动调整位点。
如果多个表共享同一个 Source 节点(即通过 Source 复用优化减少对数据库的压力),新增表可能导致 Source 节点的配置发生变化,从而引发同步失败。
解决方法: - 确保新增表的配置与原有表的配置完全一致,包括 server-id
、过滤规则等。 - 如果新增表的配置不同,建议单独为新增表创建一个 Source 节点,避免影响现有表的同步。
检查 Savepoint 兼容性:
启用新增表功能:
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
停止作业并创建快照:
修改 SQL 作业:
从快照恢复作业:
initial
启动模式。验证 Source 配置:
通过以上步骤,您可以有效解决 Flink CDC 作业在新增表后从 Savepoint 启动时出现的数据丢失问题。如果问题仍然存在,建议联系阿里云技术支持团队,提供详细的日志和配置信息以进一步排查问题。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。