开发者社区 > 云原生 > Serverless > 正文

Serverless 工作流如何实现MNS 主题集成和消息发布?

Serverless 工作流如何实现MNS 主题集成和消息发布?

展开
收起
小天使爱美 2020-03-27 10:41:54 1251 0
1 条回答
写回答
取消 提交回答
  • 本文介绍了如何使用任务步骤的等待回调(waitForCallback)模式集成 MNS 主题,并发布消息到主题。MNS 主题接收到消息后,调用工作流 ReportTaskSucceeded 或 ReportTaskFailed API 回调任务状态。

    框架原理 应用部署后执行流程如下:

    执行工作流,任务步骤发布消息到 MNS 主题。任务步骤的 TaskToken 放入消息体一起发送到主题。 工作流任务步骤暂停执行,等待任务回调。 MNS 主题接收到消息后,将消息和 TaskToken 通过 HTTP 推送发送到函数计算 FC 的函数 HTTP 触发器,触发函数执行。 函数计算函数最终获取到 TaskToken,并调用 ReportTaskSucceeded 来报告任务状态。 流程继续执行。 11 部署应用 在 Serverless 工作流控制台创建流程,选择示例项目、任务步骤编排 MNS 主题模板,单击下一步。 2 在创建应用页面,创建模板对应的应用,并单击部署。 3 其中:

    应用名称:自定义参数,同一账号下必须唯一。

    TopicName:自定义参数,如果对应 MNS 主题不存在会自动创建。

    单击部署后,会显示应用下创建的所有资源。4 执行工作流。 执行输入

    { "messageBody": "hello world" } 5 应用代码 编排 MNS 主题的工作流。 将任务步骤回调的 TaskToken 封装在消息的 MessageBody 中,用于后续的回调。outputMappings 中读取 ReportTaskSucceeded 设置的 output。

    version: v1 type: flow steps: - type: task name: mns-topic-task resourceArn: acs:mns:::/topics/ /messages pattern: waitForCallback inputMappings: - target: messageBody source: $input.messageBody - target: taskToken source: $context.task.token outputMappings: - target: status source: $local.status serviceParams: MessageBody: $ 回调任务步骤的 FC 函数。 读取 MessageBody中封装的 TaskToken 回调任务状态设置 output 为 {"status":"success"}。

    def handler(environ, start_response): # Get request body try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except ValueError: request_body_size = 0 request_body = environ['wsgi.input'].read(request_body_size) print('Request body: {}'.format(request_body))

    body = json.loads(request_body)
    message_body_str =
    

    body['Message']

    # Read MessageBody and TaskToken from
    

    message body message_body = json.loads(message_body_str) task_token = message_body['taskToken'] ori_message_body = message_body['messageBody'] print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))

    # Init fnf client use sts token
    context = environ['fc.context']
    creds = context.credentials
    sts_creds =
    

    StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) fnf_client = AcsClient(credential=sts_creds, region_id=context.region)

    # Report task succeeded to serverless
    

    workflow req = ReportTaskSucceededRequest() req.set_TaskToken(task_token) req.set_Output('{"status": "success"}') resp = fnf_client.do_action_with_exception(req) print('Report task response: {}'.format(resp))

    # Response to http request
    status = '200 OK'
    response_headers = [('Content-type',
    

    'text/plain')] start_response(status, response_headers) return [b'OK']

    2020-03-27 10:48:20
    赞同 展开评论 打赏

快速交付实现商业价值。

相关产品

  • 函数计算
  • 热门讨论

    热门文章

    相关电子书

    更多
    Hologres Serverless之路:揭秘弹性计算组 立即下载
    Serverless开发平台: 让研发效能再提升 立即下载
    Serverless 引领云上研发新范式 立即下载