开发者社区 > 云原生 > Serverless > 正文

有Serverless 工作流的Python SDK 使用示例吗?

有Serverless 工作流的Python SDK 使用示例吗?

展开
收起
小天使爱美 2020-03-27 11:12:13 1217 0
1 条回答
写回答
取消 提交回答
  • 本文介绍使用 Python SDK 的详细流程,包括环境准备、SDK 获取和安装和快速使用三部分。

    环境准备 要使用阿里云 Python SDK ,您需要一个云账号以及一对 Access Key ID 和 Access Key Secret 。 请在阿里云控制台中的 AccessKey 管理页面 上创建和查看您的 AccessKey,或者联系您的系统管理员。 要使用阿里云 SDK 访问某个产品的 API,您需要事先在 阿里云控制台 中开通这个产品。 SDK 获取和安装 使用 pip 安装(推荐)

    pip install aliyun-python-sdk-core # 安装阿里云 SDK 核心库 pip install aliyun-python-sdk-fnf # 安装 Serverless 工作流 SDK
    快速使用 下文将以创建一个流程,发起一次执行并获取执行详情为例展示如何使用 Python SDK 调用 Serverless 工作流服务。

    请求方式

    encoding: utf-8

    import time

    from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkfnf.request.v20190315 import CreateFlowRequest from aliyunsdkfnf.request.v20190315 import StartExecutionRequest from aliyunsdkfnf.request.v20190315 import DescribeExecutionRequest from aliyunsdkfnf.request.v20190315 import GetExecutionHistoryRequest

    flow_definition_type = "FDL" flow_name = "xxx" flow_definition = "xxx" flow_description = "some descriptions" role_arn = "acs:ram::${Your_Account_ID}:${Your_Role}" execution_name = "xxx"

    def create_flow(fnf_cli): request = CreateFlowRequest.CreateFlowRequest() request.set_Type(flow_definition_type) request.set_Description(flow_description) request.set_Definition(flow_definition) request.set_RoleArn(role_arn) request.set_Name(flow_name) return fnf_cli.do_action_with_exception(request)

    def start_execution(fnf_cli): request = StartExecutionRequest.StartExecutionRequest() request.set_FlowName(flow_name) request.set_ExecutionName(execution_name) return fnf_cli.do_action_with_exception(request)

    def describe_execution(fnf_cli): request = DescribeExecutionRequest.DescribeExecutionRequest() request.set_FlowName(flow_name) request.set_ExecutionName(execution_name) return fnf_cli.do_action_with_exception(request)

    def get_execution_history(fnf_cli): request = GetExecutionHistoryRequest.GetExecutionHistoryRequest() request.set_FlowName(flow_name) request.set_ExecutionName(execution_name) return fnf_cli.do_action_with_exception(request)
    创建客户端并利用上述函数发起一系列调用

    说明 如果您需要不加改造进行调试的话,请将下述函数与上述请求方式代码块置于同一个文件中,避免在 import 时报错。 def main():

    # 创建 AcsClient 实例
    client = AcsClient(
        "<your-access-key-id>",
        "<your-access-key-secret>",
        "<your-region-id>"
    )
    try:
        create_resp = create_flow(client)
        print(create_resp)
    
        start_resp = start_execution(client)
        print(start_resp)
    
        time.sleep(1)
    
        desc_resp = describe_execution(client)
        print(desc_resp)
    
        get_resp = get_execution_history(client)
        print(get_resp)
    except ServerException as e:
        print(e.get_request_id())
    

    if name == 'main': main()

    2020-03-27 11:25:18
    赞同 展开评论 打赏

快速交付实现商业价值。

相关产品

  • 函数计算
  • 相关电子书

    更多
    Python第四讲——使用IPython/Jupyter Notebook与日志服务玩转超大规模数据分析与可视化 立即下载
    Improving Python and Spark 立即下载
    一个跨平台的云服务SDK需要什么 立即下载