Serverless 工作流实现分布式定时调度

本文涉及的产品
函数计算FC,每月15万CU 3个月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介:

Serverless 工作流 是一个高可用的任务编排服务,提供选择、并行、循环等流程控制,可视化的执行,异常捕捉和自动重试。极大简化复杂系统的开发和调试,让开发人员只需编写业务逻辑,免去流程控制和异常处理的重复性代码。

前言

对很多业务来说定时调度是最常见的需求,比如实现一个集群多台机器的定时状态检查。传统的通过 crond 服务来实现作业定时执行存在以下问题:

  1. 单机不稳定,如果 crond 服务停止或机器故障都会导致业务中断。
  2. 配置多台 worker 机器,分别启动 crond 服务来执行。worker 之间没法做到统一的调度,总的作业很难不重复的分配到各个 worker 上执行。
  3. 单个 worker 的执行从启动、执行、返回都是黑盒,无法可视化。基于所有 worker 总的作业数据也很难搜集。

这里的 worker 是指在某台机器上执行作业的程序。

使用 Serverless 工作流定时调度功能,可以非常简单的解决上述问题,主要有以下优势:

  1. 云端统一的定时调度,可靠性不受单个 worker 所在机器的影响。
  2. 定制化的 worker 输入,多个 worker 的输出自动聚合。
  3. 整个执行流程中每一步都是可视化的,并且可对单个 worker 执行异常做自动重试。

使用流程

可先参考帮助文档 使用 MNS 服务集成及回调编排任意任务类型 的单 worker 实现。以下主要介绍实现多 worker 以及定时调度。

创建 MNS 队列

前往 MNS 控制台 创建用于存放 worker 要执行的任务队列,比如命名为 workers

创建执行 worker 的工作流 flow

前往 Serverless 工作流控制台 使用以下定义创建工作流,比如命名为 demo-schedule-workers

version: v1
type: flow
steps:
  - type: foreach  # 并行循环步骤,并行的下发任务消息到 MNS 队列。
    name: workersForeach
    iterationMapping:
      collection: $.payload.workers
      item: workerName
    steps:
      - type: task
        name: workerTask
        resourceArn: acs:mns:::/queues/workers/messages  # 表示该任务(Task)步骤会向同区域, 同账号下的 MNS 队列 fnf-demo 发送消息。
        pattern: waitForCallback  # 表示该任务步骤在发送 MNS 消息成功后会暂停,直到收到回调。
        inputMappings:
            - target: task_token
              source: $context.task.token  # 从 context 对象中获取标识该任务的令牌 (task token)。
            - target: worker_name
              source: $input.workerName
        serviceParams:  # 服务集成参数。
            MessageBody: $  # 用映射后的 input 作为要发送消息的内容。
            Priority: 1  # 消息队列的优先级。

该流程主要做以下事情:

  1. 读取输入的 workers 任务列表。
  2. 通过 foreach 并行循环步骤遍历任务列表,并行执行 task 步骤 workerTask 将任务消息和系统自动生成的任务 taskToken 下发到 MNS 队列中。
  3. 流程阻塞,等待所有 task 执行完毕和上报状态。

taskToken 为 task 步骤中系统自动生成的 token,用于任务的 worker 报告状态。

编写 worker 脚本

worker 循环读取任务队列,执行任务(可以是任意类型的作业),作业完成后上报状态到工作流中。
一个简单的示例 worker.py 如下:

def main():
    region = os.environ['REGION']
    account_id = os.environ['ACCOUNT_ID']
    ak_id = os.environ['AK_ID']
    ak_secret = os.environ['AK_SECRET']

    queue_name = 'workers'
    fnf_client = AcsClient(
        ak_id,
        ak_secret,
        region,
        debug=False
    )

    mns_endpoint = 'https://{}.mns.{}.aliyuncs.com'.format(account_id, region)
    my_account = Account(mns_endpoint, ak_id, ak_secret)
    my_queue = my_account.get_queue(queue_name)
    my_queue.set_encoding(False)
    wait_seconds = 30

    try:
        while True:
            try:
                # Read message from mns queue
                print('Receiving messages...')
                recv_msg = my_queue.receive_message(wait_seconds)
                print('Received message: {}, body: {}'.format(recv_msg.message_id, recv_msg.message_body))

                # Parse message
                body = json.loads(recv_msg.message_body)
                task_token = body['task_token']
                worker_name = body['worker_name']

                # TODO here to implement your own worker logic
                worker()

                # After worker execution completed, report status to workflow
                output = {
                    worker_name: 'success'
                }
                output_str = json.dumps(output)
                request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
                request.set_Output(output_str)
                request.set_TaskToken(task_token)
                resp = fnf_client.do_action_with_exception(request)
                print('Report worker: {}, response: {}'.format(worker_name, resp))

                # Delete mns message in queue
                my_queue.delete_message(recv_msg.receipt_handle)
                print('Deleted message: {}'.format(recv_msg.message_id))
            except MNSExceptionBase as e:
                print(e)
            except ServerException as e:
                print(e)
                if e.error_code == 'TaskAlreadyCompleted':
                    my_queue.delete_message(recv_msg.receipt_handle)
                    print('Task already completed, deleted message: {}'.format(recv_msg.message_id))
    except ServerException as e:
        print(e)


