Flink CDC断点续传 的例子哪儿能找到,网上找了一些都不好用?我想在程序中使用flinkcdc savepoint存在本地
你可以在Flink的官方文档中找到关于Flink CDC断点续传的例子。以下是一个简单的例子:
首先,你需要在你的Flink程序中添加Flink CDC连接器。然后,你可以使用CheckpointConfig
来配置检查点。在CheckpointConfig
中,你可以设置检查点的间隔时间,以及保存检查点的位置。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // 设置检查点间隔时间为60秒
// 添加Flink CDC连接器
FlinkCDCSource<String> source = new FlinkCDCSource<>(...); // 这里需要填写你的数据库连接信息
DataStream<String> stream = env.addSource(source);
// 对数据流进行处理
DataStream<String> processedStream = stream.map(...); // 这里可以填写你的数据处理逻辑
// 设置检查点的配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(60000); // 设置检查点间隔时间为60秒
checkpointConfig.setMinPauseBetweenCheckpoints(10000); // 设置两次检查点之间的最小暂停时间为10秒
checkpointConfig.setCheckpointTimeout(60000); // 设置检查点超时时间为60秒
checkpointConfig.setMaxConcurrentCheckpoints(1); // 设置最大并发检查点数为1
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 设置外部化检查点清理策略为保留在取消时
checkpointConfig.setPreferCheckpointForRecovery(true); // 设置优先恢复检查点
checkpointConfig.setSavepointRetentionPolicy(SavepointRetentionPolicy.REtainOnCancellation); // 设置保存点保留策略为保留在取消时
// 将处理后的数据流写入到目标系统中
processedStream.addSink(...); // 这里需要填写你的目标系统连接信息
// 启动Flink程序
env.execute("Flink CDC Example");
在这个例子中,Flink CDC连接器会定期地从源数据库中抓取数据,并将这些数据保存到Flink的程序中。当Flink程序出现问题时,你可以使用保存的检查点来恢复程序的状态。
Flink CDC支持使用Savepoint进行断点续传,这允许你从上次执行的中断点恢复作业。以下是一个简单的例子来说明如何创建和使用Savepoint:
创建Savepoint:
./bin/flink savepoint <job-id> hdfs:///flink/savepoints/
停止作业:
从Savepoint恢复作业:
./bin/flink run -s hdfs:///flink/savepoints/<savepoint-name> examples/streaming/WordCount.jar
配置作业以自动从Savepoint恢复:
flink-conf.yaml
配置文件中设置以下参数,以便在作业失败时自动从最近的Savepoint恢复:jobmanager.execution.failover-strategy: "region"
execution.checkpointing.interval: 5000
execution.savepoint-dir: hdfs:///flink/savepoints/
flink-cdc最多目前2.x版本,你指的是flink版本.首先不推荐和spring任何框架结合使用,其次需要确定下日志到底有没有走savepoint,此回答整理自钉群“Flink CDC 社区”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。