一. 新建OSS配置文件存储仓库
1. 进入OSS控制台,新建bucket【fc-config】
2. 创建一个子账户,用于读取OSS上的文件
- 进入【RAM访问控制】-【用户】-【新建用户】
- 为子账户分配权限
- 为子账户创建AccessKey
3. 编写测试文件
在本地创建一个【fc-test-config.json】文件,写入一些测试内容:
{
"host": "127.0.0.1",
"port": 3306,
"username": "test",
"password": "123456"
}
4. 上传测试文件
二、编写配置中心代码
1. 使用模板生成
fun init -n fc-config https://github.com/l616769490/python3-http-example.git
2. 编写代码
import json
import oss2
def handler(environ, start_response):
# 获取请求体
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
request_body = environ['wsgi.input'].read(request_body_size)
status, response = getConfigByName(str(request_body, encoding = "utf-8"))
response_headers = [('Content-type', 'application/json')]
start_response(status, response_headers)
# 返回数据
return [json.dumps(response).encode()]
def getConfigByName(fileName):
utils = OSSUtils('这里替换成你的AccessKeyId', '这里替换成你的AccessKeySecret')
return utils.getConfigByName(fileName, 'fc-config')
_ENDPOINT = 'https://oss-cn-shanghai.aliyuncs.com'
class OSSUtils:
""" 封装OSS中的常用操作
"""
def __init__(self, accessKeyId, accessKeySecret):
"""
:param accessKeyId 阿里云accessKeyId
:param accessKeySecret 阿里云accessKeySecret
"""
self.accessKeyId = accessKeyId
self.accessKeySecret = accessKeySecret
def getConfigList(self, bucketName, dirName = ''):
""" 获取配置列表
:param bucketName bucket名
:param dirName 文件夹名
"""
auth=oss2.Auth(self.accessKeyId, self.accessKeySecret)
bucket = oss2.Bucket(auth, _ENDPOINT, bucketName)
files = []
for obj in oss2.ObjectIterator(bucket, dirName, ''):
if obj.key != dirName:
files.append(obj.key)
return '200', files
def getConfigByName(self, fileName, bucketName, dirName = ''):
""" 获取配置
:param fileName:配置文件名
:param bucketName bucket名
:param dirName 文件夹名
:return: status, data status:成功返回200,失败返回404; data:成功返回数据,失败返回错误信息
"""
auth=oss2.Auth(self.accessKeyId, self.accessKeySecret)
bucket = oss2.Bucket(auth, _ENDPOINT, bucketName)
objectName = dirName + fileName
status = '200'
data = ''
try:
remote_stream = bucket.get_object(objectName)
data = str(remote_stream.read(), encoding = 'utf-8')
except Exception as err:
return err.status, err.message
return status, data
def updateConfig(self, data, bucketName, dirName = ''):
""" 修改或者新增配置文件
:param data{fileName, data}: fileName:文件名; data:文件内容
:param bucketName bucket名
:param dirName 文件夹名
:return: status, data status:成功返回200,失败返回500; data:成功返回数据,失败返回错误信息
"""
status = '200'
data = ''
if (data == None):
data = '新增或修改失败'
return status, data
fileName = data['fileName']
configData = data['data']
try:
auth=oss2.Auth(self.accessKeyId, self.accessKeySecret)
bucket = oss2.Bucket(auth, _ENDPOINT, bucketName)
objectName = dirName + fileName
bucket.put_object(objectName, configData)
data = '操作成功'
except Exception as err:
return err.status, err.message
return status, data
三、测试
四、其他函数计算中调用
1. 新建一个无触发器的普通函数计算【fc-config-test】,代码如下:
# -*- coding: utf-8 -*-
import requests
import datetime
import json
_HEADER = {
'Content-Type' : 'application/json; charset=utf-8',
'Date' : (datetime.datetime.now()-datetime.timedelta(hours=8)).strftime("%a, %d %b %Y %H:%M:%S GMT")
}
_CONF_HOST = '修改为你的配置中心地址'
def getDataForStr(host, data):
""" 获取配置文件
:param data:请求内容,字符串
"""
r = requests.post(host, headers = _HEADER, data = data.encode())
return r
def handler(event, context):
conf = json.loads(getDataForStr(_CONF_HOST, 'fc-test-config.json').text)
return conf