请问flink-cdc3.0 pipeline方式如何从savepoint启动?

请问flink-cdc3.0 pipeline方式如何从savepoint启动?

展开
收起
三分钟热度的鱼 2024-04-17 18:59:37 60 分享 版权
阿里云 AI 助理回答

在Flink CDC 3.0中,通过pipeline方式从savepoint启动作业需要结合DeploymentRestoreStrategyStartJobWithParams的相关配置来实现。以下是具体步骤和配置说明:


1. 确认Savepoint ID

在从savepoint启动之前,您需要明确要使用的savepoint ID。可以通过以下方式获取: - 查询已有的savepoint信息,参考Savepoint文档中的savepointId字段。 - 确保该savepoint的状态为可用(status字段为COMPLETED)。


2. 配置启动策略

根据DeploymentRestoreStrategy文档,您需要设置启动策略为FROM_SAVEPOINT,并指定对应的savepoint ID。以下是关键参数的说明: - kind: 设置为FROM_SAVEPOINT,表示从指定的savepoint启动。 - savepointId: 填写目标savepoint的ID。 - allowNonRestoredState(可选): 如果作业中存在无法恢复的状态(例如某些算子的状态丢失),可以设置为TRUE以忽略这些状态。

示例配置如下:

restoreStrategy:
  kind: "FROM_SAVEPOINT"
  savepointId: "354dde66-a3ae-463e-967a-0b4107fd****"
  allowNonRestoredState: true

3. 启动作业

使用Python SDK或API调用StartJobWithParams接口启动作业。以下是关键步骤和代码示例:

必填参数

  • workspace: 工作空间ID。
  • namespace: 项目空间名称。
  • deploymentId: 作业部署ID。
  • kind: 启动位点类型,设置为FROM_SAVEPOINT

代码示例

以下是一个基于Python SDK的启动代码示例:

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models

# 初始化客户端
def create_client():
    config = open_api_models.Config(
        access_key_id="your-access-key-id",
        access_key_secret="your-access-key-secret",
        endpoint="your-endpoint"
    )
    return ververica20220718Client(config)

# 启动作业
def start_job_with_savepoint():
    client = create_client()
    request = ververica_20220718_models.StartJobWithParamsRequest(
        workspace="adf9e5147a****",
        namespace="script****-default",
        deployment_id="3171d4d1-5952-4d02-b978-e762493b****",
        kind="FROM_SAVEPOINT",
        savepoint_id="354dde66-a3ae-463e-967a-0b4107fd****"
    )
    response = client.start_job_with_params(request)
    print(response.body)

if __name__ == '__main__':
    start_job_with_savepoint()

4. 注意事项

  • 无状态启动限制:如果修改了route模块的语句,则无法从已有状态恢复,必须进行无状态启动(kind设置为NONE)。
  • Schema变更行为:确保schema.change.behavior配置与下游目标端兼容,避免因Schema变更导致启动失败。
  • Tag与Savepoint结合:如果savepoint对应的快照文件可能已被清理,建议结合Paimon的Tag功能,确保快照状态长期保留。

5. 验证启动结果

启动后,您可以通过日志或API查询作业状态,确认是否成功从savepoint恢复。如果启动失败,请检查以下内容: - Savepoint ID是否正确。 - 是否存在无法恢复的状态(可通过allowNonRestoredState参数解决)。 - 下游目标端是否支持当前Schema。


通过以上步骤,您可以成功使用Flink CDC 3.0 pipeline方式从savepoint启动作业。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理