flink cdc 采集oracle 间歇性丢数据,日志里也没报错的情况,使用 3.0 的 pipline 从 MySQL 向 Doris 同步数据的问题
flink 上跑的任务中断过,所以改用了 timestamp 的模式,然后过程中间,对表结构有过变化,数据复制链路就中断了;
报错提示:Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema.
Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to TIMESTAMP
然后尝试了其他的集中启动模式,都相同。
那是不是只能采用默认的 initial 从头进行数据同步和表结构映射呢?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,Flink CDC 在采集 Oracle 数据时出现间歇性丢数据的情况,同时在使用 Flink 3.0 的 Pipeline 从 MySQL 向 Doris 同步数据时,由于表结构变更导致数据复制链路中断,并报错 SchemaOutOfSyncException
。以下是针对问题的详细分析和解决方案。
fetchSize
或 pollInterval
参数,导致数据捕获不完整。DEBUG
或 TRACE
,导致潜在问题未被记录。检查网络连通性
优化 CDC 源端配置
fetchSize
和 pollInterval
参数,确保数据捕获的效率和稳定性。例如:
source:
type: oracle-cdc
fetchSize: 1000
pollInterval: 500ms
提升日志级别
DEBUG
或 TRACE
,以便捕获更详细的运行信息。具体操作如下:
log4j.properties
文件中添加以下配置:logger.flink-cdc.name = com.ververica.cdc
logger.flink-cdc.level = DEBUG
报错信息 SchemaOutOfSyncException
表明 Flink CDC 的内部 Schema 表示与实际数据库 Schema 不一致。这通常发生在以下场景: - 启动模式为 TIMESTAMP:当表结构在指定的时间戳之后发生变更时,Flink CDC 无法动态同步这些变更。 - 启动模式为其他模式(如 LATEST_OFFSET):同样不支持动态表结构变更。
采用 INITIAL 模式重新同步
initial
模式从头开始同步数据。此模式会重新读取全量数据并同步最新的表结构。source:
type: mysql-cdc
startupMode: initial
手动处理表结构变更
启用动态 Schema 支持
source:
type: mysql-cdc
enableDynamicSchema: true
initial
模式进行全量同步。timestamp
或 latest-offset
模式,并在变更后手动处理。finish split response timeout
异常,可能是 TaskManager 的 CPU 使用率过高导致。建议增加 TaskManager 的 CPU 和内存资源。batchSize
和 flushIntervalMs
参数以提高写入性能。例如:
sink:
type: doris
batchSize: 500
flushIntervalMs: 2000
针对您的问题,建议采取以下措施: 1. 对于 Oracle 数据采集丢数据问题,优化网络连通性和 CDC 源端配置,并提升日志级别以捕获更多细节。 2. 对于 MySQL 向 Doris 同步数据时表结构变更导致的链路中断问题,优先考虑使用 initial
模式重新同步数据,或手动处理表结构变更。 3. 根据实际需求选择合适的启动模式,并注意性能优化和监控告警配置。
希望以上解答能够帮助您解决问题!如有进一步疑问,请随时联系技术支持团队。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。