把 Flink CDC的数据 经过Doris 连接器写入Doris ,同时从checkpoint断开的位置恢复。 现在遇到的是从checkpoint恢复时,还是从头开始拉取的数据。 有大佬知道咋处理吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,Flink CDC 数据通过 Doris 连接器写入 Doris 时,从 checkpoint 恢复时仍然从头开始拉取数据,而不是从断点位置继续。这可能是由于配置或实现细节未正确设置导致的。以下是详细的分析和解决方案:
Flink 的 checkpoint 是保证作业状态恢复的核心机制。如果 checkpoint 配置不正确,可能会导致无法从断点恢复。
检查 Checkpoint 配置:
execution.checkpointing.interval=10s
验证 Checkpoint 是否成功保存:
Doris Connector 的配置需要支持从 checkpoint 恢复的能力。以下是一些关键配置项:
启用两阶段提交 (2PC):
sink.enable-2pc=true),以保证 Exactly-Once 语义。确保该参数已启用。sink.enable-2pc=false),可能会导致数据重复或无法从 checkpoint 恢复。批处理模式与 EOS 语义:
sink.enable.batch-mode=true),则不再保证 Exactly-Once 语义。建议关闭批处理模式,以确保从 checkpoint 恢复时数据一致性。缓存与重试机制:
sink.use-cache=true,可以在异常情况下使用内存缓存进行恢复。sink.max-retries),以避免因短暂错误导致的数据丢失。Flink CDC Source 的配置也会影响从 checkpoint 恢复的行为。以下是针对不同数据库的 CDC Source 配置建议:
MySQL CDC:
scan.startup.mode 参数设置为 latest-offset 或 timestamp,而不是 initial。如果设置为 initial,每次重启都会从头开始读取全量数据。'scan.startup.mode' = 'latest-offset'
PostgreSQL CDC:
slot.name 和 decoding.plugin.name 配置正确,并且 PostgreSQL 的逻辑复制槽(Logical Replication Slot)未被删除。MongoDB CDC:
Flink 作业的启动方式也会影响从 checkpoint 恢复的行为:
从 Savepoint 或 Checkpoint 启动作业:
<FLINK_HOME>/bin/flink run \
-s <checkpoint-path> \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
...
避免重复提交作业:
版本兼容性:
$FLINK_HOME/lib 目录下。日志排查:
IllegalArgumentException: timeout value is negative)。如果有类似问题,参考知识库中的解决方案。通过上述步骤,您可以解决 Flink CDC 数据经过 Doris 连接器写入 Doris 时从 checkpoint 恢复失败的问题。重点在于: 1. 确保 Flink 的 checkpoint 配置正确。 2. 验证 Doris Connector 和 CDC Source 的配置支持从 checkpoint 恢复。 3. 检查作业启动方式和版本兼容性。
如果问题仍未解决,请提供更详细的日志信息以便进一步排查。