1. 背景介绍
有些企业在使用阿里云过程中,会构建自己的云管调用阿里云API完成自动化创建云资源。有些企业还没有云管,就只有一个审批系统(如:办公OA),研发通过提流程申请资源,运维手工去控制台开资源。
对于有云管的客户,在云管系统中会设计一块代码逻辑去完成跟阿里云OpenAPI的交互。有些客户就希望把这块跟阿里云交互的程序从云管剥离出来,因为这块逻辑相对独立。比如某英国企业,他们有在用SerivceNow做企业内部的IT服务,需要写一个Connect完成与阿里云做Integration。
对于只有一个审批系统的企业,他们也想尝试实现资源开通的自动化。比如某移动互联企业,他们的现状就是研发在内部提OA流程,运维看到申请单子之后去控制台手动开通。客户希望在OA点审批同意之后,能够完成阿里云资源开通一系列的交互动作。另外OA系统的开发由于不是自研,客户希望只做微小的功能开发就能够完成对接。
综上两种场景,本文介绍基于函数计算实现的异步任务执行框架(编程语言:Python3)。目的在于把跟阿里云资源开通相关的API封装到一个独立的模块,提供标准的API跟企业内部在用的ITSM或OA进行集成,降低客户对接API门槛,更快上阿里云。
2. 函数计算产品简介
函数计算是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为您准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等功能。借助函数计算,您可以快速构建任何类型的应用和服务,并且只需为任务实际消耗的资源付费。(转载自官方文档)
工作流程:
本方案用到的功能
HTTP请求处理程序
可以使用HTTP请求处理程序更方便地处理HTTP请求。当调用函数时,函数计算使用您提供的执行方法来处理HTTP请求。
事件请求处理程序
事件触发器选择定时触发器。比如每分钟触发一次事件。
函数实例生命周期回调方法
当您实现并配置函数实例生命周期回调后,函数计算将在相关实例生命周期事件发生时调用对应的回调程序。函数实例生命周期涉及Initializer、PreFreeze和PreStop三种回调。当前,Python运行时支持Initializer和PreStop两种回调函数。
可以在Initializer回调函数中完成对数据库创建连接,在PreStop回调函数中完成对数据库连接的关闭。
函数计算可以提供Web服务,如果要提供生产级的公网服务,有两种选择:
配置ALB做负载。具体可参考:ALB支持添加函数计算FC作为后端服务
搭配API网关产品,提供更加安全的API服务。
3. APIGateway产品简介
API 网关(API Gateway)提供高性能、高可用的 API 托管服务,帮助用户对外开放其部署在 ECS、容器服务等阿里云产品上的应用,提供完整的 API 发布、管理、维护生命周期管理。用户只需进行简单的操作,即可快速、低成本、低风险地开放数据或服务。(转载自官方文档)
本方案用到的功能
创建后端服务为HTTP的API
具体配置可参考官网链接
API授权
API授权是指应用(APP)与API建立授权关系。应用(APP)是调用API时的身份,应用(APP)需要获得API的授权才能调用该API。具体可参考官网链接
4. 异步任务执行框架设计
架构图
框架提供一个标准HTTP请求,依据changeType不同会生成不同的资源变更。
变更进度可以将changeType = query 返回这条变更执行进展情况。
每隔1分钟会从RDS中从任务表获取最新任务,程序会依据不同资源类型调用不同Plugin执行任务。
云资源开通有两种类型,有些是同步开通,有些是异步开通。如果是同步开通,则函数计算调用云API之后只要成功就会更新任务及变更表,如果是异步开通,会再生成一条新的任务记录。
数据表
/******************************************/
/* DatabaseName = res_task */
/* TableName = change_task 变更表 */
/******************************************/
CREATE TABLE `change_task` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`feature` varchar(5000) NOT NULL,
`version` bigint(20) DEFAULT '1',
`invoke_system` varchar(200) DEFAULT NULL,
`request_id` varchar(200) DEFAULT NULL,
`task_type` varchar(150) DEFAULT NULL,
`task_status` varchar(150) DEFAULT 'init',
`task_result` varchar(5000) DEFAULT NULL,
`profile_env` varchar(200) DEFAULT NULL,
`change_task_id` varchar(150) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_id` (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8 COMMENT='变更任务表'
/******************************************/
/* DatabaseName = res_task */
/* TableName = task_job 任务队列表*/
/******************************************/
CREATE TABLE `task_job` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`feature` varchar(5000) NOT NULL,
`version` bigint(20) DEFAULT '1',
`execute_times` bigint(20) DEFAULT '0',
`command_type` varchar(200) DEFAULT NULL,
`command_status` varchar(200) DEFAULT 'init',
`dep_job_finish` bigint(20) DEFAULT '0' COMMENT '这条记录依赖的对象ID是否完成',
`job_result` varchar(5000) DEFAULT NULL,
`profile_env` varchar(200) DEFAULT NULL,
`change_task_id` varchar(200) NOT NULL,
`async_condition` varchar(2000) DEFAULT NULL COMMENT '用于异步查询用的条件',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_id` (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8 COMMENT='任务表'
任务队列表里面考虑了2点:
同一个变更可能会拆出多条执行任务,并且任务之间是有依赖关系。比如ECS资源创建,可能会拆出一条是创建VPC资源,再拆解一条成ECS资源。并且这两个任务之间是有依赖关系。
云资源的开通有些是异步的,即需要查询结果才知道资源是否真的创建成功。所以设计了一个用于异步查询的条件字段。
函数计算-代码
通过环境变量获取敏感信息
index.py
def initialize(context):
global connection,profile_env
try:
connection = pymysql.connect(
host=os.environ['MYSQL_ENDPOING'], # 替换为您的HOST名称。
port=int(os.environ['MYSQL_PORT']), # 替换为您的端口号。
user=os.environ['MYSQL_USER'], # 替换为您的用户名。
passwd=os.environ['MYSQL_PASSWORD'], # 替换为您的用户名对应的密码。
db=os.environ['MYSQL_DBNAME'], # 替换为您的数据库名称。
connect_timeout=5)
profile_env = os.environ['PROFILE'] # 当前环境标识
logger.info('connect mysql success!')
except Exception as e:
logger.error(e)
logger.error(
"ERROR: Unexpected error: Could not connect to MySql instance.")
raise Exception(str(e))
实现Flask应用
from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
return 'Web App with Python Flask!'
def handler(environ, start_response):
return app(environ, start_response)
通过临时密钥安全读写云资源(以RAM为例)
class RAM(object):
def __init__(self, sts_access_key, sts_access_secret, sts_token, region_id):
self.sts_access_key = sts_access_key
self.sts_access_secret = sts_access_secret
self.sts_token = sts_token
self.region_id = region_id
self.credentials = StsTokenCredential(self.sts_access_key, self.sts_access_secret, self.sts_token)
self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)
def handler(event, context):
credit = context.credentials
auth = RAM(credit.access_key_id, credit.access_key_secret, credit.security_token, region)
return 'success'
5. 本文小结
通过本文的介绍,希望能够帮助大家了解阿里云函数计算及API网关产品,以及如何跟阿里云API进行深度集成。对文中提到的相关代码与方案感兴趣也可以直接钉钉找我交流。