基于函数计算搭建的异步任务执行框架

简介: 本文介绍基于函数计算实现的异步任务执行框架(编程语言:Python3),把跟阿里云资源开通相关的API封装到一个独立的模块,提供标准的API跟企业内部在用的ITSM或OA进行集成,降低客户对接API门槛,更快上阿里云。


1. 背景介绍

有些企业在使用阿里云过程中,会构建自己的云管调用阿里云API完成自动化创建云资源。有些企业还没有云管,就只有一个审批系统(如:办公OA),研发通过提流程申请资源,运维手工去控制台开资源。

对于有云管的客户,在云管系统中会设计一块代码逻辑去完成跟阿里云OpenAPI的交互。有些客户就希望把这块跟阿里云交互的程序从云管剥离出来,因为这块逻辑相对独立。比如某英国企业,他们有在用SerivceNow做企业内部的IT服务,需要写一个Connect完成与阿里云做Integration。

对于只有一个审批系统的企业,他们也想尝试实现资源开通的自动化。比如某移动互联企业,他们的现状就是研发在内部提OA流程,运维看到申请单子之后去控制台手动开通。客户希望在OA点审批同意之后,能够完成阿里云资源开通一系列的交互动作。另外OA系统的开发由于不是自研,客户希望只做微小的功能开发就能够完成对接。

综上两种场景,本文介绍基于函数计算实现的异步任务执行框架(编程语言:Python3)。目的在于把跟阿里云资源开通相关的API封装到一个独立的模块,提供标准的API跟企业内部在用的ITSM或OA进行集成,降低客户对接API门槛,更快上阿里云。

2. 函数计算产品简介

函数计算是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为您准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等功能。借助函数计算,您可以快速构建任何类型的应用和服务,并且只需为任务实际消耗的资源付费。(转载自官方文档)

工作流程:

本方案用到的功能

HTTP请求处理程序

可以使用HTTP请求处理程序更方便地处理HTTP请求。当调用函数时,函数计算使用您提供的执行方法来处理HTTP请求。

事件请求处理程序

事件触发器选择定时触发器。比如每分钟触发一次事件。

函数实例生命周期回调方法

当您实现并配置函数实例生命周期回调后,函数计算将在相关实例生命周期事件发生时调用对应的回调程序。函数实例生命周期涉及Initializer、PreFreeze和PreStop三种回调。当前,Python运行时支持Initializer和PreStop两种回调函数。

可以在Initializer回调函数中完成对数据库创建连接,在PreStop回调函数中完成对数据库连接的关闭。

函数计算可以提供Web服务,如果要提供生产级的公网服务,有两种选择:

3. APIGateway产品简介

API 网关(API Gateway)提供高性能、高可用的 API 托管服务,帮助用户对外开放其部署在 ECS、容器服务等阿里云产品上的应用,提供完整的 API 发布、管理、维护生命周期管理。用户只需进行简单的操作,即可快速、低成本、低风险地开放数据或服务。(转载自官方文档)

本方案用到的功能

创建后端服务为HTTP的API

具体配置可参考官网链接

API授权

API授权是指应用(APP)与API建立授权关系。应用(APP)是调用API时的身份,应用(APP)需要获得API的授权才能调用该API。具体可参考官网链接

4. 异步任务执行框架设计

架构图

  • 框架提供一个标准HTTP请求,依据changeType不同会生成不同的资源变更。

  • 变更进度可以将changeType = query 返回这条变更执行进展情况。

  • 每隔1分钟会从RDS中从任务表获取最新任务,程序会依据不同资源类型调用不同Plugin执行任务。

  • 云资源开通有两种类型,有些是同步开通,有些是异步开通。如果是同步开通,则函数计算调用云API之后只要成功就会更新任务及变更表,如果是异步开通,会再生成一条新的任务记录。

数据表

/******************************************/
/*   DatabaseName = res_task   */
/*   TableName = change_task 变更表  */
/******************************************/
CREATE TABLE `change_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `feature` varchar(5000) NOT NULL,
  `version` bigint(20) DEFAULT '1',
  `invoke_system` varchar(200) DEFAULT NULL,
  `request_id` varchar(200) DEFAULT NULL,
  `task_type` varchar(150) DEFAULT NULL,
  `task_status` varchar(150) DEFAULT 'init',
  `task_result` varchar(5000) DEFAULT NULL,
  `profile_env` varchar(200) DEFAULT NULL,
  `change_task_id` varchar(150) NOT NULL,  
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_id` (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8 COMMENT='变更任务表'
/******************************************/
/*   DatabaseName = res_task   */
/*   TableName = task_job 任务队列表*/
/******************************************/
CREATE TABLE `task_job` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `feature` varchar(5000) NOT NULL,
  `version` bigint(20) DEFAULT '1',
  `execute_times` bigint(20) DEFAULT '0',  
  `command_type` varchar(200) DEFAULT NULL,
  `command_status` varchar(200) DEFAULT 'init',
  `dep_job_finish` bigint(20) DEFAULT '0' COMMENT '这条记录依赖的对象ID是否完成',
  `job_result` varchar(5000) DEFAULT NULL,
  `profile_env` varchar(200) DEFAULT NULL,
  `change_task_id` varchar(200) NOT NULL,
  `async_condition` varchar(2000) DEFAULT NULL COMMENT '用于异步查询用的条件',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_id` (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8 COMMENT='任务表'

任务队列表里面考虑了2点:

  • 同一个变更可能会拆出多条执行任务,并且任务之间是有依赖关系。比如ECS资源创建,可能会拆出一条是创建VPC资源,再拆解一条成ECS资源。并且这两个任务之间是有依赖关系。

  • 云资源的开通有些是异步的,即需要查询结果才知道资源是否真的创建成功。所以设计了一个用于异步查询的条件字段。

函数计算-代码

通过环境变量获取敏感信息

index.py

def initialize(context):
    global connection,profile_env
    try:
        connection = pymysql.connect(
            host=os.environ['MYSQL_ENDPOING'],  # 替换为您的HOST名称。
            port=int(os.environ['MYSQL_PORT']),  # 替换为您的端口号。
            user=os.environ['MYSQL_USER'],  # 替换为您的用户名。
            passwd=os.environ['MYSQL_PASSWORD'],  # 替换为您的用户名对应的密码。
            db=os.environ['MYSQL_DBNAME'],  # 替换为您的数据库名称。
            connect_timeout=5)
        profile_env = os.environ['PROFILE'] # 当前环境标识
        logger.info('connect mysql success!')
    except Exception as e:
        logger.error(e)
        logger.error(
            "ERROR: Unexpected error: Could not connect to MySql instance.")
        raise Exception(str(e))

实现Flask应用

from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'Web App with Python Flask!'

def handler(environ, start_response):
    return app(environ, start_response)

通过临时密钥安全读写云资源(以RAM为例)

class RAM(object):

    def __init__(self, sts_access_key, sts_access_secret, sts_token, region_id):
        self.sts_access_key = sts_access_key
        self.sts_access_secret = sts_access_secret
        self.sts_token = sts_token
        self.region_id = region_id
        self.credentials = StsTokenCredential(self.sts_access_key, self.sts_access_secret, self.sts_token)
        self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)

def handler(event, context):
    credit = context.credentials
    auth = RAM(credit.access_key_id, credit.access_key_secret, credit.security_token, region)
    return 'success'

5. 本文小结

通过本文的介绍,希望能够帮助大家了解阿里云函数计算及API网关产品,以及如何跟阿里云API进行深度集成。对文中提到的相关代码与方案感兴趣也可以直接钉钉找我交流。

作者介绍
目录