本文介绍了 Serverless 工作流的回调功能。相比较轮询,使用回调有效地降低了延迟、减少了轮询对服务器造成的不必要压力。另外,回调功能配合队列可以实现对非 FC 任务的编排,将 Serverless 工作流的编排范围扩展到任意类型的计算资源。
简介 长时间执行的任务通常会采用异步提交任务并返回任务标识 (ID),判断异步任务结束的方法通常有两种:轮询 (polling)和回调 (callback),在任务状态轮询中我们介绍了使用轮询来判断任务结束。Serverless 工作流的回调 (callback)功能,覆盖以下的痛点或场景:
消除轮询周期长带来的不必要延迟。 消除大流量场景下高并发的轮询造成不必要的服务器资源压力和浪费 。 编排非 FC Function 的任务,例如运行在自建机房或 ECS 上的进程 。 需要人工干预的步骤,例如通知审批通过。 下图展示了使用 MNS 队列服务集成结合回调 API 编排自建资源,拓宽了 Serverless 工作流的适用场景。
fnf-doc-service-integration-mns-queue 回调使用详解 在 Task 步骤中指定 pattern: waitForCallback,如下图状态机所示:该步骤会在提交 resourceArn 指定的任务后(如 FC invocation)该步骤会将一个 taskToken 存入到该步骤的 context 对象并进入一个暂停的状态,直到 Serverless 工作流收到回调或指定的任务超时。将 taskToken 传入 ReportTaskSucceed 或 ReportTaskFailed 接口去回调会使得该步骤继续执行。
fnf-doc-callback-state-machine - type: task name: mytask resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function} pattern: waitForCallback # 指定该 Task 步骤在提交任务后等待回调。 inputMappings: - target: taskToken source: $context.task.token # 将 context 中的 taskToken 作为 input 传入 resourceArn 指定的函数。 outputMappings: - target: k source: $local.key # 将 ReportTaskSucceeded 中 output {"key": "value"} 映射成 {"k": "value"} 并作为该步骤的输出。
示例 该示例共分为以下 3 个步骤:
准备 Task Function 开始工作流 回调 步骤 1:准备 Task Function 创建下面一个简单的函数,该函数会将输入直接返回。 服务:fnf-demo。 函数:echo。 运行环境:python2.7。 函数入口:index.handler。 #!/usr/bin/env python import json
def handler(event, context): return event
步骤 2:开始工作流 在 Serverless 工作流控制台创建下面的流程,并开始执行。 流程名称:fnf-demo-callback。 流程角色:配置一个有 FC Invocation 权限的角色。 version: v1 type: flow steps: - type: task name: mytask resourceArn: acs:fc:::services/fnf-demo/functions/echo pattern: waitForCallback inputMappings: - target: taskToken source: $context.task.token outputMappings: - target: s source: $local.status
流程开始后可以看到 mytask 步骤暂停在 TaskSubmitted 事件,等待回调。该事件的 output 中含有 taskToken 作为回调任务的标识。
Screen Shot 2019-08-15 at 11.00.09 AM 步骤 3:回调 使用 Serverless 工作流的 Python SDK 在本地(或其他可以运行 Python 的环境)运行 callback.py 脚本,将 {task-token} 替换为 TaskSubmitted 事件中的值。 cd /tmp mkdir fnf-demo-callback cd fnf-demo-callback
virtualenv env source env/bin/activate pip install -t . aliyun-python-sdk-core pip install -t . aliyun-python-sdk-fnf
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret} python worker.py {task-token-from-TaskSubmitted}
import os import sys from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.client import AcsClient from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
def main(): account_id = os.environ['ACCOUNT_ID'] akid = os.environ['AK_ID'] ak_secret = os.environ['AK_SECRET']
fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")
task_token = sys.argv[1] print "task token " + task_token try: request = ReportTaskSucceededRequest.ReportTaskSucceededRequest() request.set_Output("{"status": "ok"}") request.set_TaskToken(task_token) resp = fnf_client.do_action_with_exception(request) print "Report task succeeded finished" except ServerException as e: print(e)
if name == 'main': main()
上述脚本回调成功后可以看到 mytask 步骤继续执行, ReportTaskSucceeded 中指定的输出 "{"status": "ok"}" 经过 outputMappings 的映射后变成 "{"s": "ok"}"。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。