请问flink-cdc3.0 pipeline方式如何从savepoint启动?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC 3.0中,通过pipeline方式从savepoint启动作业需要结合DeploymentRestoreStrategy
和StartJobWithParams
的相关配置来实现。以下是具体步骤和配置说明:
在从savepoint启动之前,您需要明确要使用的savepoint ID。可以通过以下方式获取: - 查询已有的savepoint信息,参考Savepoint
文档中的savepointId
字段。 - 确保该savepoint的状态为可用(status
字段为COMPLETED
)。
根据DeploymentRestoreStrategy
文档,您需要设置启动策略为FROM_SAVEPOINT
,并指定对应的savepoint ID。以下是关键参数的说明: - kind
: 设置为FROM_SAVEPOINT
,表示从指定的savepoint启动。 - savepointId
: 填写目标savepoint的ID。 - allowNonRestoredState
(可选): 如果作业中存在无法恢复的状态(例如某些算子的状态丢失),可以设置为TRUE
以忽略这些状态。
示例配置如下:
restoreStrategy:
kind: "FROM_SAVEPOINT"
savepointId: "354dde66-a3ae-463e-967a-0b4107fd****"
allowNonRestoredState: true
使用Python SDK或API调用StartJobWithParams
接口启动作业。以下是关键步骤和代码示例:
workspace
: 工作空间ID。namespace
: 项目空间名称。deploymentId
: 作业部署ID。kind
: 启动位点类型,设置为FROM_SAVEPOINT
。以下是一个基于Python SDK的启动代码示例:
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
# 初始化客户端
def create_client():
config = open_api_models.Config(
access_key_id="your-access-key-id",
access_key_secret="your-access-key-secret",
endpoint="your-endpoint"
)
return ververica20220718Client(config)
# 启动作业
def start_job_with_savepoint():
client = create_client()
request = ververica_20220718_models.StartJobWithParamsRequest(
workspace="adf9e5147a****",
namespace="script****-default",
deployment_id="3171d4d1-5952-4d02-b978-e762493b****",
kind="FROM_SAVEPOINT",
savepoint_id="354dde66-a3ae-463e-967a-0b4107fd****"
)
response = client.start_job_with_params(request)
print(response.body)
if __name__ == '__main__':
start_job_with_savepoint()
route
模块的语句,则无法从已有状态恢复,必须进行无状态启动(kind
设置为NONE
)。schema.change.behavior
配置与下游目标端兼容,避免因Schema变更导致启动失败。启动后,您可以通过日志或API查询作业状态,确认是否成功从savepoint恢复。如果启动失败,请检查以下内容: - Savepoint ID是否正确。 - 是否存在无法恢复的状态(可通过allowNonRestoredState
参数解决)。 - 下游目标端是否支持当前Schema。
通过以上步骤,您可以成功使用Flink CDC 3.0 pipeline方式从savepoint启动作业。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。