开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink中appliction 模式怎么从 checkpoint 保存的地方 启动?

Flink中appliction 模式怎么从 checkpoint 保存的地方 启动?

展开
收起
真的很搞笑 2024-01-09 12:13:12 50 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink中,应用程序模式(Application Mode)的作业可以通过以下步骤从检查点保存的地方启动:

    1. 首先,确保你已经设置了Flink检查点相关的配置。在Flink的配置文件(flink-conf.yaml)中,你需要设置以下参数:

      jobmanager.checkpoints.dir: <checkpoint directory>
      state.backend: filesystem
      

      其中,<checkpoint directory>是用于保存检查点的目录路径。

    2. 在你的Flink应用程序中,你需要使用CheckpointedFunction来标记需要进行检查点操作的函数。这个函数会在每个时间窗口结束后触发一次检查点保存。例如:

      public class MyFunction extends RichMapFunction<String, Integer> implements CheckpointedFunction {
          private ValueState<Integer> countState;
          // ...
          @Override
          public void snapshotState(FunctionSnapshotContext context) throws Exception {
              countState.clear(); // 清除旧的状态数据
              countState.update(value); // 更新状态数据
          }
          // ...
      }
      
    3. 在你的Flink应用程序中,你需要使用StreamExecutionEnvironment来设置检查点策略和重启方式。例如:

      Configuration config = new Configuration();
      config.setString("state.backend", "filesystem");
      config.setString("checkpointing.interval", "10000"); // 设置检查点间隔为10秒
      config.setString("savepoints.path", "/path/to/savepoints"); // 设置保存点路径
      config.setBoolean("failover-strategy.restart-job", true); // 设置重启作业的策略为失败时重启
      config.setBoolean("failover-strategy.ignore-checkpoints", false); // 设置忽略检查点的策略为不忽略
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setConfig(config);
      env.enableCheckpointing(60000); // 启用检查点,并设置最小间隔为60秒
      env.addSource(new MySourceFunction()); // 添加数据源函数
      env.map(new MyFunction()); // 应用映射函数
      env.print(); // 打印结果
      env.execute("My Flink Job"); // 执行作业
      
    4. 当你需要从检查点保存的地方启动作业时,你可以使用Savepoint类来加载检查点信息。例如:
      ```java
      Configuration config = new Configuration();
      config.setString("state.backend", "filesystem");
      config.setString("checkpoints.dir", "/path/to/checkpoints"); // 设置检查点目录路径
      config.setBoolean("failover-strategy.restart-job", true); // 设置重启作业的策略为失败时重启
      config.setBoolean("failover-strategy.ignore-checkpoints", false); // 设置忽略检查点的策略为不忽略

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setConfig(config);
      env.enableCheckpointing(60000); // 启用检查点,并设置最小间隔为60秒
      env.addSource(new MySourceFunction()); // 添加数据源函数
      env.map(new MyFunction()); // 应用映射函数
      env.print(); // 打印结果

      // 加载最近的检查点信息并恢复作业状态
      long latestCheckpointId = getLatestCheckpointId(); // 获取最新的检查点ID,可以根据实际情况实现该逻辑
      List savepoints = env.getCheckpointData().getAllSavepoints(); // 获取所有保存点的信息
      for (Savepoint savepoint : savepoints) {

      if (savepoint.getCheckpointId() == latestCheckpointId) {
          try {
              env.restoreSavepoint(savepoint); // 恢复最近的检查点状态
              break; // 如果成功恢复,则退出循环
          } catch (Exception e) {
              e.printStackTrace(); // 如果恢复失败,则打印异常信息并继续尝试其他保存点或重新执行作业
          }
      } else {
          // 如果当前保存点的ID小于最新的检查点ID,则说明已经找到最近的检查点,无需继续尝试其他保存点或重新执行作业。可以在这里进行相应的处理。
          break;
      }
      

      }
      env.execute("My Flink Job from Checkpoint"); // 执行从检查点恢复的作业

    2024-01-09 14:33:47
    赞同 展开评论 打赏
  • 检查点是自动触发的,保存点是手动触发 ,此回答整理自钉群“【③群】Apache Flink China社区”

    2024-01-09 13:12:16
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载