def worker():
    print('Hello Serverless Workflow')


if __name__ == '__main__':
    main()
    

为工作流配置定时调度

可参考文档 创建定时调度,设置触发消息:

{"workers": ["worker1", "worker2", "worker3"]}

等待一段时间后,可看到工作流被定时执行,并处于等待 worker 执行状态:
image

启动本地 worker

在本地执行 python worker.py 启动 worker,可一台机器启动多个 worker 或在不同的机器上分别启动。
worker 启动后,可以看到工作流成功执行:
image

总结

通过 Serverless 工作流无需配置任何的服务器,就能实现一个分布式的定时调度系统。欢迎加入我们。

工作流官网客户群:
image

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
1月前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
7月前
|
监控 关系型数据库 Serverless
Serverless 应用引擎常见问题之工作流这执行输出通过jsonpath过滤如何解决
Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
426 3
|
7月前
|
存储 弹性计算 Cloud Native
1 名工程师轻松管理 20 个工作流,创业企业用 Serverless 让数据处理流程提效
为应对挑战,语势科技采用云工作流CloudFlow和函数计算FC,实现数据处理流程的高效管理与弹性伸缩,提升整体研发效能。
65095 5
|
5月前
|
数据采集 JSON Serverless
通过百炼大模型+FC函数计算构建小红书图文工作流
使用阿里云函数服务和百炼平台,快速构建小红书图文创作工作流。通过两步轻松创建: 1) 在函数计算中利用Puppeteer构建卡片生成服务; 2) 在百炼平台上创建工作流,整合大模型、脚本和函数计算节点,实现图文内容的自动化处理和生成。此方案适合高效创作小红书内容。
1507 6
|
7月前
|
运维 Serverless API
四大软件架构:掌握单体、分布式、微服务、Serverless 的精髓
如果一个软件开发人员,不了解软件架构的演进,会制约技术的选型和开发人员的生存、晋升空间。这里我列举了目前主要的四种软件架构以及他们的优缺点,希望能够帮助软件开发人员拓展知识面。
|
运维 Java Serverless
深度解析四大主流软件架构模型:单体架构、分布式应用、微服务与Serverless的优缺点及场景应用
深度解析四大主流软件架构模型:单体架构、分布式应用、微服务与Serverless的优缺点及场景应用
1201 0
|
7月前
|
存储 Kubernetes Cloud Native
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
105250 2
|
7月前
|
数据可视化 Linux 调度
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
817 0
|
数据采集 Serverless
《Serverless 开发实战-快速开发一个分布式Puppeteer 网页截图服务》电子版地址
Serverless 开发实战-快速开发一个分布式Puppeteer 网页截图服务
《Serverless 开发实战-快速开发一个分布式Puppeteer 网页截图服务》电子版地址
|
缓存 运维 Kubernetes
开源工作流引擎如何支撑企业级 Serverless 架构?
Serverless 应用引擎(SAE)是一款底层基于 Kubernetes,实现了 Serverless 架构与微服务架构结合的云产品。作为一款不断迭代的云产品,在快速发展的过程中也遇到了许多挑战。如何在蓬勃发展的云原生时代中解决这些挑战,并进行可靠快速的云架构升级?SAE 团队和 KubeVela 社区针对这些挑战开展了紧密合作,并给出了云原生下的开源可复制解决方案——KubeVela Wor
199 1
开源工作流引擎如何支撑企业级 Serverless 架构?

热门文章

最新文章

相关产品

  • 函数计算
  • 下一篇
    DataWorks