flink-cdc sql-client提交 可以支持 checkpoint吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
是的,Flink CDC SQL-Client 提交作业时可以支持 Checkpoint。Checkpoint 是 Flink 保证 Exactly-Once 语义的重要机制,尤其在使用 Flink CDC 进行数据同步或处理时,Checkpoint 的配置和使用至关重要。
以下是关于 Flink CDC 支持 Checkpoint 的详细说明:
Checkpoint 在 Flink 中用于定期保存作业的状态,以便在发生故障时能够从最近的 Checkpoint 恢复。对于 Flink CDC 来说,Checkpoint 的主要作用包括: - 记录消费位点:例如 MongoDB CDC 使用 Change Stream 的 Resume Token 或 Kafka 源表记录分区消费位点。 - 保证数据一致性:通过 Checkpoint 确保数据处理的 Exactly-Once 语义。 - 支持作业恢复:当作业因故障重启时,可以从 Checkpoint 中恢复状态,避免数据丢失或重复处理。
在使用 Flink SQL-Client 提交作业时,可以通过以下方式配置 Checkpoint:
需要在作业配置中启用 Checkpoint,并设置相关参数。例如:
SET execution.checkpointing.interval = '10min'; -- 设置 Checkpoint 时间间隔
SET execution.checkpointing.mode = 'EXACTLY_ONCE'; -- 设置 Checkpoint 模式为 Exactly-Once
SET execution.checkpointing.tolerable-failed-checkpoints = '3'; -- 容忍 Checkpoint 失败的次数
Flink 需要一个可靠的存储系统来保存 Checkpoint 数据。常见的存储方式包括: - 文件系统:如 HDFS、OSS 等。 - 远程 Shuffle 服务:在批作业中,可以使用远程 Shuffle 服务(如 Apache Celeborn)来存储 Shuffle 数据。
配置示例:
SET state.backend = 'filesystem'; -- 使用文件系统作为状态后端
SET state.checkpoints.dir = 'oss://your-bucket/checkpoints/'; -- 指定 Checkpoint 存储路径
如果状态较大,可以启用增量 Checkpoint 以减少存储开销:
SET state.backend.incremental = 'true'; -- 启用增量 Checkpoint
不同的 Flink CDC 连接器对 Checkpoint 的依赖和支持有所不同:
MongoDB CDC 使用 Change Stream 的 Resume Token 记录消费位点,并通过 Checkpoint 保存该 Token。如果 Checkpoint 中的 Resume Token 对应的记录在 oplog.rs
中不存在,则可能会出现无效 Token 的异常。因此,建议: - 调整 oplog.rs
的大小:确保 oplog.rs
的保留时间足够长,避免 Token 过期。 - 启用 Checkpoint:MongoDB CDC 必须开启 Checkpoint 才能支持从故障中恢复。
Postgres CDC 在开启增量快照功能时,必须启用 Checkpoint,并且 Source 表必须声明主键。此外,全量扫描阶段不支持 Checkpoint,因此需要合理配置 Checkpoint 参数以避免超时问题。
Kafka 源表通过 Checkpoint 记录分区消费位点,并在作业恢复时从 Checkpoint 中读取位点。如果未启用 Checkpoint,则无法保证消费位点的持久化。
以下是一个完整的 SQL-Client 配置示例,展示如何启用 Checkpoint 并提交作业:
-- 启用 Checkpoint
SET execution.checkpointing.interval = '5min';
SET execution.checkpointing.mode = 'EXACTLY_ONCE';
SET execution.checkpointing.tolerable-failed-checkpoints = '5';
-- 配置状态后端
SET state.backend = 'filesystem';
SET state.checkpoints.dir = 'oss://your-bucket/checkpoints/';
-- 提交作业
CREATE TABLE source_table (...) WITH (...);
CREATE TABLE sink_table (...) WITH (...);
INSERT INTO sink_table SELECT * FROM source_table;
通过以上配置,您可以确保 Flink CDC SQL-Client 提交的作业支持 Checkpoint,并能够在故障恢复时正确地从 Checkpoint 中恢复状态。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。