Serverless 工作流实现分布式定时调度-阿里云开发者社区

开发者社区> 阿里云Serverless Compute> 正文

Serverless 工作流实现分布式定时调度

简介:

Serverless 工作流 是一个高可用的任务编排服务,提供选择、并行、循环等流程控制,可视化的执行,异常捕捉和自动重试。极大简化复杂系统的开发和调试,让开发人员只需编写业务逻辑,免去流程控制和异常处理的重复性代码。

前言

对很多业务来说定时调度是最常见的需求,比如实现一个集群多台机器的定时状态检查。传统的通过 crond 服务来实现作业定时执行存在以下问题:

  1. 单机不稳定,如果 crond 服务停止或机器故障都会导致业务中断。
  2. 配置多台 worker 机器,分别启动 crond 服务来执行。worker 之间没法做到统一的调度,总的作业很难不重复的分配到各个 worker 上执行。
  3. 单个 worker 的执行从启动、执行、返回都是黑盒,无法可视化。基于所有 worker 总的作业数据也很难搜集。

这里的 worker 是指在某台机器上执行作业的程序。

使用 Serverless 工作流定时调度功能,可以非常简单的解决上述问题,主要有以下优势:

  1. 云端统一的定时调度,可靠性不受单个 worker 所在机器的影响。
  2. 定制化的 worker 输入,多个 worker 的输出自动聚合。
  3. 整个执行流程中每一步都是可视化的,并且可对单个 worker 执行异常做自动重试。

使用流程

可先参考帮助文档 使用 MNS 服务集成及回调编排任意任务类型 的单 worker 实现。以下主要介绍实现多 worker 以及定时调度。

创建 MNS 队列

前往 MNS 控制台 创建用于存放 worker 要执行的任务队列,比如命名为 workers

创建执行 worker 的工作流 flow

前往 Serverless 工作流控制台 使用以下定义创建工作流,比如命名为 demo-schedule-workers

version: v1
type: flow
steps:
  - type: foreach  # 并行循环步骤,并行的下发任务消息到 MNS 队列。
    name: workersForeach
    iterationMapping:
      collection: $.payload.workers
      item: workerName
    steps:
      - type: task
        name: workerTask
        resourceArn: acs:mns:::/queues/workers/messages  # 表示该任务(Task)步骤会向同区域, 同账号下的 MNS 队列 fnf-demo 发送消息。
        pattern: waitForCallback  # 表示该任务步骤在发送 MNS 消息成功后会暂停,直到收到回调。
        inputMappings:
            - target: task_token
              source: $context.task.token  # 从 context 对象中获取标识该任务的令牌 (task token)。
            - target: worker_name
              source: $input.workerName
        serviceParams:  # 服务集成参数。
            MessageBody: $  # 用映射后的 input 作为要发送消息的内容。
            Priority: 1  # 消息队列的优先级。

该流程主要做以下事情:

  1. 读取输入的 workers 任务列表。
  2. 通过 foreach 并行循环步骤遍历任务列表,并行执行 task 步骤 workerTask 将任务消息和系统自动生成的任务 taskToken 下发到 MNS 队列中。
  3. 流程阻塞,等待所有 task 执行完毕和上报状态。

taskToken 为 task 步骤中系统自动生成的 token,用于任务的 worker 报告状态。

编写 worker 脚本

worker 循环读取任务队列,执行任务(可以是任意类型的作业),作业完成后上报状态到工作流中。
一个简单的示例 worker.py 如下:

def main():
    region = os.environ['REGION']
    account_id = os.environ['ACCOUNT_ID']
    ak_id = os.environ['AK_ID']
    ak_secret = os.environ['AK_SECRET']

    queue_name = 'workers'
    fnf_client = AcsClient(
        ak_id,
        ak_secret,
        region,
        debug=False
    )

    mns_endpoint = 'https://{}.mns.{}.aliyuncs.com'.format(account_id, region)
    my_account = Account(mns_endpoint, ak_id, ak_secret)
    my_queue = my_account.get_queue(queue_name)
    my_queue.set_encoding(False)
    wait_seconds = 30

    try:
        while True:
            try:
                # Read message from mns queue
                print('Receiving messages...')
                recv_msg = my_queue.receive_message(wait_seconds)
                print('Received message: {}, body: {}'.format(recv_msg.message_id, recv_msg.message_body))

                # Parse message
                body = json.loads(recv_msg.message_body)
                task_token = body['task_token']
                worker_name = body['worker_name']

                # TODO here to implement your own worker logic
                worker()

                # After worker execution completed, report status to workflow
                output = {
                    worker_name: 'success'
                }
                output_str = json.dumps(output)
                request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
                request.set_Output(output_str)
                request.set_TaskToken(task_token)
                resp = fnf_client.do_action_with_exception(request)
                print('Report worker: {}, response: {}'.format(worker_name, resp))

                # Delete mns message in queue
                my_queue.delete_message(recv_msg.receipt_handle)
                print('Deleted message: {}'.format(recv_msg.message_id))
            except MNSExceptionBase as e:
                print(e)
            except ServerException as e:
                print(e)
                if e.error_code == 'TaskAlreadyCompleted':
                    my_queue.delete_message(recv_msg.receipt_handle)
                    print('Task already completed, deleted message: {}'.format(recv_msg.message_id))
    except ServerException as e:
        print(e)


def worker():
    print('Hello Serverless Workflow')


if __name__ == '__main__':
    main()
    

为工作流配置定时调度

可参考文档 创建定时调度,设置触发消息:

{"workers": ["worker1", "worker2", "worker3"]}

等待一段时间后,可看到工作流被定时执行,并处于等待 worker 执行状态:
image

启动本地 worker

在本地执行 python worker.py 启动 worker,可一台机器启动多个 worker 或在不同的机器上分别启动。
worker 启动后,可以看到工作流成功执行:
image

总结

通过 Serverless 工作流无需配置任何的服务器,就能实现一个分布式的定时调度系统。欢迎加入我们。

工作流官网客户群:
image

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
阿里云Serverless Compute
使用钉钉扫一扫加入圈子
+ 订阅

Serverless微服务应用实践分享

函数计算官网