开发者社区 > 云原生 > Serverless > 正文

Serverless 工作流如何实现异步任务回调?

Serverless 工作流如何实现异步任务回调?

展开
收起
小天使爱美 2020-03-27 10:42:28 1423 0
1 条回答
写回答
取消 提交回答
  • 本文介绍了 Serverless 工作流的回调功能。相比较轮询,使用回调有效地降低了延迟、减少了轮询对服务器造成的不必要压力。另外,回调功能配合队列可以实现对非 FC 任务的编排,将 Serverless 工作流的编排范围扩展到任意类型的计算资源。

    简介 长时间执行的任务通常会采用异步提交任务并返回任务标识 (ID),判断异步任务结束的方法通常有两种:轮询 (polling)和回调 (callback),在任务状态轮询中我们介绍了使用轮询来判断任务结束。Serverless 工作流的回调 (callback)功能,覆盖以下的痛点或场景:

    消除轮询周期长带来的不必要延迟。 消除大流量场景下高并发的轮询造成不必要的服务器资源压力和浪费 。 编排非 FC Function 的任务,例如运行在自建机房或 ECS 上的进程 。 需要人工干预的步骤,例如通知审批通过。 下图展示了使用 MNS 队列服务集成结合回调 API 编排自建资源,拓宽了 Serverless 工作流的适用场景。

    fnf-doc-service-integration-mns-queue 回调使用详解 在 Task 步骤中指定 pattern: waitForCallback,如下图状态机所示:该步骤会在提交 resourceArn 指定的任务后(如 FC invocation)该步骤会将一个 taskToken 存入到该步骤的 context 对象并进入一个暂停的状态,直到 Serverless 工作流收到回调或指定的任务超时。将 taskToken 传入 ReportTaskSucceed 或 ReportTaskFailed 接口去回调会使得该步骤继续执行。

    fnf-doc-callback-state-machine - type: task name: mytask resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function} pattern: waitForCallback # 指定该 Task 步骤在提交任务后等待回调。 inputMappings: - target: taskToken source: $context.task.token # 将 context 中的 taskToken 作为 input 传入 resourceArn 指定的函数。 outputMappings: - target: k source: $local.key # 将 ReportTaskSucceeded 中 output {"key": "value"} 映射成 {"k": "value"} 并作为该步骤的输出。
    示例 该示例共分为以下 3 个步骤:

    准备 Task Function 开始工作流 回调 步骤 1:准备 Task Function 创建下面一个简单的函数,该函数会将输入直接返回。 服务:fnf-demo。 函数:echo。 运行环境:python2.7。 函数入口:index.handler。 #!/usr/bin/env python import json

    def handler(event, context): return event
    步骤 2:开始工作流 在 Serverless 工作流控制台创建下面的流程,并开始执行。 流程名称:fnf-demo-callback。 流程角色:配置一个有 FC Invocation 权限的角色。 version: v1 type: flow steps: - type: task name: mytask resourceArn: acs:fc:::services/fnf-demo/functions/echo pattern: waitForCallback inputMappings: - target: taskToken source: $context.task.token outputMappings: - target: s source: $local.status
    流程开始后可以看到 mytask 步骤暂停在 TaskSubmitted 事件,等待回调。该事件的 output 中含有 taskToken 作为回调任务的标识。

    Screen Shot 2019-08-15 at 11.00.09 AM 步骤 3:回调 使用 Serverless 工作流的 Python SDK 在本地(或其他可以运行 Python 的环境)运行 callback.py 脚本,将 {task-token} 替换为 TaskSubmitted 事件中的值。 cd /tmp mkdir fnf-demo-callback cd fnf-demo-callback

    在虚拟环境中,安装 fnf python SDK。

    virtualenv env source env/bin/activate pip install -t . aliyun-python-sdk-core pip install -t . aliyun-python-sdk-fnf

    执行 worker 进程。

    export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret} python worker.py {task-token-from-TaskSubmitted}

    worker.py 代码:

    import os import sys from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.client import AcsClient from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest

    def main(): account_id = os.environ['ACCOUNT_ID'] akid = os.environ['AK_ID'] ak_secret = os.environ['AK_SECRET']

    fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")

    task_token = sys.argv[1] print "task token " + task_token try: request = ReportTaskSucceededRequest.ReportTaskSucceededRequest() request.set_Output("{"status": "ok"}") request.set_TaskToken(task_token) resp = fnf_client.do_action_with_exception(request) print "Report task succeeded finished" except ServerException as e: print(e)

    if name == 'main': main()
    上述脚本回调成功后可以看到 mytask 步骤继续执行, ReportTaskSucceeded 中指定的输出 "{"status": "ok"}" 经过 outputMappings 的映射后变成 "{"s": "ok"}"。

    2020-03-27 10:49:44
    赞同 展开评论 打赏

快速交付实现商业价值。

相关产品

  • 函数计算
  • 相关电子书

    更多
    All in Serverless 阿里云核心产品全面升级 立即下载
    AIGC 浪潮之上,森马的 Serverless 实践之旅 立即下载
    极氪大数据 Serverless 应用实践 立即下载