我正在尝试使用 Flink CDC 3.0 按照官网案例同步 MySQL 数据库到 Doris,使用的是以下命令:bash bin/flink-cdc.sh mysql-to-doris.yaml,有人说对于新增加的表,可以通过创建 Savepoint 后再从 Savepoint 重新启动作业。我的问题是,具体怎样使用 flink-cdc.sh 脚本从 Savepoint 重启这样的 CDC 作业呢?我已经在 Flink 的配置文件 conf/flink-conf.yaml 中设置了 Savepoint 存储路径 state.savepoints.dir: hdfs://xxx/flink/flink-savepoints,并成功为我的作业创建了 Savepoint 同时停止了作业,但我不清楚如何使用这个 Savepoint 来重启作业,有相应的命令或步骤吗?
在Flink CDC中,从savepoint启动作业的命令与常规Flink作业略有不同。对于使用flink-cdc.sh脚本启动的作业,您可以通过以下步骤从savepoint重新启动作业:
state.savepoints.dir
参数所指定的目录中找到savepoint文件。/path/to/flink
,则可以将savepoint文件移动到/path/to/flink/savepoints
目录下。然后,您可以使用以下命令从savepoint启动作业:
./bin/flink run -s <savepoint_directory> -p <job_parallelism> -c <main_class> -- <job_arguments>
其中,<savepoint_directory>
是包含savepoint文件的目录路径,<job_parallelism>
是作业的并行度,<main_class>
是主类的名称,<job_arguments>
是传递给主类的参数。
例如,如果您的主类名为com.example.MyJob
,并且没有额外的参数,则可以使用以下命令从savepoint启动作业:
./bin/flink run -s /path/to/flink/savepoints/my_job_id -p 1 -c com.example.MyJob
这将从savepoint文件中恢复作业的状态,并启动作业。
请注意,上述命令中的-s
选项用于指定savepoint目录,而不是savepoint文件的完整路径。Flink会自动在指定的目录中查找最新的savepoint文件。
通过按照上述步骤执行命令,您应该能够从savepoint重新启动Flink CDC作业。
在flink-conf里面手动添加下路径,3.1会把这个参数暴露出来。pipeline,目前不能动态加表,这个参数没有暴露出来。此回答来自钉群Flink CDC 社区。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。