为啥flink cdc 读出来的是这样的?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink CDC 读取数据时,可能会出现一些不符合预期的结果,这通常与配置、数据库特性或Flink CDC的工作机制有关。以下是可能导致这种情况的原因及解决方案:
如果读取的 timestamp
字段存在时区偏差(例如相差8小时),可能是因为CDC作业中配置的 server-time-zone
参数与MySQL服务器的时区不一致。
server-time-zone
参数,使其与MySQL服务器的时区一致。'server-time-zone' = 'Asia/Shanghai'
即使Flink CDC只关注某一张表,MySQL的Binlog仍然会记录整个实例的所有表的变更。这种机制可能导致读取的数据量超出预期。
'scan.incremental.snapshot.enabled' = 'true'
如果使用了 table-name
的正则表达式来匹配表名,但正则表达式中包含逗号(,
),会导致解析失败。这是因为Debezium使用逗号作为分隔符,不支持带逗号的正则表达式。
'table-name' = '(t_process_wi_history_\d{1}|t_process_wi_history_\d{2})'
在从全量读取切换到增量读取时,如果全量阶段耗时过长,可能导致最后一个分片数据量过大,出现OOM(内存溢出)问题。此外,如果Checkpoint间隔时间设置过大,也可能导致增量读取延迟。
'parallelism' = 4
'checkpoint.interval' = '5min'
如果需要捕获数据库中的DDL事件(如表结构变更),但Flink CDC默认不会处理这些事件。
MySqlSource
,并启用 includeSchemaChanges
参数。MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.includeSchemaChanges(true)
.build();
如果目标数据库是分库分表结构,但Flink CDC未正确读取所有分表数据,可能是因为 table-name
配置未正确匹配所有表。
user_
为前缀的表:
'table-name' = 'user_.*'
在非正常情况下(如故障发生时),Flink CDC可能会接收到重复的变更事件,导致下游处理结果异常。
'table.exec.source.cdc-events-duplicate' = 'true'
如果使用的是MongoDB CDC源表,可能会遇到以下情况: - 全量与增量切换:MongoDB CDC会先读取全量数据,然后平滑切换到增量读取(oplog)。如果发生故障,能够保证Exactly Once语义。 - 启动模式:支持多种启动模式(如 initial
、latest-offset
和 timestamp
),可以根据需求选择合适的模式。
'scan.startup.mode' = 'timestamp'
'scan.startup.timestamp-millis' = '1698768000000'
flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar
,建议使用稳定版本(如 2.1.0
)。根据上述分析,Flink CDC读取数据异常的原因可能涉及时区配置、Binlog机制、正则表达式解析、全量与增量切换等多个方面。请根据具体问题场景,参考对应的解决方案进行调整。如果问题仍未解决,请提供更多上下文信息以便进一步排查。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。