开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

怎么尝试使用 Flink CDC 3.0 按照官网案例同步 MySQL 数据库到 Doris?

我正在尝试使用 Flink CDC 3.0 按照官网案例同步 MySQL 数据库到 Doris,使用的是以下命令:bash bin/flink-cdc.sh mysql-to-doris.yaml,有人说对于新增加的表,可以通过创建 Savepoint 后再从 Savepoint 重新启动作业。我的问题是,具体怎样使用 flink-cdc.sh 脚本从 Savepoint 重启这样的 CDC 作业呢?我已经在 Flink 的配置文件 conf/flink-conf.yaml 中设置了 Savepoint 存储路径 state.savepoints.dir: hdfs://xxx/flink/flink-savepoints,并成功为我的作业创建了 Savepoint 同时停止了作业,但我不清楚如何使用这个 Savepoint 来重启作业,有相应的命令或步骤吗?

展开
收起
小小鹿鹿鹿 2024-02-26 17:35:23 107 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink CDC中,从savepoint启动作业的命令与常规Flink作业略有不同。对于使用flink-cdc.sh脚本启动的作业,您可以通过以下步骤从savepoint重新启动作业:

    1. 首先,确保您的savepoint文件已经成功创建并保存在指定的目录中。您可以在state.savepoints.dir参数所指定的目录中找到savepoint文件。
    2. 接下来,您需要将savepoint文件移动到Flink的savepoint目录中,以便Flink能够识别并加载它。假设您的Flink安装目录为/path/to/flink,则可以将savepoint文件移动到/path/to/flink/savepoints目录下。
    3. 然后,您可以使用以下命令从savepoint启动作业:

      ./bin/flink run -s <savepoint_directory> -p <job_parallelism> -c <main_class> -- <job_arguments>
      

      其中,<savepoint_directory>是包含savepoint文件的目录路径,<job_parallelism>是作业的并行度,<main_class>是主类的名称,<job_arguments>是传递给主类的参数。

      例如,如果您的主类名为com.example.MyJob,并且没有额外的参数,则可以使用以下命令从savepoint启动作业:

      ./bin/flink run -s /path/to/flink/savepoints/my_job_id -p 1 -c com.example.MyJob
      

      这将从savepoint文件中恢复作业的状态,并启动作业。

    请注意,上述命令中的-s选项用于指定savepoint目录,而不是savepoint文件的完整路径。Flink会自动在指定的目录中查找最新的savepoint文件。

    通过按照上述步骤执行命令,您应该能够从savepoint重新启动Flink CDC作业。

    2024-02-27 13:13:59
    赞同 展开评论 打赏
  • 在flink-conf里面手动添加下路径,3.1会把这个参数暴露出来。pipeline,目前不能动态加表,这个参数没有暴露出来。此回答来自钉群Flink CDC 社区。

    2024-02-26 18:18:50
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载

    相关镜像