更贴近“本地”的代码执行器
本地代码执行与OJ模式的区别
众所周知,我们在本地执行以下代码:
import time
print("hello world")
time.sleep(10)
tempInput = input("please: ")
print("Input data: ", tempInput)
实际表现是:
- 系统输出hello world
- 系统等待10秒
- 系统提醒我们please,我们此时可以输入一个字符串
- 系统输出Input data以及我们刚刚输入的字符串
但是,这段代码如果应用于传统OJ或者上部分我们所实现的简单的在线代码执行器中,表现则有一些不同:
- 代码与我们要输入内容一同传给系统
- 系统等待10秒
- 输出hello world、please,以及最后输Input data和我们输入的内容
可以看到,OJ上的体验效果和本地是有非常大的差距的。
新执行器的流程
为了减少这种体验不统一的问题,可以将代码和结构进行进一步的升级:
在整个项目中,包括了两个函数,两个存储桶:
- 业务逻辑函数:该函数的主要操作是业务逻辑,包括创建代码执行的任务(通过对象存储触发器进行异步函数执行),以及获取函数输出结果以及对任务函数的标准输入进行相关操作等;
- 执行器函数:该函数的主要作用是执行用户的函数代码,这部分是通过对象存储触发,通过下载代码、执行代码、获取输入、输出结果等;代码获取从代码存储桶,输出结果和获取输入从业务存储桶;
- 代码存储桶:该存储桶的作用是存储代码,当用户发起运行代码的请求, 业务逻辑函数收到用户代码后,会将代码存储到该存储桶,再由该存储桶处罚异步任务;
- 业务存储桶:该存储桶的作用是中间量的输出,主要包括输出内容的缓存、输入内容的缓存;该部分数据可以通过对象存储的本身特性进行生命周期的制定;
代码实现
为了让代码在线执行起来,更加贴近本地体验,该方案的代码分为两个函数:
函数:业务逻辑函数
# -*- coding: utf-8 -*-
import os
import oss2
import json
import uuid
import random
# 基本配置信息
AccessKey = {
"id": os.environ.get('AccessKeyId'),
"secret": os.environ.get('AccessKeySecret')
}
OSSCodeConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketCodeName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
OSSTargetConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketTargetName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])
# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))
# Response
class Response:
def __init__(self, start_response, response, errorCode=None):
self.start = start_response
responseBody = {
'Error': {"Code": errorCode, "Message": response},
} if errorCode else {
'Response': response
}
# 默认增加uuid,便于后期定位
responseBody['ResponseId'] = str(uuid.uuid1())
print("Response: ", json.dumps(responseBody))
self.response = json.dumps(responseBody)
def __iter__(self):
status = '200'
response_headers = [('Content-type', 'application/json; charset=UTF-8')]
self.start(status, response_headers)
yield self.response.encode("utf-8")
def handler(environ, start_response):
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))
reqType = requestBody.get("type", None)
if reqType == "run":
# 运行代码
code = requestBody.get("code", None)
runId = randomStr(10)
codeBucket.put_object(runId, code.encode("utf-8"))
responseData = runId
elif reqType == "input":
# 输入内容
inputData = requestBody.get("input", None)
runId = requestBody.get("id", None)
targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))
responseData = 'ok'
elif reqType == "output":
# 获取结果
runId = requestBody.get("id", None)
targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)
with open('/tmp/' + runId) as f:
responseData = f.read()
else:
responseData = "Error"
print(responseData)
return Response(start_response, {"result": responseData})
函数:执行器函数
# -*- coding: utf-8 -*-
import os
import re
import oss2
import json
import time
import pexpect
# 基本配置信息
AccessKey = {
"id": os.environ.get('AccessKeyId'),
"secret": os.environ.get('AccessKeySecret')
}
OSSCodeConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketCodeName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
OSSTargetConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketTargetName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])
def handler(event, context):
event = json.loads(event.decode("utf-8"))
for eveEvent in event["events"]:
# 获取object
print("获取object")
code = eveEvent["oss"]["object"]["key"]
localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]
# 下载图片
print("下载代码")
print("code: ", code)
codeBucket.get_object_to_file(code, localFileName)
# 执行代码
foo = pexpect.spawn('python %s' % localFileName)
outputData = ""
startTime = time.time()
# timeout可以通过文件名来进行识别
try:
timeout = int(re.findall("timeout(.*?)s", code)[0])
except:
timeout = 60
while (time.time() - startTime) / 1000 <= timeout:
try:
tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)
tempOutput = tempOutput.decode("utf-8", "ignore")
if len(str(tempOutput)) > 0:
outputData = outputData + tempOutput
# 输出数据存入oss
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
except Exception as e:
print("Error: ", e)
# 有输入请求被阻塞
if str(e) == "Timeout exceeded.":
try:
# 从oss读取数据
targetBucket.get_object_to_file(code + "-input", localFileName + "-input")
targetBucket.delete_object(code + "-input")
with open(localFileName + "-input") as f:
inputData = f.read()
if inputData:
foo.sendline(inputData)
except:
pass
# 程序执行完成输出
elif "End Of File (EOF)" in str(e):
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
return True
# 程序抛出异常
else:
outputData = outputData + "\n\nException: %s" % str(e)
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
return False
部署上线
可以通过Serverless Devs将代码部署到线上,设定全局变量:
Global:
Access: release
Region: cn-beijing
AccessKeyId: ${Env(AccessKeyId)}
AccessKeySecret: ${Env(AccessKeySecret)}
OSSConfBucketCodeName: serverlessbook-runcode-code
OSSConfBucketTargetName: serverlessbook-runcode-others
OSSConfObjectSignUrlTimeOut: 1200
Service:
Name: ServerlessBook
Description: Serverless图书案例
Log:
Project: aliyun-fc-cn-beijing-9bd4a65b-c9d1-529d-9ce0-c91adfb823ab
LogStore: function-log
代码存储桶:
CodeBucket:
Component: oss
Provider: alibaba
Access: ${Global.Access}
Properties:
Region: ${Global.Region}
Bucket: ${Global.OSSConfBucketCodeName}
业务存储桶:
TargetBucket:
Component: oss
Provider: alibaba
Access: ${Global.Access}
Properties:
Region: ${Global.Region}
Bucket: ${Global.OSSConfBucketTargetName}
业务逻辑函数:
ServerlessBookRunCodeMain:
Component: fc
Provider: alibaba
Access: ${Global.Access}
Extends:
deploy:
- Hook: s ServerlessBookRunCodeMain install docker
Src: ./main
Pre: true
Properties:
Region: ${Global.Region}
Service: ${Global.Service}
Function:
Name: serverless_runcode_main
Description: 业务逻辑
CodeUri: ./main
Handler: index.handler
MemorySize: 128
Runtime: python3
Timeout: 5
Environment:
- Key: AccessKeyId
Value: ${Global.AccessKeyId}
- Key: AccessKeySecret
Value: ${Global.AccessKeySecret}
- Key: OSSConfBucketCodeName
Value: ${CodeBucket.Output.Bucket}
- Key: OSSConfBucketTargetName
Value: ${TargetBucket.Output.Bucket}
- Key: OSSConfEndPoint
Value: ${CodeBucket.Output.Endpoint.Publish}
- Key: OSSConfObjectSignUrlTimeOut
Value: '1200'
Triggers:
- Name: RunCodeMain
Type: HTTP
Parameters:
AuthType: ANONYMOUS
Methods:
- GET
- POST
- PUT
Domains:
- Domain: Auto
执行器函数:
ServerlessBookRunCodeCompiler:
Component: fc
Provider: alibaba
Access: ${Global.Access}
Extends:
deploy:
- Hook: s ServerlessBookRunCodeCompiler install docker
Src: ./compiler
Pre: true
Properties:
Region: ${Global.Region}
Service: ${Global.Service}
Function:
Name: serverless_runcode_compiler
Description: 代码执行器
CodeUri: ./compiler
Handler: index.handler
MemorySize: 128
Runtime: python3
Timeout: 60
Environment: ${ServerlessBookRunCodeMain.Properties.Function.Environment}
Triggers:
- Name: OSSTrigger
Type: OSS
Parameters:
Bucket: ${CodeBucket.Output.Bucket}
Events:
- 'oss:ObjectCreated:*'
Filter:
Prefix: ''
Suffix: ''
在函数部分的配置文档中,可以看到有一个叫做Extends的字段,这个字段实际上是Serverless Devs的行为描述字段,通过该字段可以进行行为操作,例如,在上面的例子中,这部分为:
Extends:
deploy:
- Hook: s ServerlessBookRunCodeCompiler install docker
Src: ./compiler
Pre: true
实际含义是,在项目进行deploy操作之前,在./compiler目录下,执行指令:
$ s ServerlessBookRunCodeCompiler install docker
当项目描述文档编写完成之后,可以通过:
$ s deploy
进行项目部署,系统会先对项目进行分析:
检测到您有以下项目 < CodeBucket,TargetBucket,ServerlessBookRunCodeMain,ServerlessBookRunCodeCompiler > 要执行
分析完成,就会自动进入部署阶段,稍等片刻,即可完成部署。
接口测试
同“简单的在线代码执行器”部分一样,在这部分也将会通过PostMan对接口进行测试。
测试代码:
import time
print('hello world')
time.sleep(10)
tempInput = input('please: ')
print('Input data: ', tempInput)
当我们通过PostMan发起请求执行这段代码之后:
我们可以看到系统会返回给我们一个token,该token将会作为我们整个请求任务的Id,此时,我们可以通过获取输出结果的接口,来获取结果:
由于代码中有:
time.sleep(10)
所以,迅速获得结果的时候是看不到后半部分的输出结果,我们可以设置一个轮训任务,不断通过该token对接口进行刷新:
我们可以看到,10秒钟后,代码执行到了输入部分:
tempInput = input('please: ')
此时,我们再通过输入接口,进行输入操作:
完成之后,我们可以看到输入成功(result: ok)的结果,此时,我们继续刷新之前的获取结果部分的请求:
可以看到,我们已经获得到了所有结果的输出。
相对于前一部分的结构,这种“更贴近本地的代码执行器“变得复杂了很多,但是在实际使用的过程中,却可以更好的模拟出本地执行代码时的一些现象,例如代码的休眠、阻塞、内容的输出等。
总结
无论是简单的在线代码执行器部分,还是更贴近“本地”的代码执行器部分,这篇文章在所应用的内容是相对广泛的。通过这篇文章你可以看到:
- HTTP触发器的基本使用方法;对象存储触发器的基本使用方;
- 函数计算组件、对象存储组件的基本使用方法,组件间依赖的实现方法;
同时,通过这篇文章,也可以从一个侧面看到这样一个常见问题的简单解答:我有一个项目,我是每个接口一个函数,还是多个接口复用一个函数?
针对这个问题,其实最主要的是看业务本身的诉求,如果多个接口表达的含义是一致的,或者是同类的,类似的,并且多个接口的资源消耗是类似的,那么放在一个函数中来通过不同的路径进行区分是完全可以的;如果出现资源消耗差距较大,或者函数类型、规模、类别区别过大的时候,将多个接口放在多个函数下也是没有问题的。
本文实际上是抛砖引玉,无论是OJ系统的“判题机”部分,还是在线编程工具的“执行器部分”,都可以很好的和Serverless架构有着比较有趣的结合点。这种结合点不仅仅可以解决传统在线编程所头疼的事情(安全问题,资源消耗问题,并发问题,流量不稳定问题),更可以将Serverless的价值在一个新的领域发挥出来