开发者社区 > 云原生 > Serverless > 正文

Serverless 工作流如何实现任务状态轮询?

Serverless 工作流如何实现任务状态轮询?

展开
收起
小天使爱美 2020-03-27 10:41:11 1196 0
1 条回答
写回答
取消 提交回答
  • 本文介绍了任务状态轮询和 Serverless 工作流实现的具体步骤。

    简介 在长时间任务的场景中如果任务结束后没有回调机制,开发者通常会采用轮询的方式来判断任务的结束。可靠的轮询实现需要维护状态的持久化以保证即使当前轮询进程失败退出,进程恢复后轮询也会继续进行。本示例通过一个假设场景:用户调用函数计算提交了一个多媒体处理任务,该任务耗时从 1 分钟到几小时不等,任务执行状态可以通过 API 查询,介绍如何使用 Serverless 工作流实现一个通用可靠的任务轮询工作流。

    Serverless 工作流实现 下面的教程会将两个 FC 函数编排成一个任务轮询工作流,该示例需要以下 3 个步骤:

    创建 FC 函数 创建 Serverless 工作流流程 开始执行并查看结果 步骤1:创建 FC 函数 首先创建一个名为 fnf-demo 的 FC 服务,并在该服务下创建两个 Python2.7 的函数,详细步骤请参见 FC 文档。 StartJob 函数:模拟通过调用 API 开始一个长时间的任务,返回一个任务 ID。 import logging import uuid

    def handler(event, context): logger = logging.getLogger() id = uuid.uuid4() logger.info('Started job with ID %s' % id) return {"job_id": str(id)}
    GetJobStatus 函数: 模拟通过调用 API 获取指定任务的执行结果,比较当前的时间和函数第一次执行的时间的差值和输入中 delay 的值,返回不同的状态:“success” 或 “running”。 import logging import uuid import time import json

    start_time = int(time.time())

    def handler(event, context): evt = json.loads(event) logger = logging.getLogger() job_id = evt["job_id"] logger.info('Started job with ID %s' % job_id)

    now = int(time.time()) status = "running"

    delay = 60 if "delay" in evt: delay = evt["delay"]

    if now - start_time > delay: status = "success"

    try_count = 0 if "try_count" in evt: try_count = evt["try_count"]

    try_count = try_count + 1 logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count)) return {"job_id": job_id, "job_status":status, "try_count":try_count}
    步骤 2:创建 Serverless 工作流流程 该流程的主要逻辑描述如下:

    StartJob 步骤: 调用 StartJob 函数开始一个任务。 Wait10s 步骤: 等待 10 秒。 GetJobStatus 步骤: 调用 GetJobStatus 函数查询当前任务状态。 CheckJobComplete 步骤: 检查 GetJobStatus 函数返回的结果: 如果返回 "success" 整个流程执行成功。 如果轮询尝试次数大于 3 次,认为任务执行失败,流程执行失败。 如果返回 "running" 则跳回到 Wait10s 步骤,继续执行。 version: v1 type: flow steps: - type: task name: StartJob resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/StartJob - type: pass name: Init outputMappings: - target: try_count source: 0 - type: wait name: Wait10s duration: 10 - type: task name: GetJobStatus resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/GetJobStatus inputMappings: - target: job_id source: $local.job_id - target: delay source: $input.delay - target: try_count source: $local.try_count - type: choice name: CheckJobComplete inputMappings: - target: status source: $local.job_status - target: try_count source: $local.try_count choices: - condition: $.status == "success" goto: JobSucceeded - condition: $.try_count > 3 goto: JobFailed - condition: $.status == "running" goto: Wait10s - type: succeed name: JobSucceeded - type: fail name: JobFailed
    步骤 3:开始执行并查看结果 在控制台创建好的流程中单击 新执行 并提供以下 JSON 对象作为输入,其中 delay 字段的值模拟任务完成需要的时间,这里预期任务在开始 20秒 后, GetJobStatus 函数返回 “success”,在此之前均返回 “running”,您可以调整 delay 的值观察不同的执行结果。

    { "delay": 20 }
    下图展示的是轮询从开始到结束的流程执行可视化。Screen Shot 2019-06-26 at 12.30.01 PM 下图展示的是任务需要 20 秒完成,可以看到流程执行历史中第一次 GetJobStatus 返回 “running” 因此 CheckJobComplete 的后续步骤眺回到 Wait10s 进行等待和下一次查询,第二次查询返回 “success”,流程执行结束。Screen Shot 2019-06-26 at 12.39.26 PM

    2020-03-27 10:46:26
    赞同 展开评论 打赏

快速交付实现商业价值。

相关产品

  • 函数计算
  • 相关电子书

    更多
    All in Serverless 阿里云核心产品全面升级 立即下载
    AIGC 浪潮之上,森马的 Serverless 实践之旅 立即下载
    极氪大数据 Serverless 应用实践 立即下载