Serverless 工作流如何实现MNS 主题集成和消息发布?
本文介绍了如何使用任务步骤的等待回调(waitForCallback)模式集成 MNS 主题,并发布消息到主题。MNS 主题接收到消息后,调用工作流 ReportTaskSucceeded 或 ReportTaskFailed API 回调任务状态。
框架原理 应用部署后执行流程如下:
执行工作流,任务步骤发布消息到 MNS 主题。任务步骤的 TaskToken 放入消息体一起发送到主题。 工作流任务步骤暂停执行,等待任务回调。 MNS 主题接收到消息后,将消息和 TaskToken 通过 HTTP 推送发送到函数计算 FC 的函数 HTTP 触发器,触发函数执行。 函数计算函数最终获取到 TaskToken,并调用 ReportTaskSucceeded 来报告任务状态。 流程继续执行。 11 部署应用 在 Serverless 工作流控制台创建流程,选择示例项目、任务步骤编排 MNS 主题模板,单击下一步。 2 在创建应用页面,创建模板对应的应用,并单击部署。 3 其中:
应用名称:自定义参数,同一账号下必须唯一。
TopicName:自定义参数,如果对应 MNS 主题不存在会自动创建。
单击部署后,会显示应用下创建的所有资源。4 执行工作流。 执行输入
{ "messageBody": "hello world" } 5 应用代码 编排 MNS 主题的工作流。 将任务步骤回调的 TaskToken 封装在消息的 MessageBody 中,用于后续的回调。outputMappings 中读取 ReportTaskSucceeded 设置的 output。
version: v1 type: flow steps: - type: task name: mns-topic-task resourceArn: acs:mns:::/topics/ /messages pattern: waitForCallback inputMappings: - target: messageBody source: $input.messageBody - target: taskToken source: $context.task.token outputMappings: - target: status source: $local.status serviceParams: MessageBody: $ 回调任务步骤的 FC 函数。 读取 MessageBody中封装的 TaskToken 回调任务状态设置 output 为 {"status":"success"}。
def handler(environ, start_response): # Get request body try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except ValueError: request_body_size = 0 request_body = environ['wsgi.input'].read(request_body_size) print('Request body: {}'.format(request_body))
body = json.loads(request_body)
message_body_str =
body['Message']
# Read MessageBody and TaskToken from
message body message_body = json.loads(message_body_str) task_token = message_body['taskToken'] ori_message_body = message_body['messageBody'] print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))
# Init fnf client use sts token
context = environ['fc.context']
creds = context.credentials
sts_creds =
StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) fnf_client = AcsClient(credential=sts_creds, region_id=context.region)
# Report task succeeded to serverless
workflow req = ReportTaskSucceededRequest() req.set_TaskToken(task_token) req.set_Output('{"status": "success"}') resp = fnf_client.do_action_with_exception(req) print('Report task response: {}'.format(resp))
# Response to http request
status = '200 OK'
response_headers = [('Content-type',
'text/plain')] start_response(status, response_headers) return [b'OK']
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。