开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink-sql 可以配置退出时不清理 checkpoint 嘛

然后重跑 sql 时,指定checkpoint 进行恢复

展开
收起
游客6vdkhpqtie2h2 2022-09-29 10:35:27 1382 0
16 条回答
写回答
取消 提交回答
  • 在阿里云实时计算 Flink 中,flink-sql 作业默认会在退出时清理 checkpoint。如果希望在退出时不清理 checkpoint,可以通过在 SQL 语句中设置对应的参数来实现。

    具体来说,可以通过在 SQL 语句中使用 SET 语句来设置 ExecutionCheckpointingOptions.DISABLE_EXTERNALIZED_CHECKPOINTS 参数,该参数的值为 true,表示在作业退出时不清理 checkpoint。例如:

    SET ExecutionCheckpointingOptions.DISABLE_EXTERNALIZED_CHECKPOINTS='true';
    
    -- 这里写具体的查询语句
    

    在上述代码中,通过 SET 语句设置 DISABLE_EXTERNALIZED_CHECKPOINTS 参数的值为 true,表示在作业退出时不清理 checkpoint。然后在语句中写入具体的查询逻辑。

    设置该参数可能会影响作业的恢复能力和资源占用情况,建议根据具体情况进行权衡和测试。

    2023-05-07 23:04:32
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    可以通过在 Flink SQL 配置文件中设置 checkpoint 模式来实现不清理 checkpoint。例如,可以在 flink-conf.yaml 文件中添加以下配置:

    state.backend: rocksdb
    state.backend.checkpointing.checkpoints-in-externalized-directory: true
    state.savepoints.dir: hdfs://localhost:9000/flink/checkpoints
    

    通过这个配置,Flink 将 checkpoint 存储在外部目录,并且在退出时不会清理 checkpoint。这意味着当您重启程序时,Flink 会检查 checkpoint 是否存在并使用其进行恢复。

    要实现在重新运行 SQL 时指定 checkpoint 进行恢复,则可以通过以下步骤进行操作:

    1. 在 flink-conf.yaml 文件中配置 checkpoint 存储位置,如上所述。
    2. 在 SQL 中添加如下语句:
    SET execution.checkpointing.externalized-checkpoint-retention.enabled = "true";
    

    这将启用外部化 checkpoint 保留,并且 checkpoint 将存储在您上面配置的目录中。然后,您可以在重新运行 SQL 时使用以下命令来恢复 checkpoint:

    RESTORE [FULL|INCREMENTAL] FROM 'hdfs://localhost:9000/flink/checkpoints';
    

    将路径更改为您配置的目录并使用适当的恢复类型(FULL 或 INCREMENTAL)。这样,Flink 将使用指定的 checkpoint 进行恢复,而不是从头开始重新计算。

    2023-05-05 20:19:09
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    答案是肯定的。如果我们这样配置当flink-sql退出的时候,checkpoint目录是不会被删除,这个你需要注意。

    2023-05-03 18:16:37
    赞同 展开评论 打赏
  • Flink SQL 支持在退出应用程序时不清理 Checkpoint,以便在重启应用程序时可以使用 Checkpoint 进行恢复。

    具体来说,您可以在 Flink SQL 中配置以下两个参数:

    • execution.checkpointing.enabled: 此参数用于启用 Flink SQL 应用程序的 Checkpoint 功能,其默认值为 true。

    • state.checkpoints.dir: 此参数用于指定将状态数据持久化到哪个目录下。例如,您可以将其设置为 file:///opt/flink/checkpoints。如果您希望 Flink SQL 应用程序退出时不清理 Checkpoint,请确保在重启应用程序时使用相同的 Checkpoint 目录位置。

    如果您希望在使用上述配置的情况下重新启动 Flink SQL 应用程序时恢复 Checkpoint,请按照以下步骤操作:

    1. 启动 Flink 集群,并将 Flink SQL 应用程序提交到集群中运行。

    2. 在 Flink WebUI 或者命令行中查看 Flink SQL 应用程序的 JobID。例如,您可以使用命令 flink list -a 在命令行中查看所有正在运行的作业。

    3. 在 Flink WebUI 或者命令行中执行如下命令,以重新启动 Flink SQL 应用程序并恢复 Checkpoint:

    flink run -s <checkpoint_path> -d <path_to_flink_sql_jar> <job_id>
    

    其中,<checkpoint_path> 是上一次 Flink SQL 应用程序退出时保存 Checkpoint 的位置,<path_to_flink_sql_jar> 是 Flink SQL 应用程序的路径,<job_id> 是 Flink SQL 应用程序的 JobID。该命令将重新启动 Flink SQL 应用程序,并从指定的 Checkpoint 恢复状态。

    需要注意的是,在重启应用程序时,Flink 将会检查 Checkpoint 是否存在,如果存在将会使用它在上一次的状态来恢复应用程序,否则将会从头开始处理数据。另外,如果在应用程序运行期间没有生成任何 Checkpoint,那么无法使用 Checkpoint 恢复应用程序。

    2023-05-02 07:46:42
    赞同 展开评论 打赏
  • 是的,flink-sql 可以配置退出时不清理 checkpoint。可以通过设置 Flink 配置参数 retention.retention-type 和 retention.retention-list 参数来控制 checkpoint 的清理。

    • retention.retention-type: 设置 checkpoint 的保留策略类型,默认值为 "delete",即删除策略。可设置为 "externalized",表示将 checkpoint 留在外部存储系统中,不会被清理掉。
    • retention.retention-list: 设置保留的 checkpoint 目录列表,如果 retention-type 设置为 "delete",则列表中的 checkpoint 目录不会被删除。

    例如,可以在 Flink 配置文件中添加以下配置:

    # 设置 checkpoint 保留类型为 "externalized",即外部化
    flink.checkpoint.retain-externalized-checkpoints: true
    # 设置保留的 checkpoint 目录列表
    flink.checkpoint.externalized-checkpoint-retention.enabled: true
    flink.checkpoint.externalized-checkpoint-retention.retention-list: '&lt;checkpoint_directory&gt;/*'
    

    这样配置后,当 flink-sql 退出时,checkpoint 目录将不会被删除。注意,外部化的 checkpoint 需要手动清理。

    2023-04-27 22:51:58
    赞同 展开评论 打赏
  • 从事java行业9年至今,热爱技术,热爱以博文记录日常工作,csdn博主,座右铭是:让技术不再枯燥,让每一位技术人爱上技术

    Checkpoint作业运行底层自动进行,默认3min一次Checkpoint,一次Checkpoint超时10min,Checkpoint完成时,在用户给定的外部持久化 存储保存;当作业FAILED(或者 CANCELED)时,外部 存储的Checkpoint会保留下来。Checkpoint保存在哪个文件下具体可以参考查看历史作业实例日志。

    2023-04-26 20:42:54
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    可以通过设置 Flink 的 CheckpointConfig 来控制退出时是否清理 checkpoint。具体来说,可以设置 CheckpointConfig 的属性 externalizedCheckpointsEnabled 为 true,表示将 checkpoint 存储到外部存储系统(如 HDFS)中,从而在 Flink 退出时不会删除 checkpoint 数据。

    在重跑 SQL 时,可以通过指定 checkpoint 来进行恢复。具体来说,可以使用 Flink SQL 的 SET 命令来设置 checkpoint 相关的参数,如:

    SET execution.checkpointing.mode = 'EXACTLY_ONCE'; SET execution.checkpointing.interval = '10s'; SET execution.checkpointing.externalized-checkpoint-retention = 'RETAIN_ON_CANCELLATION'; SET execution.checkpointing.externalized-checkpoint-enabled = 'true'; SET execution.checkpointing.externalized-checkpoint-retain-externalized = 'true'; SET execution.checkpointing.externalized-checkpoint-path = 'hdfs:///flink/checkpoints'; 其中,execution.checkpointing.externalized-checkpoint-enabled 和 execution.checkpointing.externalized-checkpoint-path 分别表示是否启用外部化 checkpoint 和 checkpoint 存储路径。在重跑 SQL 时,可以通过设置 execution.checkpointing.externalized-checkpoint-path 来指定要恢复的 checkpoint 路径。

    2023-04-26 12:31:37
    赞同 展开评论 打赏
  • 要配置 Flink SQL 任务在退出时不清理检查点,您需要在提交任务时设置 execution.checkpointing.externalized-checkpoint-retention 参数为 RETAIN_ON_CANCELLATION

    2023-04-25 11:26:16
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    可以。Flink SQL 可以通过设置配置参数 state.ttl.cleanup-include-deltas 的值来控制 Flink 退出时是否清理 Checkpoint。

    具体来说,如果将 state.ttl.cleanup-include-deltas 设置为 false,则 Flink 在退出时不会清理 Delta Checkpoint,这通常用于在调试或测试时保留 Checkpoint 数据以便进行后续分析。

    例如,在 SQL CLI 中可以使用如下命令来设置该参数的值:

    SET state.ttl.cleanup-include-deltas = false;

    需要注意的是,虽然在某些情况下保留 Checkpoint 数据可能会有一定的帮助,但同时也会增加系统存储和管理成本,因此需要谨慎使用。

    2023-04-25 10:29:26
    赞同 展开评论 打赏
  • Flink SQL 是 Apache Flink 的一个组件,它提供了基于 SQL 的查询和分析的功能。在使用 Flink SQL 时,常常会遇到需要从上一次故障中恢复应用程序状态的情况,即所谓的“容错”(fault-tolerance)。

    Flink SQL 将状态保存在 Checkpoint 中,并在应用程序故障时自动将其恢复。Checkpoint 包括所有流数据、操作符状态以及元数据信息。此外,Flink 还根据 Checkpoint 自动重新启动失败的任务,并使得整个作业达到系统级别的完全容错性。

    默认情况下,在应用程序正常退出时,Flink 会删除已完成 Checkpoint 和 Savepoint。不过,可以通过设置几个配置项来实现您所需的行为:

    • state.checkpoints.cleanup.enable: 是否在应用程序正常终止时清理 Checkpoint 默认是 true,如果要保留 checkpoint 可以设置为 false。
    • state.savepoints.dir: 存储路径,默认跟 TaskManager 主机的本地文件系统有关,但也可以是 HDFS 或其他支持的文件系统
    • state.checkpoints.dir: 所有 checkpoint 路径

    在 Flink SQL 应用程序中,我们可以通过以下方式来指定应该加载哪个 Checkpoint 来进行状态恢复:

    SET senv.restart-strategy.type=fixed-delay 
    SET senv.restart-strategy.attempts=3 
    SET senv.restart-strategy.delay=10s 
    
    SET senv.execution.checkpointing.mode=EXACTLY_ONCE 
    SET senv.execution.checkpointing.interval=1min 
    
    SET 
    senv.execution.checkpointing.max-concurrent-checkpoints=2 
    
    SET senv.execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION 
    # 可以定义使用的 checkpoint 的 ID,用于作业重新启动和状态恢复
    Set jobmanager.execution.failover-strategy: region,priority 
    

    其中 externalized-checkpoint-retention 属性可以设置,在取消任务时是否要保留已完成 Checkpoint。将其设置为 RETAIN_ON_CANCELLATION 即可保留。

    在重新提交作业时,可以通过指定一个 Savepoint 或者 Checkpoint 来实现带有特定状态的应用程序的重启。由于如果需要按特定顺序执行几个重启步骤,因此您必须手动维护并记录每个 Checkpoint id 或 Savepoint id。

    然后在 sql 语句中指定:

    SET senv.execution.checkpointing.mode = AT_LEAST_ONCE;
    SET senv.execution.checkpointing.interval='500ms';
    SET restart_strategy.type=fixed-delay;
    SET restart_strategy.restart-timeout='10 minute';
    
    SELECT ...
    
    FROM …
    
    WHERE ...
    
    WITH (
     'connector.type' = 'jdbc',
     'url'= ... ,
     'table-name'= ... ,
     'buffer-size'=...,
     'parallelism'=...
    )
    

    注意:上面代码中最后一行即InsertSql 中除了字段内容外,还可以插入我们具体想设定的参数,如buffer size、parallelism 等等从而达到我们想要的效果。

    以上是关于 Flink SQL 如何通过指定 Checkpoint 进行状态恢复的详细介绍,希望对您有所帮助。

    2023-04-24 18:47:48
    赞同 展开评论 打赏
  • Flink SQL是建立在Flink DataStream API之上的,所以重跑一条Flink SQL语句时可以采用Flink DataStream API中的checkpoint续跑功能。具体步骤如下: 1. 首先,在第一次运行Flink SQL作业时,需要配置checkpoint,并指定checkpoint的存储位置。例如:

    SET 'execution.checkpointing.interval' = '5min';
    SET 'execution.checkpointing.mode' = 'exactly-once';
    SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
    SET 'execution.checkpointing.externalize-checkpoints' = 'true'; 
    SET 'execution.checkpointing.checkpoint-storage' = '/checkpoint/path'; 
    

    这会将checkpoint数据持久化存储在指定路径/checkpoint/path下。 2. 取消作业时,不清理checkpoint数据。这可以通过上一步的RETAIN_ON_CANCELLATION配置实现。 3. 在重新执行同一条Flink SQL语句时,指定初始checkpoint来恢复状态。例如:

    SET 'execution.checkpointing.resume-from-latest-savepoint' = 'true';
    SET 'execution.checkpointing.restore-from-checkpoint' = '/checkpoint/path/chk-point_pid=...'  
    

    其中,/checkpoint/path/chk-point_pid=...指定要恢复的checkpoint文件路径。 4. Flink将从指定的checkpoint中恢复状态,并从checkpoint记录的位置继续进行SQL的执行和计算。 这种方式允许你在取消Flink SQL作业后,通过指定checkpoint进行重跑和状态恢复。但是,需要注意: 1. 作业取消前必须已启用checkpoint且配置为在取消时保留(RETAIN_ON_CANCELLATION)。 2. 重跑作业的环境必须与初始作业相同,特别是并行度设置需要一致,否则无法恢复状态。 3. 若环境中原始数据已发生变更,则从checkpoint恢复至新数据位置之间的数据可能重复或丢失。这需要业务保证幂等或可以容忍。 4. 重跑过程中的遇到的故障,将导致无法继续恢复,需要重新指定checkpoint重跑。 所以,这种checkpoint续跑机制对于一定期间内的作业取消重跑还是比较有用的,但环境和数据的变化可能导致的影响也需要在设计和应用中考虑。

    2023-04-24 17:18:33
    赞同 展开评论 打赏
  • 是可以的。 在 Flink SQL 中,可以通过设置 ExecutionCheckpointingOptions.CHECKPOINT_RETENTION_TIME 配置项来控制 Checkpoint 的保留时间,从而实现在退出时不清理 Checkpoint。另外,可以通过设置 ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT 手动指定 Checkpoint 的保存路径,然后在重跑 SQL 时指定该路径进行恢复。具体操作可以参考 Flink 官方文档中的相关章节。

    2023-04-24 13:14:59
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    在 Flink SQL 中,可以通过设置 state.ttl 参数来控制状态的过期时间。默认情况下,状态会在作业完成后自动清理,可以通过将 state.ttl 参数设置为较大的值来延长状态的存储时间。同时,在重新启动 Flink 程序时,可以指定 checkpoint 的位置来恢复程序的状态。

    具体来说,您可以按照以下步骤配置 Flink SQL 程序:

    在 SQL 中设置 state.ttl 参数。可以在 SQL 中添加以下语句:

    SET table.exec.state.ttl = '24h';
    

    其中,24h 表示状态的过期时间为 24 小时。可以根据实际需求进行调整。

    在重新启动 Flink 程序时,指定 checkpoint 的位置。可以在启动命令中添加以下参数:

    ./bin/flink run -s /path/to/checkpoint
    
    

    其中,/path/to/checkpoint 表示 checkpoint 的位置。通过指定该参数,程序可以从指定的 checkpoint 处恢复状态,而不是从头开始执行。

    需要注意的是,如果设置了较大的 state.ttl 参数,可能会增加状态的存储成本和读写负担。同时,在重新启动 Flink 程序时,如果指定了 checkpoint 的位置,可能会导致程序重复消费一些数据,从而产生一些不一致的结果。因此,在配置 state.ttl 参数和指定 checkpoint 时,需要根据实际情况进行权衡和调整。

    2023-04-23 21:31:16
    赞同 展开评论 打赏
  • 技术架构师 阿里云开发者社区技术专家博主 CSDN签约专栏技术博主 掘金签约技术博主 云安全联盟专家 众多开源代码库Commiter

    是的,你可以在 Flink SQL 中配置退出时不清理 Checkpoint,并且可以通过指定 Checkpoint 来恢复之前的状态。

    具体来说,可以通过修改 Flink 配置文件中的以下参数来实现:

    state.checkpoints.dir: hdfs://localhost:9000/flink/checkpoints   # 设置 Checkpoint 存储目录
    state.savepoints.dir: hdfs://localhost:9000/flink/savepoints     # 设置 Savepoint 存储目录
    state.backend: filesystem                                       # 设置存储后端类型为文件系统
    

    如果你使用的是 RocksDB 或其他类型的存储后端,需要根据实际情况进行配置。

    然后,在 Flink SQL 中,可以通过设置 SET checkpoint_enabled = 'true' 来启用 Checkpoint,并通过 SET checkpoint_dir = '/path/to/checkpoints' 指定 Checkpoint 存储目录。例如,下面是一个示例 SQL 语句:

    SET checkpoint_enabled = 'true';
    SET checkpoint_dir = 'hdfs://localhost:9000/flink/checkpoints';
    
    -- 在这里写入你的 SQL 查询语句
    SELECT ...
    FROM ...
    WHERE ...
    
    -- 提交 SQL 查询任务
    INSERT INTO my_table
    

    上述 SQL 语句中,我们先通过 SET checkpoint_enabledSET checkpoint_dir 方法启用和指定 Checkpoint 相关配置。然后,执行 SQL 查询并将结果输出到 my_table 表中。

    在任务退出时,Flink 会将最后一个 Checkpoint 状态保存到指定的 Checkpoint 存储目录中,并生成一个 Savepoint。你可以通过 flink savepoint 命令来创建 Savepoint,并指定它的路径:

    ./bin/flink savepoint <job-id> [<savepoint-directory>]
    

    例如,下面是一个示例命令:

    ./bin/flink savepoint 8fae5d64a3e3b1da8f7c9b9a24d1ade2 hdfs://localhost:9000/flink/savepoints
    

    上述命令中,我们指定了 Job ID 和 Savepoint 目录,表示将当前任务的状态保存到指定的 Savepoint 中。

    当需要恢复之前的状态时,可以使用 flink run -s 命令来从指定的 Savepoint 启动 Flink 任务:

    ./bin/flink run -s <savepoint-path> <JAR-file>
    

    例如,下面是一个示例命令:

    ./bin/flink run -s hdfs://localhost:9000/flink/savepoints/savepoint-8fae5d-4ee4ff4d7044 ./my-job.jar
    

    上述命令中,我们指定了 Savepoint 路径和 JAR 文件路径,表示使用指定的 Savepoint 来启动 Flink 任务。

    2023-04-23 17:24:08
    赞同 展开评论 打赏
  • 热爱开发

    在 Flink SQL 中,可以通过配置 Checkpoint 相关的参数来实现在退出时不清理 Checkpoint,然后重跑 SQL 时指定 Checkpoint 进行恢复。具体操作如下:

    在 flink-conf.yaml 文件中设置 Checkpoint 参数 首先,您需要在 Flink 配置文件 flink-conf.yaml 中设置 Checkpoint 的相关参数。找到以下配置项:

    state.checkpoints.dir: file:///path/to/checkpoints 将其修改为:

    state.checkpoints.dir: hdfs:///path/to/checkpoints state.savepoints.dir: hdfs:///path/to/savepoints state.backend.fs.checkpointdir: /path/to/fs_checkpoint 上述配置将 Checkpoint 存储到 HDFS 上,并且启用了文件系统 Checkpoint。state.checkpoints.dir 是存储 Checkpoint 的路径,state.savepoints.dir 是存储 Savepoint 的路径,state.backend.fs.checkpointdir 是使用文件系统作为 Checkpoint 存储时的路径。

    在 SQL 中启用 Checkpoint 接下来,在 SQL 中启用 Checkpoint。在您的 SQL 文件或程序中添加以下语句:

    SET execution.checkpointing.mode = 'EXACTLY_ONCE';

    SET execution.checkpointing.interval = '10s';

    SET execution.checkpointing.timeout = '60s';

    SET execution.checkpointing.max-concurrent-checkpoints = '1'; 这些语句分别设置了 Checkpoint 的模式、间隔、超时时间和并发数。如果您希望在退出时不清理 Checkpoint,则需要将 execution.checkpointing.cleanup-on-failure 参数设置为 false。例如:

    SET execution.checkpointing.cleanup-on-failure = 'false'; 恢复 Checkpoint 在重跑 SQL 时,您可以通过以下语句来指定要恢复的 Checkpoint:

    SET execution.restore-from = 'latest-checkpoint'; 该语句将从最新的 Checkpoint 中恢复状态。如果需要从特定的 Checkpoint 恢复,则可以将其指定为参数值。例如:

    SET execution.restore-from = 'hdfs:///path/to/checkpoints/chk-123456789'; 上述语句将从名为 chk-123456789 的 Checkpoint 中恢复状态。

    请注意,在启用 Checkpoint 后,Flink 会自动将任务的状态和元数据保存到存储设备中。如果您不希望在退出时清理 Checkpoint,则需要手动删除过期的 Checkpoint 数据,避免占用过多存储空间。

    2023-04-23 17:09:17
    赞同 展开评论 打赏
  • Flink SQL 中的 Checkpoint 实际上是由 Flink 引擎提供的机制,用于实现容错性的保证。因此,如果你想要在退出时避免清除 Checkpoint,需要在 Flink 引擎的配置文件中进行配置。

    具体来说,可以通过修改 Flink 的配置文件 flink-conf.yaml 来实现该功能。具体步骤如下:

    找到 Flink 安装路径下的 conf 文件夹,如果没有则可以在安装路径下创建该文件夹。

    打开 conf/flink-conf.yaml 文件,找到 state.checkpoints.dir 参数,该参数指定了 Checkpoint 存储的目录路径。

    例如,可以将其配置为:state.checkpoints.dir: file:///path/to/checkpoints。

    注意,这里使用了本地文件系统,如果需要使用分布式文件系统(如 HDFS),可以在 file:// 前添加对应的协议。

    2023-04-23 16:40:19
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载