把 Flink CDC的数据 经过Doris 连接器写入Doris ,同时从checkpoint断?

把 Flink CDC的数据 经过Doris 连接器写入Doris ,同时从checkpoint断开的位置恢复。 现在遇到的是从checkpoint恢复时,还是从头开始拉取的数据。 有大佬知道咋处理吗?

展开
收起
真的很搞笑 2023-06-11 14:41:46 224 分享 版权
阿里云 AI 助理回答

根据您描述的问题,Flink CDC 数据通过 Doris 连接器写入 Doris 时,从 checkpoint 恢复时仍然从头开始拉取数据,而不是从断点位置继续。这可能是由于配置或实现细节未正确设置导致的。以下是详细的分析和解决方案:


1. 确保 Flink Checkpoint 配置正确

Flink 的 checkpoint 是保证作业状态恢复的核心机制。如果 checkpoint 配置不正确,可能会导致无法从断点恢复。

  • 检查 Checkpoint 配置

    • 确保启用了 checkpoint,并设置了合理的间隔时间。例如:
    execution.checkpointing.interval=10s
    
    • 确保 checkpoint 存储路径(如 HDFS、OSS)可用且配置正确。
    • 如果使用的是阿里云实时计算 Flink 版,确保在控制台中启用了 checkpoint 功能。
  • 验证 Checkpoint 是否成功保存

    • 在 Flink Web UI 中查看最近一次成功的 checkpoint 位点。
    • 如果 checkpoint 未成功保存,可能是因为存储路径不可用或 checkpoint 超时。

2. 验证 Doris Connector 的配置

Doris Connector 的配置需要支持从 checkpoint 恢复的能力。以下是一些关键配置项:

  • 启用两阶段提交 (2PC)

    • Doris Connector 默认支持两阶段提交(sink.enable-2pc=true),以保证 Exactly-Once 语义。确保该参数已启用。
    • 如果关闭了 2PC(sink.enable-2pc=false),可能会导致数据重复或无法从 checkpoint 恢复。
  • 批处理模式与 EOS 语义

    • 如果启用了批处理模式(sink.enable.batch-mode=true),则不再保证 Exactly-Once 语义。建议关闭批处理模式,以确保从 checkpoint 恢复时数据一致性。
  • 缓存与重试机制

    • 配置 sink.use-cache=true,可以在异常情况下使用内存缓存进行恢复。
    • 设置合理的重试次数(sink.max-retries),以避免因短暂错误导致的数据丢失。

3. 确保 CDC Source 支持从 Checkpoint 恢复

Flink CDC Source 的配置也会影响从 checkpoint 恢复的行为。以下是针对不同数据库的 CDC Source 配置建议:

  • MySQL CDC

    • 确保 scan.startup.mode 参数设置为 latest-offsettimestamp,而不是 initial。如果设置为 initial,每次重启都会从头开始读取全量数据。
    • 示例配置:
    'scan.startup.mode' = 'latest-offset'
    
  • PostgreSQL CDC

    • 确保 slot.namedecoding.plugin.name 配置正确,并且 PostgreSQL 的逻辑复制槽(Logical Replication Slot)未被删除。
    • 如果发现 WAL 日志未释放,检查是否启用了 checkpoint 并清理未使用的 slot。
  • MongoDB CDC

    • MongoDB CDC 支持从 checkpoint 恢复,原理是通过 Change Stream 的 Resume Token 记录读取位置。如果 Resume Token 对应的记录在 oplog.rs 中不存在,可能会导致从头开始读取。
    • 建议调整 oplog.rs 的大小,确保其保留时间足够长。

4. 检查 Flink 作业的启动方式

Flink 作业的启动方式也会影响从 checkpoint 恢复的行为:

  • 从 Savepoint 或 Checkpoint 启动作业

    • 使用以下命令从指定的 checkpoint 或 savepoint 启动作业:
    <FLINK_HOME>/bin/flink run \
      -s <checkpoint-path> \
      -c org.apache.doris.flink.tools.cdc.CdcTools \
      lib/flink-doris-connector-1.16-1.5.2.jar \
      ...
    
    • 确保指定的 checkpoint 路径有效。
  • 避免重复提交作业

    • 如果多次提交相同的作业,可能会导致多个实例同时运行,从而引发数据重复或从头读取的问题。

5. 其他注意事项

  • 版本兼容性

    • 确保使用的 Flink 版本和 Doris Connector 版本兼容。例如,Flink 1.15 及以上版本支持整库同步。
    • 如果使用的是开源版 Flink,确保下载了对应版本的 JAR 包并放置在 $FLINK_HOME/lib 目录下。
  • 日志排查

    • 查看 Flink 作业的日志,确认是否有异常信息(如 IllegalArgumentException: timeout value is negative)。如果有类似问题,参考知识库中的解决方案。

总结

通过上述步骤,您可以解决 Flink CDC 数据经过 Doris 连接器写入 Doris 时从 checkpoint 恢复失败的问题。重点在于: 1. 确保 Flink 的 checkpoint 配置正确。 2. 验证 Doris Connector 和 CDC Source 的配置支持从 checkpoint 恢复。 3. 检查作业启动方式和版本兼容性。

如果问题仍未解决,请提供更详细的日志信息以便进一步排查。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

大数据领域前沿技术分享与交流,这里不止有技术干货、学习心得、企业实践、社区活动,还有未来。

还有其他疑问?
咨询AI助理