1. 前言
随着计算机科学与技术的发展,越来越多的人开始接触编程,也有越来越多的在线编程平台诞生。以Python语言的在线编程平台为例,关于Python语言的在线编程平台大致分为两类,一类是OJ类型的,即在线评测的编程平台,另一类则是学习、工具类的在线编程平台,例如Anycodes在线编程等网站。
但是,无论是那种类型,其背后的核心模块“代码执行器”/“判题机”等,都是值得被关注的。一方面,这类网站通常情况下都需要比要严格的“安全考虑”,例如程序会不会有恶意代码,出现死循环、破坏计算机系统等,程序是否需要隔离运行,运行时是否会获取到其他人提交的代码等;另一方面,这类平台通常情况下都会对资源消耗比较大,尤其是比赛来临时,更是需要突然间对相关机器进行扩容,必要时需要大规模集群来进行应对。同时这类网站通常情况下也都有一个比较大的特点,那就是触发式,即每个代码执行前后实际上并没有非常紧密的前后文关系等。
综合上面的问题和特点,结合Serverless架构,本文将会通过“Serverless角度”,实现简单的Python语言的在线编程能力,并对其进行进一步探索。
2. 简单的在线代码执行器
2.1 基本流程
这一部分将会Python语言实现一个简单的在线代码执行工具,该工具拥有以下特点:
可以执行Python代码;
支持标准输入;
可以返回标准输出和标准错误。
该简单工具的整个过程包括:
2.2 代码实现
其中执行代码部分,我们可以通过一些常见的Python依赖库,例如subprocess来进行实现:
child = subprocess.Popen("python %s" % (fileName),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True)
output = child.communicate(input=input_data.encode("utf-8"))
print(output)
在函数计算中(无论阿里云函数计算还是腾讯云云函数),通常情况下,再不进行硬盘挂载的前提下,只有/tmp/目录是有可写入权限的,所以函数计算部分,可以将代码写入临时目录/tmp/,并进行代码运行,获得结果:
# -*- coding: utf-8 -*-
import os
import json
import uuid
import random
import subprocess
# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))
# Response
class Response:
def __init__(self, start_response, response, errorCode=None):
self.start = start_response
responseBody = {
'Error': {"Code": errorCode, "Message": response},
} if errorCode else {
'Response': response
}
# 默认增加uuid,便于后期定位
responseBody['ResponseId'] = str(uuid.uuid1())
print("Response: ", json.dumps(responseBody))
self.response = json.dumps(responseBody)
def __iter__(self):
status = '200'
response_headers = [('Content-type', 'application/json; charset=UTF-8')]
self.start(status, response_headers)
yield self.response.encode("utf-8")
def WriteCode(code, fileName):
try:
with open(fileName, "w") as f:
f.write(code)
return True
except Exception as e:
print(e)
return False
def RunCode(fileName, input_data=""):
child = subprocess.Popen("python %s" % (fileName),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True)
output = child.communicate(input=input_data.encode("utf-8"))
print(output)
return output[0].decode("utf-8")
def handler(environ, start_response):
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))
code = requestBody.get("code", None)
inputData = requestBody.get("input", "")
fileName = "/tmp/" + randomStr(5)
if code and WriteCode(code, fileName):
output = RunCode(fileName, inputData)
responseData = output
else:
responseData = "Error"
print(responseData)
return Response(start_response, {"result": responseData})
2.3 部署上线
当代码编写完成,可以直接通过Serverless-Devs工具进行项目部署,首先编写s.yaml文件:
edition: 1.0.0
name: web-framework-app
services:
ServerlessBookRunCodeDemo: # 业务名称/模块名称
component: fc
props: # 组件的属性值
region: cn-beijing
service:
name: ServerlessBook
description: Serverless图书案例
logConfig:
project: aliyun-fc-cn-beijing-9bd4a65b-c9d1-529d-9ce0-c91adfb823ab
logStore: function-log
function:
name: serverless_run_code_ordinary
description: 简单版代码在线执行器
codeUri: ./
handler: index.handler
memorySize: 128
runtime: python3
timeout: 5
triggers:
- name: httpTrigger
type: http
config:
authType: anonymous
methods:
- GET
customDomains:
- domainName: auto
protocol: HTTP
routeConfigs:
- path: '/*'
完成编写之后,可以通过:
s deploy
进行项目部署:
完成项目部署之后,可以通过:
s logs -t
对日志系统进行监听,当函数被触发时,可以看到我们的一些日志内容。
2.4 接口测试
接下来,可以通过PostMan进行接口测试,以Python语言的输出语句为例:
print('HELLO WORLD')
测试结果:
可以看到,系统是可以正常输出我们的预期结果:HELLO WORLD,当我们将该部分代码进行破坏:
print('HELLO WORLD)
再次执行,可以看到错误结果:
当我们将带有输入内容的代码传入:
tempInput = input('please input: ')
print('Output: ', tempInput)
并输入“serverless devs”,可以看到:
至此我们完成了一个非常简单的在线代码执行的接口。针对这个接口,是有蛮大的优化空间的:
超时时间的处理;
代码执行完成,可以进行清理。
当然,通过这个接口也可以看到这样一个问题:那就是代码执行过程中是阻塞的,我们没办法进行持续性的输入,就算是想要输入内容也是需要将代码和输入内容一并发送到服务端。这种模式和目前市面上常见的OJ模式很类似。所以,针对上述的简单的在线代码执行器而言,是可以进行部分完善与OJ系统进行结合使用。
3. 更贴近“本地”的代码执行器
3.1 本地代码执行与OJ模式的区别
众所周知,我们在本地执行以下代码:
import time
print("hello world")
time.sleep(10)
tempInput = input("please: ")
print("Input data: ", tempInput)
实际表现是:
系统输出hello world;
系统等待10秒;
系统提醒我们please,我们此时可以输入一个字符串;
系统输出Input data以及我们刚刚输入的字符串。
但是,这段代码如果应用于传统OJ或者上部分我们所实现的简单的在线代码执行器中,表现则有一些不同:
代码与我们要输入内容一同传给系统;
系统等待10秒;
输出hello world、please,以及最后输Input data和我们输入的内容;
可以看到,OJ上的体验效果和本地是有非常大的差距的。
3.2 新执行器的流程
为了减少这种体验不统一的问题,可以将代码和结构进行进一步的升级:
在整个项目中,包括了两个函数,两个存储桶:
业务逻辑函数:该函数的主要操作是业务逻辑,包括创建代码执行的任务(通过对象存储触发器进行异步函数执行),以及获取函数输出结果以及对任务函数的标准输入进行相关操作等;
执行器函数:该函数的主要作用是执行用户的函数代码,这部分是通过对象存储触发,通过下载代码、执行代码、获取输入、输出结果等;代码获取从代码存储桶,输出结果和获取输入从业务存储桶;
代码存储桶:该存储桶的作用是存储代码,当用户发起运行代码的请求, 业务逻辑函数收到用户代码后,会将代码存储到该存储桶,再由该存储桶处罚异步任务;
业务存储桶:该存储桶的作用是中间量的输出,主要包括输出内容的缓存、输入内容的缓存;该部分数据可以通过对象存储的本身特性进行生命周期的制定。
3.3 代码实现
为了让代码在线执行起来,更加贴近本地体验,该方案的代码分为两个函数:
函数:业务逻辑函数
# -*- coding: utf-8 -*-
import os
import oss2
import json
import uuid
import random
# 基本配置信息
AccessKey = {
"id": os.environ.get('AccessKeyId'),
"secret": os.environ.get('AccessKeySecret')
}
OSSCodeConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketCodeName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
OSSTargetConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketTargetName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])
# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))
# Response
class Response:
def __init__(self, start_response, response, errorCode=None):
self.start = start_response
responseBody = {
'Error': {"Code": errorCode, "Message": response},
} if errorCode else {
'Response': response
}
# 默认增加uuid,便于后期定位
responseBody['ResponseId'] = str(uuid.uuid1())
print("Response: ", json.dumps(responseBody))
self.response = json.dumps(responseBody)
def __iter__(self):
status = '200'
response_headers = [('Content-type', 'application/json; charset=UTF-8')]
self.start(status, response_headers)
yield self.response.encode("utf-8")
def handler(environ, start_response):
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))
reqType = requestBody.get("type", None)
if reqType == "run":
# 运行代码
code = requestBody.get("code", None)
runId = randomStr(10)
codeBucket.put_object(runId, code.encode("utf-8"))
responseData = runId
elif reqType == "input":
# 输入内容
inputData = requestBody.get("input", None)
runId = requestBody.get("id", None)
targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))
responseData = 'ok'
elif reqType == "output":
# 获取结果
runId = requestBody.get("id", None)
targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)
with open('/tmp/' + runId) as f:
responseData = f.read()
else:
responseData = "Error"
print(responseData)
return Response(start_response, {"result": responseData})
函数:执行器函数
# -*- coding: utf-8 -*-
import os
import re
import oss2
import json
import time
import pexpect
# 基本配置信息
AccessKey = {
"id": os.environ.get('AccessKeyId'),
"secret": os.environ.get('AccessKeySecret')
}
OSSCodeConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketCodeName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
OSSTargetConf = {
'endPoint': os.environ.get('OSSConfEndPoint'),
'bucketName': os.environ.get('OSSConfBucketTargetName'),
'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}
# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])
def handler(event, context):
event = json.loads(event.decode("utf-8"))
for eveEvent in event["events"]:
# 获取object
print("获取object")
code = eveEvent["oss"]["object"]["key"]
localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]
# 下载图片
print("下载代码")
print("code: ", code)
codeBucket.get_object_to_file(code, localFileName)
# 执行代码
foo = pexpect.spawn('python %s' % localFileName)
outputData = ""
startTime = time.time()
# timeout可以通过文件名来进行识别
try:
timeout = int(re.findall("timeout(.*?)s", code)[0])
except:
timeout = 60
while (time.time() - startTime) / 1000 <= timeout:
try:
tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)
tempOutput = tempOutput.decode("utf-8", "ignore")
if len(str(tempOutput)) > 0:
outputData = outputData + tempOutput
# 输出数据存入oss
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
except Exception as e:
print("Error: ", e)
# 有输入请求被阻塞
if str(e) == "Timeout exceeded.":
try:
# 从oss读取数据
targetBucket.get_object_to_file(code + "-input", localFileName + "-input")
targetBucket.delete_object(code + "-input")
with open(localFileName + "-input") as f:
inputData = f.read()
if inputData:
foo.sendline(inputData)
except:
pass
# 程序执行完成输出
elif "End Of File (EOF)" in str(e):
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
return True
# 程序抛出异常
else:
outputData = outputData + "\n\nException: %s" % str(e)
targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
return False
3.4 部署上线
可以通过Serverless Devs将代码部署到线上,设定全局变量:
vars:
region: cn-beijing
AccessKeyId: ${Env(AccessKeyId)}
AccessKeySecret: ${Env(AccessKeySecret)}
OSSConfBucketCodeName: serverlessbook-runcode-code
OSSConfBucketTargetName: serverlessbook-runcode-others
CodeBucketEndpoint: ******
OSSConfObjectSignUrlTimeOut: 1200
service:
name: ServerlessBook
description: Serverless图书案例
logConfig:
project: aliyun-fc-cn-beijing-9bd4a65b-c9d1-529d-9ce0-c91adfb823ab
logStore: function-log
业务逻辑函数:
ServerlessBookRunCodeMain:
component: fc
actions:
pre-deploy:
- run: s ServerlessBookRunCodeMain build --use-docker
path: ./main
props:
region: ${vars.region}
service: ${vars.service}
function:
name: serverless_runcode_main
description: 业务逻辑
codeUri: ./main
handler: index.handler
memorySize: 128
runtime: python3
timeout: 5
environmentVariables:
AccessKeyId: ${vars.AccessKeyId}
AccessKeySecret: ${vars.AccessKeySecret}
OSSConfBucketCodeName: ${vars.OSSConfBucketCodeName}
OSSConfBucketTargetName: ${vars.OSSConfBucketTargetName}
OSSConfEndPoint: ${vars.CodeBucketEndpoint}
OSSConfObjectSignUrlTimeOut: 1200
triggers:
- name: httpTrigger
type: http
config:
authType: anonymous
methods:
- GET
customDomains:
- domainName: auto
protocol: HTTP
routeConfigs:
- path: '/*'
执行器函数:
ServerlessBookRunCodeCompiler:
component: fc
actions:
pre-deploy:
- run: s ServerlessBookRunCodeCompiler build --use-docker
path: ./compiler
Properties:
region: ${vars.region}
service: ${vars.service}
function:
name: serverless_runcode_compiler
description: 代码执行器
codeUri: ./compiler
handler: index.handler
memorySize: 128
runtime: python3
timeout: 60
environmentVariables: ${ServerlessBookRunCodeMain.props.function.environmentVariables}
Triggers:
- Name: OSSTrigger
Type: OSS
Parameters:
Bucket:
Events:
- 'oss:ObjectCreated:*'
Filter:
Prefix: ''
Suffix: ''
triggers:
- name: ossTrigger
type: oss
config:
bucketName: ${vars.OSSConfBucketCodeName}
events:
- 'oss:ObjectCreated:*'
filter:
Key:
Prefix: ''
Suffix: ''
在函数部分的配置文档中,可以看到有一个叫做Extends的字段,这个字段实际上是Serverless Devs的行为描述字段,通过该字段可以进行行为操作,例如,在上面的例子中,这部分为:
actions:
pre-deploy:
- run: s ServerlessBookRunCodeCompiler build --use-docker
path: ./compiler
实际含义是,在项目进行deploy操作之前,在./compiler目录下,执行指令:
$ s ServerlessBookRunCodeCompiler install --use-docker
当项目描述文档编写完成之后,可以通过:
$ s deploy
进行项目部署。
3.5 接口测试
同“简单的在线代码执行器”部分一样,在这部分也将会通过PostMan对接口进行测试。
测试代码:
import time
print('hello world')
time.sleep(10)
tempInput = input('please: ')
print('Input data: ', tempInput)
当我们通过PostMan发起请求执行这段代码之后:
我们可以看到系统会返回给我们一个token,该token将会作为我们整个请求任务的Id,此时,我们可以通过获取输出结果的接口,来获取结果:
由于代码中有:
time.sleep(10)
所以,迅速获得结果的时候是看不到后半部分的输出结果,我们可以设置一个轮训任务,不断通过该token对接口进行刷新:
我们可以看到,10秒钟后,代码执行到了输入部分:
tempInput = input('please: ')
此时,我们再通过输入接口,进行输入操作:
完成之后,我们可以看到输入成功(result: ok)的结果,此时,我们继续刷新之前的获取结果部分的请求:
可以看到,我们已经获得到了所有结果的输出。
相对于前一部分的结构,这种“更贴近本地的代码执行器“变得复杂了很多,但是在实际使用的过程中,却可以更好的模拟出本地执行代码时的一些现象,例如代码的休眠、阻塞、内容的输出等。
4. 总结
无论是简单的在线代码执行器部分,还是更贴近“本地”的代码执行器部分,这篇文章在所应用的内容是相对广泛的。通过这篇文章你可以看到:
HTTP触发器的基本使用方法;对象存储触发器的基本使用方;
函数计算组件、对象存储组件的基本使用方法,组件间依赖的实现方法;
同时,通过这篇文章,也可以从一个侧面看到这样一个常见问题的简单解答:我有一个项目,我是每个接口一个函数,还是多个接口复用一个函数?
针对这个问题,其实最主要的是看业务本身的诉求,如果多个接口表达的含义是一致的,或者是同类的,类似的,并且多个接口的资源消耗是类似的,那么放在一个函数中来通过不同的路径进行区分是完全可以的;如果出现资源消耗差距较大,或者函数类型、规模、类别区别过大的时候,将多个接口放在多个函数下也是没有问题的。
本文实际上是抛砖引玉,无论是OJ系统的“判题机”部分,还是在线编程工具的“执行器部分”,都可以很好的和Serverless架构有着比较有趣的结合点。这种结合点不仅仅可以解决传统在线编程所头疼的事情(安全问题,资源消耗问题,并发问题,流量不稳定问题),更可以将Serverless的价值在一个新的领域发挥出来。